fedimint_api_client/api/
mod.rs

1use core::panic;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::fmt::Debug;
4use std::iter::once;
5use std::pin::Pin;
6use std::result;
7use std::sync::Arc;
8
9use anyhow::{Context, anyhow};
10#[cfg(all(feature = "tor", not(target_family = "wasm")))]
11use arti_client::{TorAddr, TorClient, TorClientConfig};
12use async_channel::bounded;
13use async_trait::async_trait;
14use base64::Engine as _;
15use bitcoin::hashes::sha256;
16use bitcoin::secp256k1;
17pub use error::{FederationError, OutputOutcomeError, PeerError};
18use fedimint_core::admin_client::{
19    ConfigGenConnectionsRequest, PeerServerParams, ServerStatus, ServerStatusLegacy,
20};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
30};
31use fedimint_core::net::api_announcement::SignedApiAnnouncement;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
39};
40use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
41use futures::channel::oneshot;
42use futures::future::pending;
43use futures::stream::FuturesUnordered;
44use futures::{Future, StreamExt};
45use global_api::with_cache::GlobalFederationApiWithCache;
46use jsonrpsee_core::DeserializeOwned;
47use jsonrpsee_core::client::ClientT;
48pub use jsonrpsee_core::client::Error as JsonRpcClientError;
49use jsonrpsee_types::ErrorCode;
50#[cfg(target_family = "wasm")]
51use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
52#[cfg(not(target_family = "wasm"))]
53use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
54#[cfg(not(target_family = "wasm"))]
55use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58#[cfg(not(target_family = "wasm"))]
59use tokio_rustls::rustls::RootCertStore;
60#[cfg(all(feature = "tor", not(target_family = "wasm")))]
61use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
62use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
63
64use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
65mod error;
66pub mod global_api;
67pub mod net;
68
69pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
70
71pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
72    ApiVersion { major: 0, minor: 1 };
73
/// Result whose error type is [`PeerError`].
pub type PeerResult<T> = Result<T, PeerError>;
/// Result whose error type is [`JsonRpcClientError`].
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
/// Result whose error type is [`FederationError`].
pub type FederationResult<T> = Result<T, FederationError>;
/// A [`DynOutputOutcome`] wrapped in [`SerdeModuleEncoding`].
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result whose error type is [`OutputOutcomeError`].
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
80
/// Set of api versions for each component (core + modules)
///
/// E.g. result of federated common api versions discovery.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Api version of the core component.
    pub core: ApiVersion,
    /// Api version of each module, keyed by its [`ModuleInstanceId`].
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
89
/// An API (module or global) that can query a federation
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// List of all federation peers for the purpose of iterating each peer
    /// in the federation.
    ///
    /// The underlying implementation is responsible for knowing how many
    /// peers there are and the `PeerId` of each. The caller of this interface
    /// most probably has some idea as well, but passing this set across every
    /// API call to the federation would be inconvenient.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// PeerId of the Guardian node, if set
    ///
    /// This is for using Client in a "Admin" mode, making authenticated
    /// calls to own `fedimintd` instance.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns a [`DynModuleApi`] for the module instance `id`.
    ///
    /// NOTE(review): presumably the returned API is scoped to that module's
    /// endpoints — confirm against the implementations.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Make request to a specific federation peer by `peer_id`
    ///
    /// Calls `method` with `params` and returns the peer's response as an
    /// undecoded JSON [`Value`], or a [`PeerError`] on failure.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
118
/// An extension trait for making federation-wide API calls on top of
/// [`IRawFederationApi`].
121#[apply(async_trait_maybe_send!)]
122pub trait FederationApiExt: IRawFederationApi {
123    async fn request_single_peer<Ret>(
124        &self,
125        method: String,
126        params: ApiRequestErased,
127        peer: PeerId,
128    ) -> PeerResult<Ret>
129    where
130        Ret: DeserializeOwned,
131    {
132        self.request_raw(peer, &method, &params)
133            .await
134            .and_then(|v| {
135                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
136            })
137    }
138
139    async fn request_single_peer_federation<FedRet>(
140        &self,
141        method: String,
142        params: ApiRequestErased,
143        peer_id: PeerId,
144    ) -> FederationResult<FedRet>
145    where
146        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
147    {
148        self.request_raw(peer_id, &method, &params)
149            .await
150            .and_then(|v| {
151                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
152            })
153            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
154    }
155
    /// Make an aggregate request to federation, using `strategy` to logically
    /// merge the responses.
    ///
    /// One request is started for every peer in `all_peers()`. Each successful
    /// response is handed to `strategy.process`, which decides whether to
    /// finish, keep waiting, record a failure, or re-query a set of peers.
    ///
    /// # Errors
    ///
    /// Returns `FederationError::peer_errors` once the number of peers with a
    /// recorded error equals the `one_honest()` threshold of the peer set.
    ///
    /// # Panics
    ///
    /// Panics if every in-flight request has completed without the strategy
    /// producing a result and without reaching the error threshold.
    #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
        &self,
        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
        method: String,
        params: ApiRequestErased,
    ) -> FederationResult<FR> {
        // NOTE: `FuturesUnordered` is a footgun, but all we do here is polling
        // completed results from it and we don't do any `await`s when
        // processing them, it should be totally OK.
        #[cfg(not(target_family = "wasm"))]
        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
        #[cfg(target_family = "wasm")]
        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();

        // Start one request per peer; each future resolves to `(peer, result)`.
        for peer in self.all_peers() {
            futures.push(Box::pin({
                let method = &method;
                let params = &params;
                async move {
                    let result = self
                        .request_single_peer(method.clone(), params.clone(), *peer)
                        .await;

                    (*peer, result)
                }
            }));
        }

        // Latest error per peer; a later error from the same peer replaces the
        // earlier one because the map is keyed by `PeerId`.
        let mut peer_errors = BTreeMap::new();
        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();

        loop {
            let (peer, result) = futures
                .next()
                .await
                .expect("Query strategy ran out of peers to query without returning a result");

            match result {
                Ok(response) => match strategy.process(peer, response) {
                    // The strategy asked for these peers to be queried again.
                    QueryStep::Retry(peers) => {
                        for peer in peers {
                            futures.push(Box::pin({
                                let method = &method;
                                let params = &params;
                                async move {
                                    let result = self
                                        .request_single_peer(method.clone(), params.clone(), peer)
                                        .await;

                                    (peer, result)
                                }
                            }));
                        }
                    }
                    QueryStep::Success(response) => return Ok(response),
                    QueryStep::Failure(e) => {
                        peer_errors.insert(peer, e);
                    }
                    QueryStep::Continue => {}
                },
                Err(e) => {
                    e.report_if_unusual(peer, "RequestWithStrategy");
                    peer_errors.insert(peer, e);
                }
            }

            // At most one entry is added per iteration, so an equality check
            // is enough to detect that the threshold has been reached.
            if peer_errors.len() == peer_error_threshold {
                return Err(FederationError::peer_errors(
                    method.clone(),
                    params.params.clone(),
                    peer_errors,
                ));
            }
        }
    }
234
235    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
236        &self,
237        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
238        method: String,
239        params: ApiRequestErased,
240    ) -> FR {
241        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
242        // completed results from it and we don't do any `await`s when
243        // processing them, it should be totally OK.
244        #[cfg(not(target_family = "wasm"))]
245        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
246        #[cfg(target_family = "wasm")]
247        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
248
249        for peer in self.all_peers() {
250            futures.push(Box::pin({
251                let method = &method;
252                let params = &params;
253                async move {
254                    let response = util::retry(
255                        "api-request-{method}-{peer}",
256                        api_networking_backoff(),
257                        || async {
258                            self.request_single_peer(method.clone(), params.clone(), *peer)
259                                .await
260                                .inspect_err(|e| {
261                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
262                                })
263                                .map_err(|e| anyhow!(e.to_string()))
264                        },
265                    )
266                    .await
267                    .expect("Number of retries has no limit");
268
269                    (*peer, response)
270                }
271            }));
272        }
273
274        loop {
275            let (peer, response) = match futures.next().await {
276                Some(t) => t,
277                None => pending().await,
278            };
279
280            match strategy.process(peer, response) {
281                QueryStep::Retry(peers) => {
282                    for peer in peers {
283                        futures.push(Box::pin({
284                            let method = &method;
285                            let params = &params;
286                            async move {
287                                let response = util::retry(
288                                    "api-request-{method}-{peer}",
289                                    api_networking_backoff(),
290                                    || async {
291                                        self.request_single_peer(
292                                            method.clone(),
293                                            params.clone(),
294                                            peer,
295                                        )
296                                        .await
297                                        .inspect_err(|err| {
298                                            if err.is_unusual() {
299                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
300                                            }
301                                        })
302                                        .map_err(|e| anyhow!(e.to_string()))
303                                    },
304                                )
305                                .await
306                                .expect("Number of retries has no limit");
307
308                                (peer, response)
309                            }
310                        }));
311                    }
312                }
313                QueryStep::Success(response) => return response,
314                QueryStep::Failure(e) => {
315                    warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
316                }
317                QueryStep::Continue => {}
318            }
319        }
320    }
321
322    async fn request_current_consensus<Ret>(
323        &self,
324        method: String,
325        params: ApiRequestErased,
326    ) -> FederationResult<Ret>
327    where
328        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
329    {
330        self.request_with_strategy(
331            ThresholdConsensus::new(self.all_peers().to_num_peers()),
332            method,
333            params,
334        )
335        .await
336    }
337
338    async fn request_current_consensus_retry<Ret>(
339        &self,
340        method: String,
341        params: ApiRequestErased,
342    ) -> Ret
343    where
344        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
345    {
346        self.request_with_strategy_retry(
347            ThresholdConsensus::new(self.all_peers().to_num_peers()),
348            method,
349            params,
350        )
351        .await
352    }
353
354    async fn request_admin<Ret>(
355        &self,
356        method: &str,
357        params: ApiRequestErased,
358        auth: ApiAuth,
359    ) -> FederationResult<Ret>
360    where
361        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
362    {
363        let Some(self_peer_id) = self.self_peer() else {
364            return Err(FederationError::general(
365                method,
366                params,
367                anyhow::format_err!("Admin peer_id not set"),
368            ));
369        };
370
371        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
372            .await
373    }
374
375    async fn request_admin_no_auth<Ret>(
376        &self,
377        method: &str,
378        params: ApiRequestErased,
379    ) -> FederationResult<Ret>
380    where
381        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
382    {
383        let Some(self_peer_id) = self.self_peer() else {
384            return Err(FederationError::general(
385                method,
386                params,
387                anyhow::format_err!("Admin peer_id not set"),
388            ));
389        };
390
391        self.request_single_peer_federation(method.into(), params, self_peer_id)
392            .await
393    }
394}
395
// Blanket impl: every `IRawFederationApi` implementor (including unsized
// ones such as trait objects) gets the `FederationApiExt` default methods.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
398
/// Trait marker for the module (non-global) endpoints
///
/// Adds no methods of its own on top of [`IRawFederationApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
401
// Declares `DynModuleApi`, a cloneable newtype over an `Arc` of an
// `IModuleFederationApi` trait object.
// NOTE(review): exact generated items depend on `dyn_newtype_define!` —
// confirm against the macro definition.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
406
// Declares `DynGlobalApi`, a cloneable newtype over an `Arc` of an
// `IGlobalFederationApi` trait object.
// NOTE(review): exact generated items depend on `dyn_newtype_define!` —
// confirm against the macro definition.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
411
/// Lets a [`DynGlobalApi`] be borrowed as the underlying
/// `dyn IGlobalFederationApi` trait object.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        // Borrow the trait object held in the newtype's `inner` field.
        self.inner.as_ref()
    }
}
417
418impl DynGlobalApi {
419    pub async fn new_admin(
420        peer: PeerId,
421        url: SafeUrl,
422        api_secret: &Option<String>,
423    ) -> anyhow::Result<DynGlobalApi> {
424        Ok(GlobalFederationApiWithCache::new(
425            ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
426                .await?,
427        )
428        .into())
429    }
430
431    // FIXME: (@leonardo) Should we have the option to do DKG and config related
432    // actions through Tor ? Should we add the `Connector` choice to
433    // ConfigParams then ?
434    pub async fn from_pre_peer_id_admin_endpoint(
435        url: SafeUrl,
436        api_secret: &Option<String>,
437    ) -> anyhow::Result<Self> {
438        // PeerIds are used only for informational purposes, but just in case, make a
439        // big number so it stands out
440
441        Self::new_admin(PeerId::from(1024), url, api_secret).await
442    }
443
444    pub async fn from_endpoints(
445        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
446        api_secret: &Option<String>,
447    ) -> anyhow::Result<Self> {
448        Ok(GlobalFederationApiWithCache::new(
449            ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
450        )
451        .into())
452    }
453}
454
/// The API for the global (non-module) endpoints
///
/// Methods taking an [`ApiAuth`] carry credentials for the call; methods
/// returning [`PeerResult`] target the single peer given as an argument.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// NOTE(review): presumably submits `tx` to the federation and returns the
    /// encoded submission outcome — confirm against the implementation.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// NOTE(review): presumably waits for the session at `block_index` and
    /// decodes it with `decoders` — confirm against the implementation.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// NOTE(review): presumably fetches the status of the session at
    /// `block_index`; how `core_api_version` and `broadcast_public_keys`
    /// affect the call is not visible here — confirm against the
    /// implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// NOTE(review): presumably the number of sessions known to the
    /// federation — confirm against the implementation.
    async fn session_count(&self) -> FederationResult<u64>;

    /// NOTE(review): presumably resolves once the transaction `txid` has been
    /// processed — confirm against the implementation.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// NOTE(review): presumably uploads the signed client backup in `request`
    /// — confirm against the implementation.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Returns, per peer, an optional [`ClientBackupSnapshot`] for the public
    /// key `id`.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password used to decrypt the configs and authenticate
    ///
    /// Must be called first before any other calls to the API
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// NOTE(review): presumably the authenticated status of our own server —
    /// confirm against the implementation.
    async fn server_status(&self, auth: ApiAuth) -> FederationResult<ServerStatus>;

    /// NOTE(review): presumably sets this guardian's `name` and, optionally,
    /// the `federation_name` during setup; the meaning of the returned string
    /// is not visible here — confirm against the implementation.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// NOTE(review): presumably registers another peer's connection `info`
    /// during setup; the meaning of the returned string is not visible here —
    /// confirm against the implementation.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// During config gen, sets the server connection containing our endpoints
    ///
    /// Optionally sends our server info to the config gen leader using
    /// `add_config_gen_peer`
    async fn set_config_gen_connections(
        &self,
        info: ConfigGenConnectionsRequest,
        auth: ApiAuth,
    ) -> FederationResult<()>;

    /// During config gen, used for an API-to-API call that adds a peer's server
    /// connection info to the leader.
    ///
    /// Note this call will fail until the leader has their API running and has
    /// `set_server_connections` so clients should retry.
    ///
    /// This call is not authenticated because it's guardian-to-guardian
    async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;

    /// During config gen, gets all the server connections we've received from
    /// peers using `add_config_gen_peer`
    ///
    /// Could be called on the leader, so it's not authenticated
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;

    /// Runs DKG, can only be called once after configs have been generated in
    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
    /// error and config gen must be restarted.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// After DKG, returns the hash of the consensus config tweaked with our id.
    /// We need to share this with all other peers to complete verification.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Updates local state and notify leader that we have verified configs.
    /// This allows for a synchronization point, before we start consensus.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Reads the configs from the disk, starts the consensus server, and shuts
    /// down the config gen API to start the Fedimint API
    ///
    /// Clients may receive an error due to forced shutdown, should call the
    /// `server_status` to see if consensus has started.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the status of the server
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Show an audit across all modules
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Download the guardian config to back it up
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Check auth credentials
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// NOTE(review): presumably restarts the federation setup process —
    /// confirm against the implementation.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Publish our signed API announcement to other guardians
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Returns a map of signed API announcements keyed by `PeerId`, obtained
    /// via the `guardian` argument.
    ///
    /// NOTE(review): presumably queries only that guardian — confirm against
    /// the implementation.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// NOTE(review): presumably asks our own guardian to sign an announcement
    /// for `api_url` — confirm against the implementation.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// NOTE(review): presumably shuts the server down, optionally tied to the
    /// given `session` — confirm against the implementation.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version a peer is running
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Fetch the backup statistics from the federation (admin endpoint)
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
}
600
601pub fn deserialize_outcome<R>(
602    outcome: &SerdeOutputOutcome,
603    module_decoder: &Decoder,
604) -> OutputOutcomeResult<R>
605where
606    R: OutputOutcome + MaybeSend,
607{
608    let dyn_outcome = outcome
609        .try_into_inner_known_module_kind(module_decoder)
610        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
611
612    let source_instance = dyn_outcome.module_instance_id();
613
614    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
615        let target_type = std::any::type_name::<R>();
616        OutputOutcomeError::ResponseDeserialization(anyhow!(
617            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
618        ))
619    })
620}
621
/// Connector that opens websocket JSON-RPC connections to federation peers.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// API endpoint of each peer, keyed by `PeerId`.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional secret sent as HTTP basic-auth credentials when connecting.
    api_secret: Option<String>,

    /// List of overrides to use when attempting to connect to given
    /// `PeerId`
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
}
634
635impl WebsocketConnector {
636    fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
637        let mut s = Self::new_no_overrides(peers, api_secret);
638
639        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
640            s = s.with_connection_override(k, v);
641        }
642
643        Ok(s)
644    }
645    pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
646        self.connection_overrides.insert(peer_id, url);
647        self
648    }
649    pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
650        Self {
651            peers,
652            api_secret,
653            connection_overrides: BTreeMap::default(),
654        }
655    }
656}
657
#[async_trait]
impl IClientConnector for WebsocketConnector {
    /// Returns the ids of all configured peers.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a websocket client connection to `peer_id`.
    ///
    /// A connection override for the peer, if present, takes precedence over
    /// the configured endpoint. When an API secret is set it is sent as basic
    /// auth: via an `Authorization` header on native targets, and via
    /// `user:pass@` in the URL on wasm.
    ///
    /// # Errors
    ///
    /// Returns `PeerError::InternalClientError` if `peer_id` is unknown or the
    /// client fails to build, and `PeerError::InvalidEndpoint` if the
    /// credentials cannot be set on the URL (wasm only).
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        // Overrides win over the regular endpoint table.
        let api_endpoint = match self.connection_overrides.get(&peer_id) {
            Some(url) => {
                trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
                url
            }
            None => self.peers.get(&peer_id).ok_or_else(|| {
                PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
            })?,
        };

        // Native: build the client with the bundled webpki root certificates.
        #[cfg(not(target_family = "wasm"))]
        let mut client = {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_cfg = CustomCertStore::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();

            WsClientBuilder::default()
                .max_concurrent_requests(u16::MAX as usize)
                .with_custom_cert_store(tls_cfg)
        };

        #[cfg(target_family = "wasm")]
        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            #[cfg(not(target_family = "wasm"))]
            {
                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
                // but we can set up the headers manually
                let mut headers = HeaderMap::new();

                let auth = base64::engine::general_purpose::STANDARD
                    .encode(format!("fedimint:{api_secret}"));

                headers.insert(
                    "Authorization",
                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
                );

                client = client.set_headers(headers);
            }
            #[cfg(target_family = "wasm")]
            {
                // on wasm, url will be handled by the browser, which should take care of
                // `user:pass@...`
                let mut url = api_endpoint.clone();
                url.set_username("fedimint")
                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
                url.set_password(Some(&api_secret))
                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;

                let client = client
                    .build(url.as_str())
                    .await
                    .map_err(|err| PeerError::InternalClientError(err.into()))?;

                // wasm with a secret connects to the credentialed URL, so
                // return here instead of falling through to the plain build.
                return Ok(client.into_dyn());
            }
        }

        let client = client
            .build(api_endpoint.as_str())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?;

        Ok(client.into_dyn())
    }
}
737
/// Connector that reaches federation peers through Tor.
///
/// Only compiled on non-wasm targets with the `tor` feature enabled.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// API endpoint of each peer, keyed by `PeerId`.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional secret sent as HTTP basic-auth credentials when connecting.
    api_secret: Option<String>,
}
744
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for the given `peers` endpoints and optional
    /// `api_secret`; no connection is made here.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
751
752#[cfg(all(feature = "tor", not(target_family = "wasm")))]
753#[async_trait]
754impl IClientConnector for TorConnector {
755    fn peers(&self) -> BTreeSet<PeerId> {
756        self.peers.keys().copied().collect()
757    }
758
759    #[allow(clippy::too_many_lines)]
760    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
761        let api_endpoint = self
762            .peers
763            .get(&peer_id)
764            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;
765
766        let tor_config = TorClientConfig::default();
767        let tor_client = TorClient::create_bootstrapped(tor_config)
768            .await
769            .map_err(|err| PeerError::InternalClientError(err.into()))?
770            .isolated_client();
771
772        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
773
774        // TODO: (@leonardo) should we implement our `IntoTorAddr` for `SafeUrl`
775        // instead?
776        let addr = (
777            api_endpoint
778                .host_str()
779                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
780            api_endpoint
781                .port_or_known_default()
782                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
783        );
784        let tor_addr = TorAddr::from(addr).map_err(|e| {
785            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
786        })?;
787
788        let tor_addr_clone = tor_addr.clone();
789
790        debug!(
791            ?tor_addr,
792            ?addr,
793            "Successfully created `TorAddr` for given address (i.e. host and port)"
794        );
795
796        // TODO: It can be updated to use `is_onion_address()` implementation,
797        // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
798        let anonymized_stream = if api_endpoint.is_onion_address() {
799            let mut stream_prefs = arti_client::StreamPrefs::default();
800            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
801
802            let anonymized_stream = tor_client
803                .connect_with_prefs(tor_addr, &stream_prefs)
804                .await
805                .map_err(|e| PeerError::Connection(e.into()))?;
806
807            debug!(
808                ?tor_addr_clone,
809                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
810            );
811            anonymized_stream
812        } else {
813            let anonymized_stream = tor_client
814                .connect(tor_addr)
815                .await
816                .map_err(|e| PeerError::Connection(e.into()))?;
817
818            debug!(
819                ?tor_addr_clone,
820                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
821            );
822            anonymized_stream
823        };
824
825        let is_tls = match api_endpoint.scheme() {
826            "wss" => true,
827            "ws" => false,
828            unexpected_scheme => {
829                return Err(PeerError::InvalidEndpoint(anyhow!(
830                    "Unsupported scheme: {unexpected_scheme}"
831                )));
832            }
833        };
834
835        let tls_connector = if is_tls {
836            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
837            let mut root_certs = RootCertStore::empty();
838            root_certs.extend(webpki_roots);
839
840            let tls_config = TlsClientConfig::builder()
841                .with_root_certificates(root_certs)
842                .with_no_client_auth();
843            let tls_connector = TlsConnector::from(Arc::new(tls_config));
844            Some(tls_connector)
845        } else {
846            None
847        };
848
849        let mut ws_client_builder =
850            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
851
852        if let Some(api_secret) = &self.api_secret {
853            // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
854            // but we can set up the headers manually
855            let mut headers = HeaderMap::new();
856
857            let auth =
858                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
859
860            headers.insert(
861                "Authorization",
862                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
863            );
864
865            ws_client_builder = ws_client_builder.set_headers(headers);
866        }
867
868        match tls_connector {
869            None => {
870                let client = ws_client_builder
871                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
872                    .await
873                    .map_err(|e| PeerError::Connection(e.into()))?;
874
875                Ok(client.into_dyn())
876            }
877            Some(tls_connector) => {
878                let host = api_endpoint
879                    .host_str()
880                    .map(ToOwned::to_owned)
881                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;
882
883                // FIXME: (@leonardo) Is this leaking any data ? Should investigate it further
884                // if it's really needed.
885                let server_name = rustls_pki_types::ServerName::try_from(host)
886                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;
887
888                let anonymized_tls_stream = tls_connector
889                    .connect(server_name, anonymized_stream)
890                    .await
891                    .map_err(|e| PeerError::Connection(e.into()))?;
892
893                let client = ws_client_builder
894                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
895                    .await
896                    .map_err(|e| PeerError::Connection(e.into()))?;
897
898                Ok(client.into_dyn())
899            }
900        }
901    }
902}
903
904fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
905    match jsonrpc_error {
906        JsonRpcClientError::Call(error_object) => {
907            let error = anyhow!(error_object.message().to_owned());
908            match ErrorCode::from(error_object.code()) {
909                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
910                    PeerError::InvalidRequest(error)
911                }
912                ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
913                ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
914                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
915                    PeerError::ServerError(error)
916                }
917            }
918        }
919        JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
920        JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
921        JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
922        JsonRpcClientError::InvalidSubscriptionId => todo!(),
923        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
924            PeerError::InvalidRequest(anyhow!(invalid_request_id))
925        }
926        JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
927        JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
928        JsonRpcClientError::HttpNotImplemented => {
929            PeerError::ServerError(anyhow!("Http not implemented"))
930        }
931        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
932            PeerError::InvalidRequest(anyhow!(empty_batch_request))
933        }
934        JsonRpcClientError::RegisterMethod(register_method_error) => {
935            PeerError::InvalidResponse(anyhow!(register_method_error))
936        }
937    }
938}
939
940#[async_trait]
941impl IClientConnection for WsClient {
942    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
943        let method = match method {
944            ApiMethod::Core(method) => method,
945            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
946        };
947
948        Ok(ClientT::request(self, &method, [request.to_json()])
949            .await
950            .map_err(jsonrpc_error_to_peer_error)?)
951    }
952
953    async fn await_disconnection(&self) {
954        self.on_disconnect().await;
955    }
956}
957
958pub type DynClientConnector = Arc<dyn IClientConnector>;
959
960/// Allows to connect to peers. Connections are request based and should be
961/// authenticated and encrypted for production deployments.
962#[async_trait]
963pub trait IClientConnector: Send + Sync + 'static {
964    fn peers(&self) -> BTreeSet<PeerId>;
965
966    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
967
968    fn into_dyn(self) -> DynClientConnector
969    where
970        Self: Sized,
971    {
972        Arc::new(self)
973    }
974}
975
976pub type DynClientConnection = Arc<dyn IClientConnection>;
977
978#[async_trait]
979pub trait IClientConnection: Debug + Send + Sync + 'static {
980    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
981
982    async fn await_disconnection(&self);
983
984    fn into_dyn(self) -> DynClientConnection
985    where
986        Self: Sized,
987    {
988        Arc::new(self)
989    }
990}
991
992#[derive(Clone, Debug)]
993pub struct ReconnectFederationApi {
994    peers: BTreeSet<PeerId>,
995    admin_id: Option<PeerId>,
996    module_id: Option<ModuleInstanceId>,
997    connections: ReconnectClientConnections,
998}
999
1000impl ReconnectFederationApi {
1001    fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1002        Self {
1003            peers: connector.peers(),
1004            admin_id,
1005            module_id: None,
1006            connections: ReconnectClientConnections::new(connector),
1007        }
1008    }
1009
1010    pub async fn new_admin(
1011        peer: PeerId,
1012        url: SafeUrl,
1013        api_secret: &Option<String>,
1014    ) -> anyhow::Result<Self> {
1015        Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1016    }
1017
1018    pub async fn from_endpoints(
1019        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1020        api_secret: &Option<String>,
1021        admin_id: Option<PeerId>,
1022    ) -> anyhow::Result<Self> {
1023        let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1024
1025        let scheme = peers
1026            .values()
1027            .next()
1028            .expect("Federation api has been initialized with no peers")
1029            .scheme();
1030
1031        let connector = match scheme {
1032            "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1033            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1034            "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1035            #[cfg(all(feature = "iroh", not(target_family = "wasm")))]
1036            "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1037            scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1038        };
1039
1040        Ok(ReconnectFederationApi::new(&connector, admin_id))
1041    }
1042}
1043
1044impl IModuleFederationApi for ReconnectFederationApi {}
1045
1046#[apply(async_trait_maybe_send!)]
1047impl IRawFederationApi for ReconnectFederationApi {
1048    fn all_peers(&self) -> &BTreeSet<PeerId> {
1049        &self.peers
1050    }
1051
1052    fn self_peer(&self) -> Option<PeerId> {
1053        self.admin_id
1054    }
1055
1056    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1057        ReconnectFederationApi {
1058            peers: self.peers.clone(),
1059            admin_id: self.admin_id,
1060            module_id: Some(id),
1061            connections: self.connections.clone(),
1062        }
1063        .into()
1064    }
1065
1066    async fn request_raw(
1067        &self,
1068        peer_id: PeerId,
1069        method: &str,
1070        params: &ApiRequestErased,
1071    ) -> PeerResult<Value> {
1072        let method = match self.module_id {
1073            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1074            None => ApiMethod::Core(method.to_string()),
1075        };
1076
1077        self.connections
1078            .request(peer_id, method, params.clone())
1079            .await
1080    }
1081}
1082
1083#[derive(Clone, Debug)]
1084pub struct ReconnectClientConnections {
1085    connections: BTreeMap<PeerId, ClientConnection>,
1086}
1087
1088impl ReconnectClientConnections {
1089    pub fn new(connector: &DynClientConnector) -> Self {
1090        ReconnectClientConnections {
1091            connections: connector
1092                .peers()
1093                .into_iter()
1094                .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1095                .collect(),
1096        }
1097    }
1098
1099    async fn request(
1100        &self,
1101        peer: PeerId,
1102        method: ApiMethod,
1103        request: ApiRequestErased,
1104    ) -> PeerResult<Value> {
1105        trace!(target: LOG_NET_API, %method, "Api request");
1106        let res = self
1107            .connections
1108            .get(&peer)
1109            .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1110            .connection()
1111            .await
1112            .context("Failed to connect to peer")
1113            .map_err(PeerError::Connection)?
1114            .request(method.clone(), request)
1115            .await;
1116
1117        trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1118
1119        res
1120    }
1121}
1122
1123#[derive(Clone, Debug)]
1124struct ClientConnection {
1125    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1126}
1127
1128impl ClientConnection {
1129    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1130        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1131
1132        fedimint_core::task::spawn(
1133            "peer-api-connection",
1134            async move {
1135                let mut backoff = api_networking_backoff();
1136
1137                while let Ok(sender) = receiver.recv().await {
1138                    let mut senders = vec![sender];
1139
1140                    // Drain the queue, so we everyone that already joined fail or succeed
1141                    // together.
1142                    while let Ok(sender) = receiver.try_recv() {
1143                        senders.push(sender);
1144                    }
1145
1146                    match connector.connect(peer).await {
1147                        Ok(connection) => {
1148                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1149
1150                            for sender in senders {
1151                                sender.send(connection.clone()).ok();
1152                            }
1153
1154                            loop {
1155                                tokio::select! {
1156                                    sender = receiver.recv() => {
1157                                        match sender.ok() {
1158                                            Some(sender) => sender.send(connection.clone()).ok(),
1159                                            None => break,
1160                                        };
1161                                    }
1162                                    () = connection.await_disconnection() => break,
1163                                }
1164                            }
1165
1166                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1167
1168                            backoff = api_networking_backoff();
1169                        }
1170                        Err(e) => {
1171                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1172
1173                            fedimint_core::task::sleep(
1174                                backoff.next().expect("No limit to the number of retries"),
1175                            )
1176                            .await;
1177                        }
1178                    }
1179                }
1180
1181                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1182            }
1183            .instrument(trace_span!("peer-api-connection", ?peer)),
1184        );
1185
1186        ClientConnection { sender }
1187    }
1188
1189    async fn connection(&self) -> Option<DynClientConnection> {
1190        let (sender, receiver) = oneshot::channel();
1191
1192        self.sender
1193            .send(sender)
1194            .await
1195            .expect("Api connection request channel closed unexpectedly");
1196
1197        receiver.await.ok()
1198    }
1199}
1200
#[cfg(feature = "iroh")]
mod iroh {
    use std::collections::{BTreeMap, BTreeSet};
    use std::str::FromStr;

    use anyhow::Context;
    use async_trait::async_trait;
    use fedimint_core::PeerId;
    use fedimint_core::envs::parse_kv_list_from_env;
    use fedimint_core::module::{
        ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
    };
    use fedimint_core::util::SafeUrl;
    use fedimint_logging::LOG_NET_IROH;
    use iroh::endpoint::Connection;
    use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
    use iroh_base::ticket::NodeTicket;
    use serde_json::Value;
    use tracing::{trace, warn};

    use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};

    /// Connector that reaches peers over iroh, addressing each peer by the
    /// node id parsed from the host part of its URL.
    #[derive(Debug, Clone)]
    pub struct IrohConnector {
        /// Node id of every known peer.
        node_ids: BTreeMap<PeerId, NodeId>,
        /// Local endpoint used to open all connections.
        endpoint: Endpoint,

        /// List of overrides to use when attempting to connect to given
        /// `NodeId`
        ///
        /// This is useful for testing, or forcing non-default network
        /// connectivity.
        pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
    }

    impl IrohConnector {
        /// Creates a connector for `peers` and applies the connection
        /// overrides listed in the `FM_IROH_CONNECT_OVERRIDES` environment
        /// variable.
        pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
            const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
            warn!(target: LOG_NET_IROH, "Iroh support is experimental");
            let mut connector = Self::new_no_overrides(peers).await?;

            let overrides = parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)?;
            for (node_id, ticket) in overrides {
                connector = connector.with_connection_override(node_id, ticket.into());
            }

            Ok(connector)
        }

        /// Creates a connector for `peers` without any connection overrides.
        ///
        /// Fails if a URL has no host, if a host is not a valid node id, or
        /// if the local endpoint cannot be bound.
        pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
            let mut node_ids = BTreeMap::new();

            for (peer, url) in peers {
                let host = url.host_str().context("Url is missing host")?;
                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;

                node_ids.insert(peer, node_id);
            }

            let endpoint = Endpoint::builder()
                .discovery_n0()
                .discovery_dht()
                .bind()
                .await?;

            Ok(Self {
                node_ids,
                endpoint,
                connection_overrides: BTreeMap::new(),
            })
        }

        /// Registers `addr` as the address to dial for `node`.
        pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
            self.connection_overrides.insert(node, addr);
            self
        }
    }

    #[async_trait]
    impl IClientConnector for IrohConnector {
        fn peers(&self) -> BTreeSet<PeerId> {
            self.node_ids.keys().copied().collect()
        }

        /// Connects to `peer_id`, dialing the override address when one is
        /// registered for the peer's node id and the bare node id otherwise.
        async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
            let Some(&node_id) = self.node_ids.get(&peer_id) else {
                return Err(PeerError::InvalidPeerId { peer_id });
            };

            let attempt = if let Some(node_addr) = self.connection_overrides.get(&node_id) {
                trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
                self.endpoint
                    .connect(node_addr.clone(), FEDIMINT_API_ALPN)
                    .await
            } else {
                self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await
            };

            let connection = attempt.map_err(PeerError::Connection)?;

            Ok(connection.into_dyn())
        }
    }

    #[async_trait]
    impl IClientConnection for Connection {
        /// Performs one request on a fresh bidirectional stream.
        ///
        /// The JSON-encoded request is written and the sending side is
        /// finished; the response is then read to the end (at most 1_000_000
        /// bytes) and decoded.
        async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
            let payload = serde_json::to_vec(&IrohApiRequest { method, request })
                .expect("Serialization to vec can't fail");

            let (mut send_stream, mut recv_stream) = self
                .open_bi()
                .await
                .map_err(|err| PeerError::Transport(err.into()))?;

            send_stream
                .write_all(&payload)
                .await
                .map_err(|err| PeerError::Transport(err.into()))?;

            send_stream
                .finish()
                .map_err(|err| PeerError::Transport(err.into()))?;

            let raw_response = recv_stream
                .read_to_end(1_000_000)
                .await
                .map_err(|err| PeerError::Transport(err.into()))?;

            // TODO: We should not be serializing Results on the wire
            let decoded = serde_json::from_slice::<Result<Value, ApiError>>(&raw_response)
                .map_err(|err| PeerError::InvalidResponse(err.into()))?;

            match decoded {
                Ok(value) => Ok(value),
                Err(e) => Err(PeerError::InvalidResponse(anyhow::anyhow!(
                    "Api Error: {:?}",
                    e
                ))),
            }
        }

        /// Resolves once the connection has been closed.
        async fn await_disconnection(&self) {
            self.closed().await;
        }
    }
}
1338
1339/// The status of a server, including how it views its peers
1340#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1341pub struct FederationStatus {
1342    pub session_count: u64,
1343    pub status_by_peer: HashMap<PeerId, PeerStatus>,
1344    pub peers_online: u64,
1345    pub peers_offline: u64,
1346    /// This should always be 0 if everything is okay, so a monitoring tool
1347    /// should generate an alert if this is not the case.
1348    pub peers_flagged: u64,
1349    pub scheduled_shutdown: Option<u64>,
1350}
1351
1352#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1353pub struct PeerStatus {
1354    pub last_contribution: Option<u64>,
1355    pub connection_status: P2PConnectionStatus,
1356    /// Indicates that this peer needs attention from the operator since
1357    /// it has not contributed to the consensus in a long time
1358    pub flagged: bool,
1359}
1360
1361#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1362#[serde(rename_all = "snake_case")]
1363pub enum P2PConnectionStatus {
1364    #[default]
1365    Disconnected,
1366    Connected,
1367}
1368
1369#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1370pub struct StatusResponse {
1371    pub server: ServerStatusLegacy,
1372    pub federation: Option<FederationStatus>,
1373}
1374
1375/// Archive of all the guardian config files that can be used to recover a lost
1376/// guardian node.
1377#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1378pub struct GuardianConfigBackup {
1379    #[serde(with = "fedimint_core::hex::serde")]
1380    pub tar_archive_bytes: Vec<u8>,
1381}
1382
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use fedimint_core::config::FederationId;
    use fedimint_core::invite_code::InviteCode;

    use super::*;

    /// An invite code must round-trip through its string form and through
    /// JSON, and its JSON form must be that same string.
    #[test]
    fn converts_invite_code() {
        let invite = InviteCode::new(
            "ws://test1".parse().unwrap(),
            PeerId::from(1),
            FederationId::dummy(),
            Some("api_secret".into()),
        );

        let encoded = invite.to_string();
        let decoded = InviteCode::from_str(&encoded).expect("parses");
        assert_eq!(invite, decoded);

        let json = serde_json::to_string(&invite).unwrap();

        let json_as_string: String = serde_json::from_str(&json).unwrap();
        assert_eq!(json_as_string, encoded);

        let decoded_from_json: InviteCode = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded_from_json, decoded);
    }

    /// With four peers, the invite code must contain only the first
    /// `max_evil + 1` of them.
    #[test]
    fn creates_essential_guardians_invite_code() {
        let peer_to_url_map: BTreeMap<PeerId, SafeUrl> = [
            (PeerId::from(0), "ws://test1"),
            (PeerId::from(1), "ws://test2"),
            (PeerId::from(2), "ws://test3"),
            (PeerId::from(3), "ws://test4"),
        ]
        .into_iter()
        .map(|(peer, url)| (peer, url.parse::<SafeUrl>().expect("URL fail")))
        .collect();

        let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;

        let code =
            InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());

        assert_eq!(FederationId::dummy(), code.federation_id());

        let expected_map: BTreeMap<PeerId, SafeUrl> =
            peer_to_url_map.into_iter().take(max_size).collect();
        assert_eq!(expected_map, code.peers());
    }
}