Skip to main content

fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5use std::fmt::Debug;
6use std::future::pending;
7use std::pin::Pin;
8use std::result;
9use std::sync::Arc;
10
11use anyhow::{Context, anyhow};
12use bitcoin::secp256k1;
13pub use error::{FederationError, OutputOutcomeError};
14pub use fedimint_connectors::ServerResult;
15pub use fedimint_connectors::error::ServerError;
16use fedimint_connectors::{
17    ConnectionPool, Connectivity, ConnectorRegistry, DynGuaridianConnection, IGuardianConnection,
18    PeerStatus,
19};
20use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, ModuleKind, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
30};
31use fedimint_core::net::api_announcement::SignedApiAnnouncement;
32use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
33use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
34use fedimint_core::task::{MaybeSend, MaybeSync};
35use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
36use fedimint_core::util::backoff_util::api_networking_backoff;
37use fedimint_core::util::{FmtCompact as _, SafeUrl};
38use fedimint_core::{
39    ChainId, NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define,
40    util,
41};
42use fedimint_logging::LOG_CLIENT_NET_API;
43use fedimint_metrics::HistogramExt as _;
44use futures::stream::{BoxStream, FuturesUnordered};
45use futures::{Future, StreamExt};
46use global_api::with_cache::GlobalFederationApiWithCache;
47use jsonrpsee_core::DeserializeOwned;
48use serde::{Deserialize, Serialize};
49use serde_json::Value;
50use tokio::sync::watch;
51use tokio_stream::wrappers::WatchStream;
52use tracing::{debug, instrument, trace, warn};
53
54use crate::metrics::{CLIENT_API_REQUEST_DURATION_SECONDS, CLIENT_API_REQUESTS_TOTAL};
55use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
56
57pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
58
59pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
60    ApiVersion { major: 0, minor: 1 };
61
62pub const VERSION_THAT_INTRODUCED_AWAIT_OUTPUTS_OUTCOMES: ApiVersion = ApiVersion::new(0, 8);
63pub type FederationResult<T> = Result<T, FederationError>;
64pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
65
66pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
67
68/// Set of api versions for each component (core + modules)
69///
70/// E.g. result of federated common api versions discovery.
71#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
72pub struct ApiVersionSet {
73    pub core: ApiVersion,
74    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
75}
76
77/// An API (module or global) that can query a federation
78#[apply(async_trait_maybe_send!)]
79pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
80    /// List of all federation peers for the purpose of iterating each peer
81    /// in the federation.
82    ///
83    /// The underlying implementation is responsible for knowing how many
84    /// and `PeerId`s of each. The caller of this interface most probably
85    /// have some idea as well, but passing this set across every
86    /// API call to the federation would be inconvenient.
87    fn all_peers(&self) -> &BTreeSet<PeerId>;
88
89    /// `PeerId` of the Guardian node, if set
90    ///
91    /// This is for using Client in a "Admin" mode, making authenticated
92    /// calls to own `fedimintd` instance.
93    fn self_peer(&self) -> Option<PeerId>;
94
95    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
96
97    /// Make request to a specific federation peer by `peer_id`
98    async fn request_raw(
99        &self,
100        peer_id: PeerId,
101        method: &str,
102        params: &ApiRequestErased,
103    ) -> ServerResult<Value>;
104
105    /// Returns a stream of connection status for each peer.
106    ///
107    /// The stream emits a new value whenever either the set of active
108    /// connections in the pool changes, or a connector observes a
109    /// transport-level path change on an existing connection (e.g. an iroh
110    /// connection upgrading from relay to direct).
111    ///
112    /// Each peer's entry is [`PeerStatus::Disconnected`] if there is no
113    /// active pooled connection, or [`PeerStatus::Connected`] carrying the
114    /// current [`fedimint_connectors::Connectivity`] otherwise. If the pool
115    /// reports the peer as active but the underlying connector no longer
116    /// has a known path for it (a disconnection racing the pool update),
117    /// the peer is reported as [`PeerStatus::Disconnected`] — the next
118    /// emission will confirm.
119    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, PeerStatus>>;
120    /// Wait for some connections being initialized
121    ///
122    /// This is useful to avoid initializing networking by
123    /// tasks that are not high priority.
124    async fn wait_for_initialized_connections(&self);
125
126    /// Get or create a connection to a specific peer, returning the connection
127    /// object.
128    ///
129    /// This can be used to monitor connection status and await disconnection
130    /// for proactive reconnection strategies.
131    async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection>;
132}
133
/// An extension trait for making federation-wide API calls on top of
/// [`IRawFederationApi`].
136#[apply(async_trait_maybe_send!)]
137pub trait FederationApiExt: IRawFederationApi {
138    async fn request_single_peer<Ret>(
139        &self,
140        method: String,
141        params: ApiRequestErased,
142        peer: PeerId,
143    ) -> ServerResult<Ret>
144    where
145        Ret: DeserializeOwned,
146    {
147        self.request_raw(peer, &method, &params)
148            .await
149            .and_then(|v| {
150                serde_json::from_value(v)
151                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
152            })
153    }
154
155    async fn request_single_peer_federation<FedRet>(
156        &self,
157        method: String,
158        params: ApiRequestErased,
159        peer_id: PeerId,
160    ) -> FederationResult<FedRet>
161    where
162        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
163    {
164        self.request_raw(peer_id, &method, &params)
165            .await
166            .and_then(|v| {
167                serde_json::from_value(v)
168                    .map_err(|e| ServerError::ResponseDeserialization(e.into()))
169            })
170            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
171    }
172
173    /// Make an aggregate request to federation, using `strategy` to logically
174    /// merge the responses.
175    #[instrument(target = LOG_CLIENT_NET_API, skip_all, fields(method=method))]
176    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
177        &self,
178        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
179        method: String,
180        params: ApiRequestErased,
181    ) -> FederationResult<FR> {
182        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
183        // completed results from it and we don't do any `await`s when
184        // processing them, it should be totally OK.
185        #[cfg(not(target_family = "wasm"))]
186        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
187        #[cfg(target_family = "wasm")]
188        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
189
190        for peer in self.all_peers() {
191            futures.push(Box::pin({
192                let method = &method;
193                let params = &params;
194                async move {
195                    let result = self
196                        .request_single_peer(method.clone(), params.clone(), *peer)
197                        .await;
198
199                    (*peer, result)
200                }
201            }));
202        }
203
204        let mut peer_errors = BTreeMap::new();
205        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
206
207        loop {
208            let (peer, result) = futures
209                .next()
210                .await
211                .expect("Query strategy ran out of peers to query without returning a result");
212
213            match result {
214                Ok(response) => match strategy.process(peer, response) {
215                    QueryStep::Retry(peers) => {
216                        for peer in peers {
217                            futures.push(Box::pin({
218                                let method = &method;
219                                let params = &params;
220                                async move {
221                                    let result = self
222                                        .request_single_peer(method.clone(), params.clone(), peer)
223                                        .await;
224
225                                    (peer, result)
226                                }
227                            }));
228                        }
229                    }
230                    QueryStep::Success(response) => return Ok(response),
231                    QueryStep::Failure(e) => {
232                        peer_errors.insert(peer, e);
233                    }
234                    QueryStep::Continue => {}
235                },
236                Err(e) => {
237                    e.report_if_unusual(peer, "RequestWithStrategy");
238                    peer_errors.insert(peer, e);
239                }
240            }
241
242            if peer_errors.len() == peer_error_threshold {
243                return Err(FederationError::peer_errors(
244                    method.clone(),
245                    params.params.clone(),
246                    peer_errors,
247                ));
248            }
249        }
250    }
251
252    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
253    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
254        &self,
255        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
256        method: String,
257        params: ApiRequestErased,
258    ) -> FR {
259        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
260        // completed results from it and we don't do any `await`s when
261        // processing them, it should be totally OK.
262        #[cfg(not(target_family = "wasm"))]
263        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
264        #[cfg(target_family = "wasm")]
265        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
266
267        for peer in self.all_peers() {
268            futures.push(Box::pin({
269                let method = &method;
270                let params = &params;
271                async move {
272                    let response = util::retry(
273                        format!("api-request-{method}-{peer}"),
274                        api_networking_backoff(),
275                        || async {
276                            self.request_single_peer(method.clone(), params.clone(), *peer)
277                                .await
278                                .inspect_err(|e| {
279                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
280                                })
281                                .map_err(|e| anyhow!(e.to_string()))
282                        },
283                    )
284                    .await
285                    .expect("Number of retries has no limit");
286
287                    (*peer, response)
288                }
289            }));
290        }
291
292        loop {
293            let (peer, response) = match futures.next().await {
294                Some(t) => t,
295                None => pending().await,
296            };
297
298            match strategy.process(peer, response) {
299                QueryStep::Retry(peers) => {
300                    for peer in peers {
301                        futures.push(Box::pin({
302                            let method = &method;
303                            let params = &params;
304                            async move {
305                                let response = util::retry(
306                                    format!("api-request-{method}-{peer}"),
307                                    api_networking_backoff(),
308                                    || async {
309                                        self.request_single_peer(
310                                            method.clone(),
311                                            params.clone(),
312                                            peer,
313                                        )
314                                        .await
315                                        .inspect_err(|err| {
316                                            if err.is_unusual() {
317                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
318                                            }
319                                        })
320                                        .map_err(|e| anyhow!(e.to_string()))
321                                    },
322                                )
323                                .await
324                                .expect("Number of retries has no limit");
325
326                                (peer, response)
327                            }
328                        }));
329                    }
330                }
331                QueryStep::Success(response) => return response,
332                QueryStep::Failure(e) => {
333                    warn!(target: LOG_CLIENT_NET_API, "Query strategy returned non-retryable failure for peer {peer}: {e}");
334                }
335                QueryStep::Continue => {}
336            }
337        }
338    }
339
340    async fn request_current_consensus<Ret>(
341        &self,
342        method: String,
343        params: ApiRequestErased,
344    ) -> FederationResult<Ret>
345    where
346        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
347    {
348        self.request_with_strategy(
349            ThresholdConsensus::new(self.all_peers().to_num_peers()),
350            method,
351            params,
352        )
353        .await
354    }
355
356    async fn request_current_consensus_retry<Ret>(
357        &self,
358        method: String,
359        params: ApiRequestErased,
360    ) -> Ret
361    where
362        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
363    {
364        self.request_with_strategy_retry(
365            ThresholdConsensus::new(self.all_peers().to_num_peers()),
366            method,
367            params,
368        )
369        .await
370    }
371
372    async fn request_admin<Ret>(
373        &self,
374        method: &str,
375        params: ApiRequestErased,
376        auth: ApiAuth,
377    ) -> FederationResult<Ret>
378    where
379        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
380    {
381        let Some(self_peer_id) = self.self_peer() else {
382            return Err(FederationError::general(
383                method,
384                params,
385                anyhow::format_err!("Admin peer_id not set"),
386            ));
387        };
388
389        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
390            .await
391    }
392
393    async fn request_admin_no_auth<Ret>(
394        &self,
395        method: &str,
396        params: ApiRequestErased,
397    ) -> FederationResult<Ret>
398    where
399        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
400    {
401        let Some(self_peer_id) = self.self_peer() else {
402            return Err(FederationError::general(
403                method,
404                params,
405                anyhow::format_err!("Admin peer_id not set"),
406            ));
407        };
408
409        self.request_single_peer_federation(method.into(), params, self_peer_id)
410            .await
411    }
412}
413
414#[apply(async_trait_maybe_send!)]
415impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
416
417/// Trait marker for the module (non-global) endpoints
418pub trait IModuleFederationApi: IRawFederationApi {}
419
420dyn_newtype_define! {
421    #[derive(Clone)]
422    pub DynModuleApi(Arc<IModuleFederationApi>)
423}
424
425dyn_newtype_define! {
426    #[derive(Clone)]
427    pub DynGlobalApi(Arc<IGlobalFederationApi>)
428}
429
430impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
431    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
432        self.inner.as_ref()
433    }
434}
435
436impl DynGlobalApi {
437    pub fn new(
438        connectors: ConnectorRegistry,
439        peers: BTreeMap<PeerId, SafeUrl>,
440        api_secret: Option<&str>,
441    ) -> anyhow::Result<Self> {
442        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
443            connectors, peers, None, api_secret,
444        ))
445        .into())
446    }
447    pub fn new_admin(
448        connectors: ConnectorRegistry,
449        peer: PeerId,
450        url: SafeUrl,
451        api_secret: Option<&str>,
452    ) -> anyhow::Result<DynGlobalApi> {
453        Ok(GlobalFederationApiWithCache::new(FederationApi::new(
454            connectors,
455            [(peer, url)].into(),
456            Some(peer),
457            api_secret,
458        ))
459        .into())
460    }
461
462    pub fn new_admin_setup(connectors: ConnectorRegistry, url: SafeUrl) -> anyhow::Result<Self> {
463        // PeerIds are used only for informational purposes, but just in case, make a
464        // big number so it stands out
465        Self::new_admin(
466            connectors,
467            PeerId::from(1024),
468            url,
469            // Setup does not have api secrets yet
470            None,
471        )
472    }
473}
474
475/// The API for the global (non-module) endpoints
476#[apply(async_trait_maybe_send!)]
477pub trait IGlobalFederationApi: IRawFederationApi {
478    async fn submit_transaction(
479        &self,
480        tx: Transaction,
481    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
482
483    async fn await_block(
484        &self,
485        block_index: u64,
486        decoders: &ModuleDecoderRegistry,
487    ) -> anyhow::Result<SessionOutcome>;
488
489    async fn get_session_status(
490        &self,
491        block_index: u64,
492        decoders: &ModuleDecoderRegistry,
493        core_api_version: ApiVersion,
494        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
495    ) -> anyhow::Result<SessionStatus>;
496
497    async fn session_count(&self) -> FederationResult<u64>;
498
499    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
500
501    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
502
503    async fn download_backup(
504        &self,
505        id: &secp256k1::PublicKey,
506    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
507
508    /// Sets the password used to decrypt the configs and authenticate
509    ///
510    /// Must be called first before any other calls to the API
511    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
512
513    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
514
515    async fn set_local_params(
516        &self,
517        name: String,
518        federation_name: Option<String>,
519        disable_base_fees: Option<bool>,
520        enabled_modules: Option<BTreeSet<ModuleKind>>,
521        federation_size: Option<u32>,
522        auth: ApiAuth,
523    ) -> FederationResult<String>;
524
525    async fn add_peer_connection_info(
526        &self,
527        info: String,
528        auth: ApiAuth,
529    ) -> FederationResult<String>;
530
531    /// Reset the peer setup codes during the federation setup process
532    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
533
534    /// Returns the setup code if `set_local_params` was already called
535    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
536
537    /// Runs DKG, can only be called once after configs have been generated in
538    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
539    /// error and config gen must be restarted.
540    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
541
542    /// Returns the status of the server
543    async fn status(&self) -> FederationResult<StatusResponse>;
544
545    /// Show an audit across all modules
546    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
547
548    /// Download the guardian config to back it up
549    async fn guardian_config_backup(&self, auth: ApiAuth)
550    -> FederationResult<GuardianConfigBackup>;
551
552    /// Check auth credentials
553    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
554
555    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
556
557    /// Publish our signed API announcement to other guardians
558    async fn submit_api_announcement(
559        &self,
560        peer_id: PeerId,
561        announcement: SignedApiAnnouncement,
562    ) -> FederationResult<()>;
563
564    async fn api_announcements(
565        &self,
566        guardian: PeerId,
567    ) -> ServerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
568
569    async fn sign_api_announcement(
570        &self,
571        api_url: SafeUrl,
572        auth: ApiAuth,
573    ) -> FederationResult<SignedApiAnnouncement>;
574
575    /// Publish our signed guardian metadata to other guardians
576    async fn submit_guardian_metadata(
577        &self,
578        peer_id: PeerId,
579        metadata: SignedGuardianMetadata,
580    ) -> FederationResult<()>;
581
582    async fn guardian_metadata(
583        &self,
584        guardian: PeerId,
585    ) -> ServerResult<BTreeMap<PeerId, SignedGuardianMetadata>>;
586
587    async fn sign_guardian_metadata(
588        &self,
589        metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
590        auth: ApiAuth,
591    ) -> FederationResult<SignedGuardianMetadata>;
592
593    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
594
595    /// Returns the fedimintd version a peer is running
596    async fn fedimintd_version(&self, peer_id: PeerId) -> ServerResult<String>;
597
598    /// Fetch the backup statistics from the federation (admin endpoint)
599    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
600
601    /// Get the invite code for the federation guardian.
602    /// For instance, useful after DKG
603    async fn get_invite_code(&self, guardian: PeerId) -> ServerResult<InviteCode>;
604
605    /// Change the password used to encrypt the configs and for guardian
606    /// authentication
607    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
608
609    /// Returns the chain ID (bitcoin block hash at height 1) from the
610    /// federation
611    async fn chain_id(&self) -> FederationResult<ChainId>;
612}
613
614pub fn deserialize_outcome<R>(
615    outcome: &SerdeOutputOutcome,
616    module_decoder: &Decoder,
617) -> OutputOutcomeResult<R>
618where
619    R: OutputOutcome + MaybeSend,
620{
621    let dyn_outcome = outcome
622        .try_into_inner_known_module_kind(module_decoder)
623        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
624
625    let source_instance = dyn_outcome.module_instance_id();
626
627    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
628        let target_type = std::any::type_name::<R>();
629        OutputOutcomeError::ResponseDeserialization(anyhow!(
630            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
631        ))
632    })
633}
634
635/// Federation API client
636///
637/// The core underlying object used to make API requests to a federation.
638///
639/// It has an `connectors` handle to actually making outgoing connections
640/// to given URLs, and knows which peers there are and what URLs to connect to
641/// to reach them.
642// TODO: As it is currently it mixes a bit the role of connecting to "peers" with
643// general purpose outgoing connection. Not a big deal, but might need refactor
644// in the future.
645#[derive(Clone, Debug)]
646pub struct FederationApi {
647    /// Map of known URLs to use to connect to peers
648    peers: BTreeMap<PeerId, SafeUrl>,
649    /// List of peer ids, redundant to avoid collecting all the time
650    peers_keys: BTreeSet<PeerId>,
651    /// Our own [`PeerId`] to use when making admin apis
652    admin_id: Option<PeerId>,
653    /// Set when this API is used to communicate with a module
654    module_id: Option<ModuleInstanceId>,
655    /// Api secret of the federation
656    api_secret: Option<String>,
657    /// Connection pool
658    connection_pool: ConnectionPool<dyn IGuardianConnection>,
659}
660
661impl FederationApi {
662    pub fn new(
663        connectors: ConnectorRegistry,
664        peers: BTreeMap<PeerId, SafeUrl>,
665        admin_peer_id: Option<PeerId>,
666        api_secret: Option<&str>,
667    ) -> Self {
668        Self {
669            peers_keys: peers.keys().copied().collect(),
670            peers,
671            admin_id: admin_peer_id,
672            module_id: None,
673            api_secret: api_secret.map(ToOwned::to_owned),
674            connection_pool: ConnectionPool::new(connectors),
675        }
676    }
677
678    async fn get_or_create_connection(
679        &self,
680        url: &SafeUrl,
681        api_secret: Option<&str>,
682    ) -> ServerResult<DynGuaridianConnection> {
683        self.connection_pool
684            .get_or_create_connection(url, api_secret, |url, api_secret, connectors| async move {
685                let conn = connectors
686                    .connect_guardian(&url, api_secret.as_deref())
687                    .await?;
688                Ok(conn)
689            })
690            .await
691    }
692
693    async fn request(
694        &self,
695        peer: PeerId,
696        method: ApiMethod,
697        request: ApiRequestErased,
698    ) -> ServerResult<Value> {
699        trace!(target: LOG_CLIENT_NET_API, %peer, %method, "Api request");
700        let url = self
701            .peers
702            .get(&peer)
703            .ok_or_else(|| ServerError::InvalidPeerId { peer_id: peer })?;
704        let conn = self
705            .get_or_create_connection(url, self.api_secret.as_deref())
706            .await
707            .context("Failed to connect to peer")
708            .map_err(ServerError::Connection)?;
709
710        let method_str = method.to_string();
711        let peer_str = peer.to_string();
712        let timer = CLIENT_API_REQUEST_DURATION_SECONDS
713            .with_label_values(&[&method_str, &peer_str])
714            .start_timer_ext();
715
716        let res = conn.request(method.clone(), request).await;
717
718        timer.observe_duration();
719
720        let result_label = if res.is_ok() { "success" } else { "error" }.to_string();
721        CLIENT_API_REQUESTS_TOTAL
722            .with_label_values(&[&method_str, &peer_str, &result_label])
723            .inc();
724
725        trace!(target: LOG_CLIENT_NET_API, ?method, res_ok = res.is_ok(), "Api response");
726
727        res
728    }
729
730    /// Get receiver for changes in the active connections
731    ///
732    /// This allows real-time monitoring of connection status.
733    pub fn get_active_connection_receiver(&self) -> watch::Receiver<BTreeSet<SafeUrl>> {
734        self.connection_pool.get_active_connection_receiver()
735    }
736}
737
738impl IModuleFederationApi for FederationApi {}
739
740#[apply(async_trait_maybe_send!)]
741impl IRawFederationApi for FederationApi {
742    fn all_peers(&self) -> &BTreeSet<PeerId> {
743        &self.peers_keys
744    }
745
746    fn self_peer(&self) -> Option<PeerId> {
747        self.admin_id
748    }
749
750    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
751        FederationApi {
752            api_secret: self.api_secret.clone(),
753            peers: self.peers.clone(),
754            peers_keys: self.peers_keys.clone(),
755            admin_id: self.admin_id,
756            module_id: Some(id),
757            connection_pool: self.connection_pool.clone(),
758        }
759        .into()
760    }
761
762    #[instrument(
763        target = LOG_CLIENT_NET_API,
764        skip_all,
765        fields(
766            peer_id = %peer_id,
767            method = %method,
768            params = %params.params,
769        )
770    )]
771    async fn request_raw(
772        &self,
773        peer_id: PeerId,
774        method: &str,
775        params: &ApiRequestErased,
776    ) -> ServerResult<Value> {
777        let method = match self.module_id {
778            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
779            None => ApiMethod::Core(method.to_string()),
780        };
781
782        self.request(peer_id, method, params.clone()).await
783    }
784
785    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, PeerStatus>> {
786        let peers = self.peers.clone();
787        let pool = self.connection_pool.clone();
788        let active_rx = self.connection_pool.get_active_connection_receiver();
789
790        // Tick on either (a) a change to the set of active pooled
791        // connections, or (b) a transport-level path change reported by
792        // the underlying connectors (e.g. iroh relay → direct). Both
793        // streams emit their current value immediately on subscription,
794        // so consumers see a snapshot right away.
795        let membership_ticks = WatchStream::new(active_rx.clone()).map(|_| ());
796        let path_change_ticks =
797            WatchStream::new(self.connection_pool.connectivity_change_notifier()).map(|_| ());
798        let ticks = futures::stream::select(membership_ticks, path_change_ticks);
799
800        ticks
801            .map(move |()| {
802                let active_urls = active_rx.borrow().clone();
803                peers
804                    .iter()
805                    .map(|(peer_id, url)| {
806                        let status = if active_urls.contains(url) {
807                            // The active-set snapshot and the per-connector
808                            // path state are separate sources of truth, so a
809                            // disconnection can race between them. If we saw
810                            // the url as active but the connector no longer
811                            // reports a known path, treat the race as
812                            // "disconnection won" and report Disconnected
813                            // for a consistent view — the next stream tick
814                            // will confirm.
815                            match pool.connectivity(url) {
816                                Connectivity::Unknown => PeerStatus::Disconnected,
817                                connectivity => PeerStatus::Connected(connectivity),
818                            }
819                        } else {
820                            PeerStatus::Disconnected
821                        };
822                        (*peer_id, status)
823                    })
824                    .collect()
825            })
826            .boxed()
827    }
828    async fn wait_for_initialized_connections(&self) {
829        self.connection_pool
830            .wait_for_initialized_connections()
831            .await;
832    }
833
834    async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection> {
835        let url = self
836            .peers
837            .get(&peer_id)
838            .ok_or_else(|| ServerError::InvalidPeerId { peer_id })?;
839        self.get_or_create_connection(url, self.api_secret.as_deref())
840            .await
841    }
842}
843
844/// The status of a server, including how it views its peers
845#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
846pub struct LegacyFederationStatus {
847    pub session_count: u64,
848    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
849    pub peers_online: u64,
850    pub peers_offline: u64,
851    /// This should always be 0 if everything is okay, so a monitoring tool
852    /// should generate an alert if this is not the case.
853    pub peers_flagged: u64,
854    pub scheduled_shutdown: Option<u64>,
855}
856
857#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
858pub struct LegacyPeerStatus {
859    pub last_contribution: Option<u64>,
860    pub connection_status: LegacyP2PConnectionStatus,
861    /// Indicates that this peer needs attention from the operator since
862    /// it has not contributed to the consensus in a long time
863    pub flagged: bool,
864}
865
866#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
867#[serde(rename_all = "snake_case")]
868pub enum LegacyP2PConnectionStatus {
869    #[default]
870    Disconnected,
871    Connected,
872}
873
874#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
875pub struct StatusResponse {
876    pub server: ServerStatusLegacy,
877    pub federation: Option<LegacyFederationStatus>,
878}
879
880#[cfg(test)]
881mod tests;