fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
24use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
25use fedimint_core::core::backup::SignedBackupRequest;
26use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
27use fedimint_core::encoding::{Decodable, Encodable};
28use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::audit::AuditSummary;
31use fedimint_core::module::registry::ModuleDecoderRegistry;
32use fedimint_core::module::{
33    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
34};
35use fedimint_core::net::api_announcement::SignedApiAnnouncement;
36use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
37use fedimint_core::task::{MaybeSend, MaybeSync};
38use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
39use fedimint_core::util::backoff_util::api_networking_backoff;
40use fedimint_core::util::{FmtCompact as _, SafeUrl};
41use fedimint_core::{
42    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
43};
44use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
45use futures::channel::oneshot;
46use futures::future::pending;
47use futures::stream::FuturesUnordered;
48use futures::{Future, StreamExt};
49use global_api::with_cache::GlobalFederationApiWithCache;
50use jsonrpsee_core::DeserializeOwned;
51use jsonrpsee_core::client::ClientT;
52pub use jsonrpsee_core::client::Error as JsonRpcClientError;
53use jsonrpsee_types::ErrorCode;
54#[cfg(target_family = "wasm")]
55use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
56#[cfg(not(target_family = "wasm"))]
57use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
58#[cfg(not(target_family = "wasm"))]
59use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
60use serde::{Deserialize, Serialize};
61use serde_json::Value;
62use tokio::sync::OnceCell;
63#[cfg(not(target_family = "wasm"))]
64use tokio_rustls::rustls::RootCertStore;
65#[cfg(all(feature = "tor", not(target_family = "wasm")))]
66use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
67use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
68
69use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
70
71pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
72
73pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
74    ApiVersion { major: 0, minor: 1 };
75
76pub type PeerResult<T> = Result<T, PeerError>;
77pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
78pub type FederationResult<T> = Result<T, FederationError>;
79pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
80
81pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
82
83static INSTALL_CRYPTO: OnceCell<()> = OnceCell::const_new();
84
85#[cfg(not(target_family = "wasm"))]
86async fn install_crypto_provider() {
87    INSTALL_CRYPTO
88        .get_or_init(|| async {
89            tokio_rustls::rustls::crypto::ring::default_provider()
90                .install_default()
91                .expect("Failed to install crypto");
92        })
93        .await;
94}
95
/// Set of api versions for each component (core + modules)
///
/// E.g. result of federated common api versions discovery.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Api version of the core (non-module) component
    pub core: ApiVersion,
    /// Api version of each module, keyed by its module instance id
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
104
/// An API (module or global) that can query a federation
///
/// This is the low-level interface: it only knows how to send one raw
/// request to one peer. Federation-wide helpers are layered on top of it by
/// `FederationApiExt`.
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// List of all federation peers for the purpose of iterating each peer
    /// in the federation.
    ///
    /// The underlying implementation is responsible for knowing how many
    /// peers there are and the `PeerId` of each. The caller of this interface
    /// most probably has some idea as well, but passing this set across every
    /// API call to the federation would be inconvenient.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// PeerId of the Guardian node, if set
    ///
    /// This is for using Client in a "Admin" mode, making authenticated
    /// calls to own `fedimintd` instance.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns a [`DynModuleApi`] handle for the module instance `id`
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Make request to a specific federation peer by `peer_id`
    ///
    /// `method` is the name of the endpoint and `params` the type-erased
    /// request. The response is returned as a raw JSON [`Value`]; decoding it
    /// into a concrete type is left to the caller.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
133
/// An extension trait for making federation-wide API calls on top of
/// [`IRawFederationApi`].
136#[apply(async_trait_maybe_send!)]
137pub trait FederationApiExt: IRawFederationApi {
138    async fn request_single_peer<Ret>(
139        &self,
140        method: String,
141        params: ApiRequestErased,
142        peer: PeerId,
143    ) -> PeerResult<Ret>
144    where
145        Ret: DeserializeOwned,
146    {
147        self.request_raw(peer, &method, &params)
148            .await
149            .and_then(|v| {
150                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
151            })
152    }
153
154    async fn request_single_peer_federation<FedRet>(
155        &self,
156        method: String,
157        params: ApiRequestErased,
158        peer_id: PeerId,
159    ) -> FederationResult<FedRet>
160    where
161        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
162    {
163        self.request_raw(peer_id, &method, &params)
164            .await
165            .and_then(|v| {
166                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
167            })
168            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
169    }
170
171    /// Make an aggregate request to federation, using `strategy` to logically
172    /// merge the responses.
173    #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
174    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
175        &self,
176        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
177        method: String,
178        params: ApiRequestErased,
179    ) -> FederationResult<FR> {
180        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
181        // completed results from it and we don't do any `await`s when
182        // processing them, it should be totally OK.
183        #[cfg(not(target_family = "wasm"))]
184        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
185        #[cfg(target_family = "wasm")]
186        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
187
188        for peer in self.all_peers() {
189            futures.push(Box::pin({
190                let method = &method;
191                let params = &params;
192                async move {
193                    let result = self
194                        .request_single_peer(method.clone(), params.clone(), *peer)
195                        .await;
196
197                    (*peer, result)
198                }
199            }));
200        }
201
202        let mut peer_errors = BTreeMap::new();
203        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
204
205        loop {
206            let (peer, result) = futures
207                .next()
208                .await
209                .expect("Query strategy ran out of peers to query without returning a result");
210
211            match result {
212                Ok(response) => match strategy.process(peer, response) {
213                    QueryStep::Retry(peers) => {
214                        for peer in peers {
215                            futures.push(Box::pin({
216                                let method = &method;
217                                let params = &params;
218                                async move {
219                                    let result = self
220                                        .request_single_peer(method.clone(), params.clone(), peer)
221                                        .await;
222
223                                    (peer, result)
224                                }
225                            }));
226                        }
227                    }
228                    QueryStep::Success(response) => return Ok(response),
229                    QueryStep::Failure(e) => {
230                        peer_errors.insert(peer, e);
231                    }
232                    QueryStep::Continue => {}
233                },
234                Err(e) => {
235                    e.report_if_unusual(peer, "RequestWithStrategy");
236                    peer_errors.insert(peer, e);
237                }
238            }
239
240            if peer_errors.len() == peer_error_threshold {
241                return Err(FederationError::peer_errors(
242                    method.clone(),
243                    params.params.clone(),
244                    peer_errors,
245                ));
246            }
247        }
248    }
249
250    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
251        &self,
252        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
253        method: String,
254        params: ApiRequestErased,
255    ) -> FR {
256        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
257        // completed results from it and we don't do any `await`s when
258        // processing them, it should be totally OK.
259        #[cfg(not(target_family = "wasm"))]
260        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
261        #[cfg(target_family = "wasm")]
262        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
263
264        for peer in self.all_peers() {
265            futures.push(Box::pin({
266                let method = &method;
267                let params = &params;
268                async move {
269                    let response = util::retry(
270                        format!("api-request-{method}-{peer}"),
271                        api_networking_backoff(),
272                        || async {
273                            self.request_single_peer(method.clone(), params.clone(), *peer)
274                                .await
275                                .inspect_err(|e| {
276                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
277                                })
278                                .map_err(|e| anyhow!(e.to_string()))
279                        },
280                    )
281                    .await
282                    .expect("Number of retries has no limit");
283
284                    (*peer, response)
285                }
286            }));
287        }
288
289        loop {
290            let (peer, response) = match futures.next().await {
291                Some(t) => t,
292                None => pending().await,
293            };
294
295            match strategy.process(peer, response) {
296                QueryStep::Retry(peers) => {
297                    for peer in peers {
298                        futures.push(Box::pin({
299                            let method = &method;
300                            let params = &params;
301                            async move {
302                                let response = util::retry(
303                                    format!("api-request-{method}-{peer}"),
304                                    api_networking_backoff(),
305                                    || async {
306                                        self.request_single_peer(
307                                            method.clone(),
308                                            params.clone(),
309                                            peer,
310                                        )
311                                        .await
312                                        .inspect_err(|err| {
313                                            if err.is_unusual() {
314                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
315                                            }
316                                        })
317                                        .map_err(|e| anyhow!(e.to_string()))
318                                    },
319                                )
320                                .await
321                                .expect("Number of retries has no limit");
322
323                                (peer, response)
324                            }
325                        }));
326                    }
327                }
328                QueryStep::Success(response) => return response,
329                QueryStep::Failure(e) => {
330                    warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
331                }
332                QueryStep::Continue => {}
333            }
334        }
335    }
336
337    async fn request_current_consensus<Ret>(
338        &self,
339        method: String,
340        params: ApiRequestErased,
341    ) -> FederationResult<Ret>
342    where
343        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
344    {
345        self.request_with_strategy(
346            ThresholdConsensus::new(self.all_peers().to_num_peers()),
347            method,
348            params,
349        )
350        .await
351    }
352
353    async fn request_current_consensus_retry<Ret>(
354        &self,
355        method: String,
356        params: ApiRequestErased,
357    ) -> Ret
358    where
359        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
360    {
361        self.request_with_strategy_retry(
362            ThresholdConsensus::new(self.all_peers().to_num_peers()),
363            method,
364            params,
365        )
366        .await
367    }
368
369    async fn request_admin<Ret>(
370        &self,
371        method: &str,
372        params: ApiRequestErased,
373        auth: ApiAuth,
374    ) -> FederationResult<Ret>
375    where
376        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
377    {
378        let Some(self_peer_id) = self.self_peer() else {
379            return Err(FederationError::general(
380                method,
381                params,
382                anyhow::format_err!("Admin peer_id not set"),
383            ));
384        };
385
386        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
387            .await
388    }
389
390    async fn request_admin_no_auth<Ret>(
391        &self,
392        method: &str,
393        params: ApiRequestErased,
394    ) -> FederationResult<Ret>
395    where
396        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
397    {
398        let Some(self_peer_id) = self.self_peer() else {
399            return Err(FederationError::general(
400                method,
401                params,
402                anyhow::format_err!("Admin peer_id not set"),
403            ));
404        };
405
406        self.request_single_peer_federation(method.into(), params, self_peer_id)
407            .await
408    }
409}
410
// Blanket impl: every `IRawFederationApi` implementor (sized or not)
// automatically gets all the provided methods of `FederationApiExt`.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}

/// Trait marker for the module (non-global) endpoints
pub trait IModuleFederationApi: IRawFederationApi {}

// Cloneable, `Arc`-backed newtype around a `dyn IModuleFederationApi`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}

// Cloneable, `Arc`-backed newtype around a `dyn IGlobalFederationApi`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
426
427impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
428    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
429        self.inner.as_ref()
430    }
431}
432
433impl DynGlobalApi {
434    pub async fn new_admin(
435        peer: PeerId,
436        url: SafeUrl,
437        api_secret: &Option<String>,
438    ) -> anyhow::Result<DynGlobalApi> {
439        Ok(GlobalFederationApiWithCache::new(
440            ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
441                .await?,
442        )
443        .into())
444    }
445
446    // FIXME: (@leonardo) Should we have the option to do DKG and config related
447    // actions through Tor ? Should we add the `Connector` choice to
448    // ConfigParams then ?
449    pub async fn from_setup_endpoint(
450        url: SafeUrl,
451        api_secret: &Option<String>,
452    ) -> anyhow::Result<Self> {
453        // PeerIds are used only for informational purposes, but just in case, make a
454        // big number so it stands out
455
456        Self::new_admin(PeerId::from(1024), url, api_secret).await
457    }
458
459    pub async fn from_endpoints(
460        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
461        api_secret: &Option<String>,
462    ) -> anyhow::Result<Self> {
463        Ok(GlobalFederationApiWithCache::new(
464            ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
465        )
466        .into())
467    }
468}
469
/// The API for the global (non-module) endpoints
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` to the federation and returns the still-encoded
    /// submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders`.
    ///
    /// NOTE(review): judging by the name this waits until the outcome is
    /// available - confirm against the implementation.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`, decoded with
    /// `decoders`.
    ///
    /// NOTE(review): `core_api_version` and `broadcast_public_keys`
    /// presumably select between endpoint versions and allow verifying the
    /// response - confirm against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count reported by the federation
    async fn session_count(&self) -> FederationResult<u64>;

    /// Awaits the transaction identified by `txid` and returns its id.
    ///
    /// NOTE(review): the exact condition being awaited is defined by the
    /// implementation.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads a signed client backup request to the federation
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup snapshots stored for public key `id`, returning
    /// one optional snapshot per peer
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password used to decrypt the configs and authenticate
    ///
    /// Must be called first before any other calls to the API
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the current [`SetupStatus`] of the guardian (authenticated)
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Sets the local setup parameters (guardian `name` and an optional
    /// `federation_name`) and returns a string.
    ///
    /// NOTE(review): the returned string is presumably this guardian's setup
    /// code - confirm against the implementation.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Adds the connection info of another peer (authenticated) and returns
    /// a string.
    ///
    /// NOTE(review): the meaning of the returned string is defined by the
    /// implementation.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Reset the peer setup codes during the federation setup process
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code if set_local_params was already called
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// During config gen, used for an API-to-API call that adds a peer's server
    /// connection info to the leader.
    ///
    /// Note this call will fail until the leader has their API running and has
    /// `set_server_connections` so clients should retry.
    ///
    /// This call is not authenticated because it's guardian-to-guardian
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// During config gen, gets all the server connections we've received from
    /// peers using `add_config_gen_peer`
    ///
    /// Could be called on the leader, so it's not authenticated
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Runs DKG, can only be called once after configs have been generated in
    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
    /// error and config gen must be restarted.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// After DKG, returns the hash of the consensus config tweaked with our id.
    /// We need to share this with all other peers to complete verification.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Updates local state and notify leader that we have verified configs.
    /// This allows for a synchronization point, before we start consensus.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Reads the configs from the disk, starts the consensus server, and shuts
    /// down the config gen API to start the Fedimint API
    ///
    /// Clients may receive an error due to forced shutdown, should call the
    /// `server_status` to see if consensus has started.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the status of the server
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Show an audit across all modules
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Download the guardian config to back it up
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Check auth credentials
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Restarts the federation setup (authenticated).
    ///
    /// NOTE(review): the exact state being reset is defined by the server.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Publish our signed API announcement to other guardians
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Fetches the signed API announcements known to `guardian`, keyed by
    /// the announcing peer
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Asks our own guardian (authenticated) to sign an API announcement for
    /// `api_url` and returns the signed announcement
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Requests a shutdown of our own guardian (authenticated).
    ///
    /// NOTE(review): `session` presumably selects the session after which to
    /// shut down - confirm against the server implementation.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version a peer is running
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Fetch the backup statistics from the federation (admin endpoint)
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Get the invite code for the federation guardian.
    /// For instance, useful after DKG
    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
}
615
616pub fn deserialize_outcome<R>(
617    outcome: &SerdeOutputOutcome,
618    module_decoder: &Decoder,
619) -> OutputOutcomeResult<R>
620where
621    R: OutputOutcome + MaybeSend,
622{
623    let dyn_outcome = outcome
624        .try_into_inner_known_module_kind(module_decoder)
625        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
626
627    let source_instance = dyn_outcome.module_instance_id();
628
629    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
630        let target_type = std::any::type_name::<R>();
631        OutputOutcomeError::ResponseDeserialization(anyhow!(
632            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
633        ))
634    })
635}
636
/// Connector that opens JSON-RPC websocket connections to federation peers.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// Api endpoint of every known peer
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional api secret, sent as credentials when connecting
    api_secret: Option<String>,

    /// List of overrides to use when attempting to connect to given
    /// `PeerId`
    ///
    /// This is useful for testing, or forcing non-default network
    /// connectivity.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
}
649
650impl WebsocketConnector {
651    fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
652        let mut s = Self::new_no_overrides(peers, api_secret);
653
654        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
655            s = s.with_connection_override(k, v);
656        }
657
658        Ok(s)
659    }
660    pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
661        self.connection_overrides.insert(peer_id, url);
662        self
663    }
664    pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
665        Self {
666            peers,
667            api_secret,
668            connection_overrides: BTreeMap::default(),
669        }
670    }
671}
672
673#[async_trait]
674impl IClientConnector for WebsocketConnector {
675    fn peers(&self) -> BTreeSet<PeerId> {
676        self.peers.keys().copied().collect()
677    }
678
679    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
680        let api_endpoint = match self.connection_overrides.get(&peer_id) {
681            Some(url) => {
682                trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
683                url
684            }
685            None => self.peers.get(&peer_id).ok_or_else(|| {
686                PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
687            })?,
688        };
689
690        #[cfg(not(target_family = "wasm"))]
691        let mut client = {
692            install_crypto_provider().await;
693            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
694            let mut root_certs = RootCertStore::empty();
695            root_certs.extend(webpki_roots);
696
697            let tls_cfg = CustomCertStore::builder()
698                .with_root_certificates(root_certs)
699                .with_no_client_auth();
700
701            WsClientBuilder::default()
702                .max_concurrent_requests(u16::MAX as usize)
703                .with_custom_cert_store(tls_cfg)
704        };
705
706        #[cfg(target_family = "wasm")]
707        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
708
709        if let Some(api_secret) = &self.api_secret {
710            #[cfg(not(target_family = "wasm"))]
711            {
712                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
713                // but we can set up the headers manually
714                let mut headers = HeaderMap::new();
715
716                let auth = base64::engine::general_purpose::STANDARD
717                    .encode(format!("fedimint:{api_secret}"));
718
719                headers.insert(
720                    "Authorization",
721                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
722                );
723
724                client = client.set_headers(headers);
725            }
726            #[cfg(target_family = "wasm")]
727            {
728                // on wasm, url will be handled by the browser, which should take care of
729                // `user:pass@...`
730                let mut url = api_endpoint.clone();
731                url.set_username("fedimint")
732                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
733                url.set_password(Some(&api_secret))
734                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
735
736                let client = client
737                    .build(url.as_str())
738                    .await
739                    .map_err(|err| PeerError::InternalClientError(err.into()))?;
740
741                return Ok(client.into_dyn());
742            }
743        }
744
745        let client = client
746            .build(api_endpoint.as_str())
747            .await
748            .map_err(|err| PeerError::InternalClientError(err.into()))?;
749
750        Ok(client.into_dyn())
751    }
752}
753
/// Connector that reaches federation peers through Tor.
///
/// Only available with the `tor` feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// Api endpoint of every known peer
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional api secret for the peers' api
    api_secret: Option<String>,
}

#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for `peers` with an optional `api_secret`
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
767
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Returns the ids of all peers this connector holds an endpoint for.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a websocket connection to `peer_id` through a freshly
    /// bootstrapped, isolated Tor client.
    ///
    /// Everything that can be checked locally (known peer, URL scheme, host,
    /// port and the `TorAddr` conversion) is validated *before* any network
    /// activity, so a malformed endpoint is rejected without bootstrapping Tor
    /// or opening a stream to the target.
    ///
    /// # Errors
    ///
    /// * [`PeerError::InternalClientError`] if `peer_id` is unknown or the Tor
    ///   client cannot be bootstrapped.
    /// * [`PeerError::InvalidEndpoint`] if the scheme is neither `ws` nor
    ///   `wss`, the URL has no host or port, or the host cannot be turned into
    ///   a `TorAddr` / TLS server name.
    /// * [`PeerError::Connection`] if the Tor stream, the TLS handshake or the
    ///   websocket handshake fails.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let api_endpoint = self
            .peers
            .get(&peer_id)
            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;

        // Fail fast on unsupported schemes instead of discovering them only
        // after a Tor circuit has been built.
        //
        // NOTE(review): `ReconnectFederationApi::from_endpoints` selects this
        // connector for `tor` URLs and passes them through unchanged, yet only
        // `ws`/`wss` are accepted here -- confirm how such endpoints are meant
        // to be rewritten before they reach this point.
        let is_tls = match api_endpoint.scheme() {
            "wss" => true,
            "ws" => false,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        // TODO: (@leonardo) should we implement our `IntoTorAddr` for `SafeUrl`
        // instead?
        let host = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (host, port);
        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is moved into the connect call below; keep a copy for the
        // log line emitted after the stream has been established.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        install_crypto_provider().await;

        let tor_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_config)
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        // TODO: It can be updated to use `is_onion_address()` implementation,
        // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion services have to be enabled explicitly via stream
            // preferences.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let anonymized_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        } else {
            let anonymized_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        };

        // For `wss` endpoints, TLS is layered on top of the Tor stream using
        // the bundled webpki root certificates.
        let tls_connector = if is_tls {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();
            let tls_connector = TlsConnector::from(Arc::new(tls_config));
            Some(tls_connector)
        } else {
            None
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(usize::from(u16::MAX));

        if let Some(api_secret) = &self.api_secret {
            // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
            // but we can set up the headers manually
            let mut headers = HeaderMap::new();

            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        match tls_connector {
            None => {
                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
            Some(tls_connector) => {
                // FIXME: (@leonardo) Is this leaking any data ? Should investigate it further
                // if it's really needed.
                let server_name = rustls_pki_types::ServerName::try_from(host.to_owned())
                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

                let anonymized_tls_stream = tls_connector
                    .connect(server_name, anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
        }
    }
}
921
922fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
923    match jsonrpc_error {
924        JsonRpcClientError::Call(error_object) => {
925            let error = anyhow!(error_object.message().to_owned());
926            match ErrorCode::from(error_object.code()) {
927                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
928                    PeerError::InvalidRequest(error)
929                }
930                ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
931                ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
932                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
933                    PeerError::ServerError(error)
934                }
935            }
936        }
937        JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
938        JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
939        JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
940        JsonRpcClientError::InvalidSubscriptionId => {
941            PeerError::Transport(anyhow!("Invalid subscription id"))
942        }
943        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
944            PeerError::InvalidRequest(anyhow!(invalid_request_id))
945        }
946        JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
947        JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
948        JsonRpcClientError::HttpNotImplemented => {
949            PeerError::ServerError(anyhow!("Http not implemented"))
950        }
951        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
952            PeerError::InvalidRequest(anyhow!(empty_batch_request))
953        }
954        JsonRpcClientError::RegisterMethod(register_method_error) => {
955            PeerError::InvalidResponse(anyhow!(register_method_error))
956        }
957    }
958}
959
960#[async_trait]
961impl IClientConnection for WsClient {
962    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
963        let method = match method {
964            ApiMethod::Core(method) => method,
965            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
966        };
967
968        Ok(ClientT::request(self, &method, [request.to_json()])
969            .await
970            .map_err(jsonrpc_error_to_peer_error)?)
971    }
972
973    async fn await_disconnection(&self) {
974        self.on_disconnect().await;
975    }
976}
977
/// Shared, type-erased handle to any [`IClientConnector`] implementation.
pub type DynClientConnector = Arc<dyn IClientConnector>;
979
/// Allows to connect to peers. Connections are request based and should be
/// authenticated and encrypted for production deployments.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static {
    /// Returns the ids of the peers known to this connector.
    /// `ReconnectClientConnections::new` creates one connection handle per
    /// returned id.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Establishes a new connection to `peer`, or reports why that failed.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps `self` in an [`Arc`], yielding a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
995
/// Shared, type-erased handle to any [`IClientConnection`] implementation.
pub type DynClientConnection = Arc<dyn IClientConnection>;
997
/// An established connection to a single peer over which API requests can be
/// issued.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for the given API `method` and returns the peer's JSON
    /// response, or a `PeerError` describing the failure.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Completes once the connection has been lost. The connection task in
    /// `ClientConnection::new` awaits this to decide when to reconnect.
    async fn await_disconnection(&self);

    /// Wraps `self` in an [`Arc`], yielding a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
1011
/// Federation API handle that sends raw requests through per-peer
/// [`ReconnectClientConnections`].
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Ids of all peers, as reported by the connector at construction time.
    peers: BTreeSet<PeerId>,
    /// Value returned from `self_peer()`; `new_admin` sets it to the admin's
    /// own peer id.
    admin_id: Option<PeerId>,
    /// When set, `request_raw` issues module API calls for this module
    /// instance; when `None`, it issues core API calls.
    module_id: Option<ModuleInstanceId>,
    /// Connection handles, one per peer.
    connections: ReconnectClientConnections,
}
1019
1020impl ReconnectFederationApi {
1021    fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1022        Self {
1023            peers: connector.peers(),
1024            admin_id,
1025            module_id: None,
1026            connections: ReconnectClientConnections::new(connector),
1027        }
1028    }
1029
1030    pub async fn new_admin(
1031        peer: PeerId,
1032        url: SafeUrl,
1033        api_secret: &Option<String>,
1034    ) -> anyhow::Result<Self> {
1035        Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1036    }
1037
1038    pub async fn from_endpoints(
1039        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1040        api_secret: &Option<String>,
1041        admin_id: Option<PeerId>,
1042    ) -> anyhow::Result<Self> {
1043        let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1044
1045        let scheme = peers
1046            .values()
1047            .next()
1048            .expect("Federation api has been initialized with no peers")
1049            .scheme();
1050
1051        let connector = match scheme {
1052            "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1053            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1054            "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1055            "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1056            scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1057        };
1058
1059        Ok(ReconnectFederationApi::new(&connector, admin_id))
1060    }
1061}
1062
// Empty impl: no trait items are overridden for `ReconnectFederationApi` here.
impl IModuleFederationApi for ReconnectFederationApi {}
1064
1065#[apply(async_trait_maybe_send!)]
1066impl IRawFederationApi for ReconnectFederationApi {
1067    fn all_peers(&self) -> &BTreeSet<PeerId> {
1068        &self.peers
1069    }
1070
1071    fn self_peer(&self) -> Option<PeerId> {
1072        self.admin_id
1073    }
1074
1075    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1076        ReconnectFederationApi {
1077            peers: self.peers.clone(),
1078            admin_id: self.admin_id,
1079            module_id: Some(id),
1080            connections: self.connections.clone(),
1081        }
1082        .into()
1083    }
1084
1085    #[instrument(
1086        target = LOG_NET_API,
1087        skip_all,
1088        fields(
1089            peer_id = %peer_id,
1090            method = %method,
1091            params = %params.params,
1092        )
1093    )]
1094    async fn request_raw(
1095        &self,
1096        peer_id: PeerId,
1097        method: &str,
1098        params: &ApiRequestErased,
1099    ) -> PeerResult<Value> {
1100        let method = match self.module_id {
1101            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1102            None => ApiMethod::Core(method.to_string()),
1103        };
1104
1105        self.connections
1106            .request(peer_id, method, params.clone())
1107            .await
1108    }
1109}
1110
/// Set of per-peer connection handles; cloning yields another handle set
/// backed by the same request channels.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// One [`ClientConnection`] per peer id reported by the connector.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1115
1116impl ReconnectClientConnections {
1117    pub fn new(connector: &DynClientConnector) -> Self {
1118        ReconnectClientConnections {
1119            connections: connector
1120                .peers()
1121                .into_iter()
1122                .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1123                .collect(),
1124        }
1125    }
1126
1127    async fn request(
1128        &self,
1129        peer: PeerId,
1130        method: ApiMethod,
1131        request: ApiRequestErased,
1132    ) -> PeerResult<Value> {
1133        trace!(target: LOG_NET_API, %method, "Api request");
1134        let res = self
1135            .connections
1136            .get(&peer)
1137            .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1138            .connection()
1139            .await
1140            .context("Failed to connect to peer")
1141            .map_err(PeerError::Connection)?
1142            .request(method.clone(), request)
1143            .await;
1144
1145        trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1146
1147        res
1148    }
1149}
1150
/// Handle to the background task that maintains the connection to one peer.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Channel on which callers submit a oneshot sender; the background task
    /// answers through it with the current connection.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}
1155
impl ClientConnection {
    /// Spawns the `peer-api-connection` task for `peer` and returns a handle
    /// to it.
    ///
    /// The task connects lazily: it waits for a connection request, connects
    /// through `connector`, and then hands clones of that connection to every
    /// requester until the connection is lost. It ends once all handles to the
    /// request channel have been dropped.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        // Up to 1024 connection requests may be queued before `send` has to wait.
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Idle until somebody actually asks for a connection.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Drain the queue, so that everyone who already joined fails or
                    // succeeds together.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            // A failed send only means that the requester has gone
                            // away, so the result is ignored.
                            for sender in senders {
                                sender.send(connection.clone()).ok();
                            }

                            // Serve further requests from the live connection until
                            // it disconnects or the request channel is closed.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // Start from a fresh backoff after a connection that
                            // was established successfully.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // `senders` is still alive during this sleep and is only
                            // dropped at the end of the iteration, so the waiting
                            // callers observe the failure (a cancelled oneshot) after
                            // the backoff delay has elapsed.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }

    /// Asks the background task for the current connection.
    ///
    /// Returns `None` if the request channel is closed (a warning is logged)
    /// or if the task dropped the reply sender without answering, which is
    /// what happens when a connection attempt fails.
    async fn connection(&self) -> Option<DynClientConnection> {
        let (sender, receiver) = oneshot::channel();

        self.sender
            .send(sender)
            .await
            .inspect_err(|err| {
                warn!(
                    target: LOG_CLIENT_NET_API,
                    err = %err.fmt_compact(),
                    "Api connection request channel closed unexpectedly"
                );
            })
            .ok()?;

        receiver.await.ok()
    }
}
1235
/// The status of a server, including how it views its peers
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    // NOTE(review): presumably the number of consensus sessions so far -- confirm.
    pub session_count: u64,
    /// Status of each peer, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    // NOTE(review): presumably counts of peers by `connection_status` -- confirm
    // against the code that fills these in.
    pub peers_online: u64,
    pub peers_offline: u64,
    /// This should always be 0 if everything is okay, so a monitoring tool
    /// should generate an alert if this is not the case.
    pub peers_flagged: u64,
    // NOTE(review): unit/meaning of the value is not visible here (session
    // index or timestamp?) -- confirm.
    pub scheduled_shutdown: Option<u64>,
}
1248
/// Status of a single peer as stored in
/// [`LegacyFederationStatus::status_by_peer`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    // NOTE(review): presumably identifies the peer's most recent consensus
    // contribution; the unit is not visible here -- confirm.
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected; defaults to `Disconnected`.
    pub connection_status: LegacyP2PConnectionStatus,
    /// Indicates that this peer needs attention from the operator since
    /// it has not contributed to the consensus in a long time
    pub flagged: bool,
}
1257
/// Connection state of a peer. Serialized in snake case (`"disconnected"`,
/// `"connected"`); the default is `Disconnected`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    #[default]
    Disconnected,
    Connected,
}
1265
/// Combined status payload: the server's own status plus, optionally, its
/// view of the federation.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    // NOTE(review): presumably `None` while no federation status is available
    // (e.g. before consensus is running) -- confirm.
    pub federation: Option<LegacyFederationStatus>,
}
1271
/// Archive of all the guardian config files that can be used to recover a lost
/// guardian node.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
    /// Raw bytes of the tar archive; (de)serialized via
    /// `fedimint_core::hex::serde`.
    #[serde(with = "fedimint_core::hex::serde")]
    pub tar_archive_bytes: Vec<u8>,
}
1279
// Tests live in the out-of-line `tests` submodule and are only compiled for
// test builds.
#[cfg(test)]
mod tests;