fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
24use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
25use fedimint_core::core::backup::SignedBackupRequest;
26use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
27use fedimint_core::encoding::{Decodable, Encodable};
28use fedimint_core::envs::{
29    FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
30};
31use fedimint_core::invite_code::InviteCode;
32use fedimint_core::module::audit::AuditSummary;
33use fedimint_core::module::registry::ModuleDecoderRegistry;
34use fedimint_core::module::{
35    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
36};
37use fedimint_core::net::api_announcement::SignedApiAnnouncement;
38use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
39use fedimint_core::task::{MaybeSend, MaybeSync};
40use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
41use fedimint_core::util::backoff_util::api_networking_backoff;
42use fedimint_core::util::{FmtCompact as _, SafeUrl};
43use fedimint_core::{
44    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
45};
46use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
47use futures::channel::oneshot;
48use futures::future::pending;
49use futures::stream::FuturesUnordered;
50use futures::{Future, StreamExt};
51use global_api::with_cache::GlobalFederationApiWithCache;
52use jsonrpsee_core::DeserializeOwned;
53use jsonrpsee_core::client::ClientT;
54pub use jsonrpsee_core::client::Error as JsonRpcClientError;
55use jsonrpsee_types::ErrorCode;
56#[cfg(target_family = "wasm")]
57use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
58#[cfg(not(target_family = "wasm"))]
59use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
60#[cfg(not(target_family = "wasm"))]
61use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
62use serde::{Deserialize, Serialize};
63use serde_json::Value;
64#[cfg(not(target_family = "wasm"))]
65use tokio_rustls::rustls::RootCertStore;
66#[cfg(all(feature = "tor", not(target_family = "wasm")))]
67use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
68use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
69
70use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
71
72pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
73
74pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
75    ApiVersion { major: 0, minor: 1 };
76
77pub type PeerResult<T> = Result<T, PeerError>;
78pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
79pub type FederationResult<T> = Result<T, FederationError>;
80pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
81
82pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
83
84#[cfg(not(target_family = "wasm"))]
85fn install_crypto_provider() {
86    let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
87}
88
89/// Set of api versions for each component (core + modules)
90///
91/// E.g. result of federated common api versions discovery.
92#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
93pub struct ApiVersionSet {
94    pub core: ApiVersion,
95    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
96}
97
98/// An API (module or global) that can query a federation
99#[apply(async_trait_maybe_send!)]
100pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
101    /// List of all federation peers for the purpose of iterating each peer
102    /// in the federation.
103    ///
104    /// The underlying implementation is responsible for knowing how many
105    /// and `PeerId`s of each. The caller of this interface most probably
106    /// have some idea as well, but passing this set across every
107    /// API call to the federation would be inconvenient.
108    fn all_peers(&self) -> &BTreeSet<PeerId>;
109
110    /// PeerId of the Guardian node, if set
111    ///
112    /// This is for using Client in a "Admin" mode, making authenticated
113    /// calls to own `fedimintd` instance.
114    fn self_peer(&self) -> Option<PeerId>;
115
116    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
117
118    /// Make request to a specific federation peer by `peer_id`
119    async fn request_raw(
120        &self,
121        peer_id: PeerId,
122        method: &str,
123        params: &ApiRequestErased,
124    ) -> PeerResult<Value>;
125}
126
127/// An extension trait allowing to making federation-wide API call on top
128/// [`IRawFederationApi`].
129#[apply(async_trait_maybe_send!)]
130pub trait FederationApiExt: IRawFederationApi {
131    async fn request_single_peer<Ret>(
132        &self,
133        method: String,
134        params: ApiRequestErased,
135        peer: PeerId,
136    ) -> PeerResult<Ret>
137    where
138        Ret: DeserializeOwned,
139    {
140        self.request_raw(peer, &method, &params)
141            .await
142            .and_then(|v| {
143                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
144            })
145    }
146
147    async fn request_single_peer_federation<FedRet>(
148        &self,
149        method: String,
150        params: ApiRequestErased,
151        peer_id: PeerId,
152    ) -> FederationResult<FedRet>
153    where
154        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
155    {
156        self.request_raw(peer_id, &method, &params)
157            .await
158            .and_then(|v| {
159                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
160            })
161            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
162    }
163
164    /// Make an aggregate request to federation, using `strategy` to logically
165    /// merge the responses.
166    #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
167    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
168        &self,
169        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
170        method: String,
171        params: ApiRequestErased,
172    ) -> FederationResult<FR> {
173        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
174        // completed results from it and we don't do any `await`s when
175        // processing them, it should be totally OK.
176        #[cfg(not(target_family = "wasm"))]
177        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
178        #[cfg(target_family = "wasm")]
179        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
180
181        for peer in self.all_peers() {
182            futures.push(Box::pin({
183                let method = &method;
184                let params = &params;
185                async move {
186                    let result = self
187                        .request_single_peer(method.clone(), params.clone(), *peer)
188                        .await;
189
190                    (*peer, result)
191                }
192            }));
193        }
194
195        let mut peer_errors = BTreeMap::new();
196        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
197
198        loop {
199            let (peer, result) = futures
200                .next()
201                .await
202                .expect("Query strategy ran out of peers to query without returning a result");
203
204            match result {
205                Ok(response) => match strategy.process(peer, response) {
206                    QueryStep::Retry(peers) => {
207                        for peer in peers {
208                            futures.push(Box::pin({
209                                let method = &method;
210                                let params = &params;
211                                async move {
212                                    let result = self
213                                        .request_single_peer(method.clone(), params.clone(), peer)
214                                        .await;
215
216                                    (peer, result)
217                                }
218                            }));
219                        }
220                    }
221                    QueryStep::Success(response) => return Ok(response),
222                    QueryStep::Failure(e) => {
223                        peer_errors.insert(peer, e);
224                    }
225                    QueryStep::Continue => {}
226                },
227                Err(e) => {
228                    e.report_if_unusual(peer, "RequestWithStrategy");
229                    peer_errors.insert(peer, e);
230                }
231            }
232
233            if peer_errors.len() == peer_error_threshold {
234                return Err(FederationError::peer_errors(
235                    method.clone(),
236                    params.params.clone(),
237                    peer_errors,
238                ));
239            }
240        }
241    }
242
243    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
244        &self,
245        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
246        method: String,
247        params: ApiRequestErased,
248    ) -> FR {
249        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
250        // completed results from it and we don't do any `await`s when
251        // processing them, it should be totally OK.
252        #[cfg(not(target_family = "wasm"))]
253        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
254        #[cfg(target_family = "wasm")]
255        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
256
257        for peer in self.all_peers() {
258            futures.push(Box::pin({
259                let method = &method;
260                let params = &params;
261                async move {
262                    let response = util::retry(
263                        format!("api-request-{method}-{peer}"),
264                        api_networking_backoff(),
265                        || async {
266                            self.request_single_peer(method.clone(), params.clone(), *peer)
267                                .await
268                                .inspect_err(|e| {
269                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
270                                })
271                                .map_err(|e| anyhow!(e.to_string()))
272                        },
273                    )
274                    .await
275                    .expect("Number of retries has no limit");
276
277                    (*peer, response)
278                }
279            }));
280        }
281
282        loop {
283            let (peer, response) = match futures.next().await {
284                Some(t) => t,
285                None => pending().await,
286            };
287
288            match strategy.process(peer, response) {
289                QueryStep::Retry(peers) => {
290                    for peer in peers {
291                        futures.push(Box::pin({
292                            let method = &method;
293                            let params = &params;
294                            async move {
295                                let response = util::retry(
296                                    format!("api-request-{method}-{peer}"),
297                                    api_networking_backoff(),
298                                    || async {
299                                        self.request_single_peer(
300                                            method.clone(),
301                                            params.clone(),
302                                            peer,
303                                        )
304                                        .await
305                                        .inspect_err(|err| {
306                                            if err.is_unusual() {
307                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
308                                            }
309                                        })
310                                        .map_err(|e| anyhow!(e.to_string()))
311                                    },
312                                )
313                                .await
314                                .expect("Number of retries has no limit");
315
316                                (peer, response)
317                            }
318                        }));
319                    }
320                }
321                QueryStep::Success(response) => return response,
322                QueryStep::Failure(e) => {
323                    warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
324                }
325                QueryStep::Continue => {}
326            }
327        }
328    }
329
330    async fn request_current_consensus<Ret>(
331        &self,
332        method: String,
333        params: ApiRequestErased,
334    ) -> FederationResult<Ret>
335    where
336        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
337    {
338        self.request_with_strategy(
339            ThresholdConsensus::new(self.all_peers().to_num_peers()),
340            method,
341            params,
342        )
343        .await
344    }
345
346    async fn request_current_consensus_retry<Ret>(
347        &self,
348        method: String,
349        params: ApiRequestErased,
350    ) -> Ret
351    where
352        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
353    {
354        self.request_with_strategy_retry(
355            ThresholdConsensus::new(self.all_peers().to_num_peers()),
356            method,
357            params,
358        )
359        .await
360    }
361
362    async fn request_admin<Ret>(
363        &self,
364        method: &str,
365        params: ApiRequestErased,
366        auth: ApiAuth,
367    ) -> FederationResult<Ret>
368    where
369        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
370    {
371        let Some(self_peer_id) = self.self_peer() else {
372            return Err(FederationError::general(
373                method,
374                params,
375                anyhow::format_err!("Admin peer_id not set"),
376            ));
377        };
378
379        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
380            .await
381    }
382
383    async fn request_admin_no_auth<Ret>(
384        &self,
385        method: &str,
386        params: ApiRequestErased,
387    ) -> FederationResult<Ret>
388    where
389        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
390    {
391        let Some(self_peer_id) = self.self_peer() else {
392            return Err(FederationError::general(
393                method,
394                params,
395                anyhow::format_err!("Admin peer_id not set"),
396            ));
397        };
398
399        self.request_single_peer_federation(method.into(), params, self_peer_id)
400            .await
401    }
402}
403
404#[apply(async_trait_maybe_send!)]
405impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
406
407/// Trait marker for the module (non-global) endpoints
408pub trait IModuleFederationApi: IRawFederationApi {}
409
410dyn_newtype_define! {
411    #[derive(Clone)]
412    pub DynModuleApi(Arc<IModuleFederationApi>)
413}
414
415dyn_newtype_define! {
416    #[derive(Clone)]
417    pub DynGlobalApi(Arc<IGlobalFederationApi>)
418}
419
420impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
421    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
422        self.inner.as_ref()
423    }
424}
425
426impl DynGlobalApi {
427    pub async fn new_admin(
428        peer: PeerId,
429        url: SafeUrl,
430        api_secret: &Option<String>,
431    ) -> anyhow::Result<DynGlobalApi> {
432        Ok(GlobalFederationApiWithCache::new(
433            ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
434                .await?,
435        )
436        .into())
437    }
438
439    // FIXME: (@leonardo) Should we have the option to do DKG and config related
440    // actions through Tor ? Should we add the `Connector` choice to
441    // ConfigParams then ?
442    pub async fn from_setup_endpoint(
443        url: SafeUrl,
444        api_secret: &Option<String>,
445    ) -> anyhow::Result<Self> {
446        // PeerIds are used only for informational purposes, but just in case, make a
447        // big number so it stands out
448
449        Self::new_admin(PeerId::from(1024), url, api_secret).await
450    }
451
452    pub async fn from_endpoints(
453        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
454        api_secret: &Option<String>,
455    ) -> anyhow::Result<Self> {
456        Ok(GlobalFederationApiWithCache::new(
457            ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
458        )
459        .into())
460    }
461}
462
463/// The API for the global (non-module) endpoints
464#[apply(async_trait_maybe_send!)]
465pub trait IGlobalFederationApi: IRawFederationApi {
466    async fn submit_transaction(
467        &self,
468        tx: Transaction,
469    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
470
471    async fn await_block(
472        &self,
473        block_index: u64,
474        decoders: &ModuleDecoderRegistry,
475    ) -> anyhow::Result<SessionOutcome>;
476
477    async fn get_session_status(
478        &self,
479        block_index: u64,
480        decoders: &ModuleDecoderRegistry,
481        core_api_version: ApiVersion,
482        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
483    ) -> anyhow::Result<SessionStatus>;
484
485    async fn session_count(&self) -> FederationResult<u64>;
486
487    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
488
489    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
490
491    async fn download_backup(
492        &self,
493        id: &secp256k1::PublicKey,
494    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
495
496    /// Sets the password used to decrypt the configs and authenticate
497    ///
498    /// Must be called first before any other calls to the API
499    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
500
501    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
502
503    async fn set_local_params(
504        &self,
505        name: String,
506        federation_name: Option<String>,
507        auth: ApiAuth,
508    ) -> FederationResult<String>;
509
510    async fn add_peer_connection_info(
511        &self,
512        info: String,
513        auth: ApiAuth,
514    ) -> FederationResult<String>;
515
516    /// Reset the peer setup codes during the federation setup process
517    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
518
519    /// Returns the setup code if set_local_params was already called
520    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
521
522    /// During config gen, used for an API-to-API call that adds a peer's server
523    /// connection info to the leader.
524    ///
525    /// Note this call will fail until the leader has their API running and has
526    /// `set_server_connections` so clients should retry.
527    ///
528    /// This call is not authenticated because it's guardian-to-guardian
529    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
530
531    /// During config gen, gets all the server connections we've received from
532    /// peers using `add_config_gen_peer`
533    ///
534    /// Could be called on the leader, so it's not authenticated
535    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
536
537    /// Runs DKG, can only be called once after configs have been generated in
538    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
539    /// error and config gen must be restarted.
540    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
541
542    /// After DKG, returns the hash of the consensus config tweaked with our id.
543    /// We need to share this with all other peers to complete verification.
544    async fn get_verify_config_hash(
545        &self,
546        auth: ApiAuth,
547    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
548
549    /// Updates local state and notify leader that we have verified configs.
550    /// This allows for a synchronization point, before we start consensus.
551    async fn verified_configs(
552        &self,
553        auth: ApiAuth,
554    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
555
556    /// Reads the configs from the disk, starts the consensus server, and shuts
557    /// down the config gen API to start the Fedimint API
558    ///
559    /// Clients may receive an error due to forced shutdown, should call the
560    /// `server_status` to see if consensus has started.
561    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
562
563    /// Returns the status of the server
564    async fn status(&self) -> FederationResult<StatusResponse>;
565
566    /// Show an audit across all modules
567    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
568
569    /// Download the guardian config to back it up
570    async fn guardian_config_backup(&self, auth: ApiAuth)
571    -> FederationResult<GuardianConfigBackup>;
572
573    /// Check auth credentials
574    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
575
576    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
577
578    /// Publish our signed API announcement to other guardians
579    async fn submit_api_announcement(
580        &self,
581        peer_id: PeerId,
582        announcement: SignedApiAnnouncement,
583    ) -> FederationResult<()>;
584
585    async fn api_announcements(
586        &self,
587        guardian: PeerId,
588    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
589
590    async fn sign_api_announcement(
591        &self,
592        api_url: SafeUrl,
593        auth: ApiAuth,
594    ) -> FederationResult<SignedApiAnnouncement>;
595
596    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
597
598    /// Returns the fedimintd version a peer is running
599    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
600
601    /// Fetch the backup statistics from the federation (admin endpoint)
602    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
603
604    /// Get the invite code for the federation guardian.
605    /// For instance, useful after DKG
606    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
607}
608
609pub fn deserialize_outcome<R>(
610    outcome: &SerdeOutputOutcome,
611    module_decoder: &Decoder,
612) -> OutputOutcomeResult<R>
613where
614    R: OutputOutcome + MaybeSend,
615{
616    let dyn_outcome = outcome
617        .try_into_inner_known_module_kind(module_decoder)
618        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
619
620    let source_instance = dyn_outcome.module_instance_id();
621
622    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
623        let target_type = std::any::type_name::<R>();
624        OutputOutcomeError::ResponseDeserialization(anyhow!(
625            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
626        ))
627    })
628}
629
630#[derive(Debug, Clone)]
631pub struct WebsocketConnector {
632    peers: BTreeMap<PeerId, SafeUrl>,
633    api_secret: Option<String>,
634
635    /// List of overrides to use when attempting to connect to given
636    /// `PeerId`
637    ///
638    /// This is useful for testing, or forcing non-default network
639    /// connectivity.
640    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
641}
642
643impl WebsocketConnector {
644    fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
645        let mut s = Self::new_no_overrides(peers, api_secret);
646
647        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
648            s = s.with_connection_override(k, v);
649        }
650
651        Ok(s)
652    }
653    pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
654        self.connection_overrides.insert(peer_id, url);
655        self
656    }
657    pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
658        Self {
659            peers,
660            api_secret,
661            connection_overrides: BTreeMap::default(),
662        }
663    }
664}
665
666#[async_trait]
667impl IClientConnector for WebsocketConnector {
668    fn peers(&self) -> BTreeSet<PeerId> {
669        self.peers.keys().copied().collect()
670    }
671
672    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
673        let api_endpoint = match self.connection_overrides.get(&peer_id) {
674            Some(url) => {
675                trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
676                url
677            }
678            None => self.peers.get(&peer_id).ok_or_else(|| {
679                PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
680            })?,
681        };
682
683        #[cfg(not(target_family = "wasm"))]
684        let mut client = {
685            install_crypto_provider();
686            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
687            let mut root_certs = RootCertStore::empty();
688            root_certs.extend(webpki_roots);
689
690            let tls_cfg = CustomCertStore::builder()
691                .with_root_certificates(root_certs)
692                .with_no_client_auth();
693
694            WsClientBuilder::default()
695                .max_concurrent_requests(u16::MAX as usize)
696                .with_custom_cert_store(tls_cfg)
697        };
698
699        #[cfg(target_family = "wasm")]
700        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
701
702        if let Some(api_secret) = &self.api_secret {
703            #[cfg(not(target_family = "wasm"))]
704            {
705                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
706                // but we can set up the headers manually
707                let mut headers = HeaderMap::new();
708
709                let auth = base64::engine::general_purpose::STANDARD
710                    .encode(format!("fedimint:{api_secret}"));
711
712                headers.insert(
713                    "Authorization",
714                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
715                );
716
717                client = client.set_headers(headers);
718            }
719            #[cfg(target_family = "wasm")]
720            {
721                // on wasm, url will be handled by the browser, which should take care of
722                // `user:pass@...`
723                let mut url = api_endpoint.clone();
724                url.set_username("fedimint")
725                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
726                url.set_password(Some(&api_secret))
727                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
728
729                let client = client
730                    .build(url.as_str())
731                    .await
732                    .map_err(|err| PeerError::InternalClientError(err.into()))?;
733
734                return Ok(client.into_dyn());
735            }
736        }
737
738        let client = client
739            .build(api_endpoint.as_str())
740            .await
741            .map_err(|err| PeerError::InternalClientError(err.into()))?;
742
743        Ok(client.into_dyn())
744    }
745}
746
747#[cfg(all(feature = "tor", not(target_family = "wasm")))]
748#[derive(Debug, Clone)]
749pub struct TorConnector {
750    peers: BTreeMap<PeerId, SafeUrl>,
751    api_secret: Option<String>,
752}
753
754#[cfg(all(feature = "tor", not(target_family = "wasm")))]
755impl TorConnector {
756    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
757        Self { peers, api_secret }
758    }
759}
760
761#[cfg(all(feature = "tor", not(target_family = "wasm")))]
762#[async_trait]
763impl IClientConnector for TorConnector {
764    fn peers(&self) -> BTreeSet<PeerId> {
765        self.peers.keys().copied().collect()
766    }
767
768    #[allow(clippy::too_many_lines)]
769    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
770        let api_endpoint = self
771            .peers
772            .get(&peer_id)
773            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;
774
775        install_crypto_provider();
776
777        let tor_config = TorClientConfig::default();
778        let tor_client = TorClient::create_bootstrapped(tor_config)
779            .await
780            .map_err(|err| PeerError::InternalClientError(err.into()))?
781            .isolated_client();
782
783        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
784
785        // TODO: (@leonardo) should we implement our `IntoTorAddr` for `SafeUrl`
786        // instead?
787        let addr = (
788            api_endpoint
789                .host_str()
790                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
791            api_endpoint
792                .port_or_known_default()
793                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
794        );
795        let tor_addr = TorAddr::from(addr).map_err(|e| {
796            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
797        })?;
798
799        let tor_addr_clone = tor_addr.clone();
800
801        debug!(
802            ?tor_addr,
803            ?addr,
804            "Successfully created `TorAddr` for given address (i.e. host and port)"
805        );
806
807        // TODO: It can be updated to use `is_onion_address()` implementation,
808        // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
809        let anonymized_stream = if api_endpoint.is_onion_address() {
810            let mut stream_prefs = arti_client::StreamPrefs::default();
811            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
812
813            let anonymized_stream = tor_client
814                .connect_with_prefs(tor_addr, &stream_prefs)
815                .await
816                .map_err(|e| PeerError::Connection(e.into()))?;
817
818            debug!(
819                ?tor_addr_clone,
820                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
821            );
822            anonymized_stream
823        } else {
824            let anonymized_stream = tor_client
825                .connect(tor_addr)
826                .await
827                .map_err(|e| PeerError::Connection(e.into()))?;
828
829            debug!(
830                ?tor_addr_clone,
831                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
832            );
833            anonymized_stream
834        };
835
836        let is_tls = match api_endpoint.scheme() {
837            "wss" => true,
838            "ws" => false,
839            unexpected_scheme => {
840                return Err(PeerError::InvalidEndpoint(anyhow!(
841                    "Unsupported scheme: {unexpected_scheme}"
842                )));
843            }
844        };
845
846        let tls_connector = if is_tls {
847            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
848            let mut root_certs = RootCertStore::empty();
849            root_certs.extend(webpki_roots);
850
851            let tls_config = TlsClientConfig::builder()
852                .with_root_certificates(root_certs)
853                .with_no_client_auth();
854            let tls_connector = TlsConnector::from(Arc::new(tls_config));
855            Some(tls_connector)
856        } else {
857            None
858        };
859
860        let mut ws_client_builder =
861            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
862
863        if let Some(api_secret) = &self.api_secret {
864            // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
865            // but we can set up the headers manually
866            let mut headers = HeaderMap::new();
867
868            let auth =
869                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
870
871            headers.insert(
872                "Authorization",
873                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
874            );
875
876            ws_client_builder = ws_client_builder.set_headers(headers);
877        }
878
879        match tls_connector {
880            None => {
881                let client = ws_client_builder
882                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
883                    .await
884                    .map_err(|e| PeerError::Connection(e.into()))?;
885
886                Ok(client.into_dyn())
887            }
888            Some(tls_connector) => {
889                let host = api_endpoint
890                    .host_str()
891                    .map(ToOwned::to_owned)
892                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;
893
894                // FIXME: (@leonardo) Is this leaking any data ? Should investigate it further
895                // if it's really needed.
896                let server_name = rustls_pki_types::ServerName::try_from(host)
897                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;
898
899                let anonymized_tls_stream = tls_connector
900                    .connect(server_name, anonymized_stream)
901                    .await
902                    .map_err(|e| PeerError::Connection(e.into()))?;
903
904                let client = ws_client_builder
905                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
906                    .await
907                    .map_err(|e| PeerError::Connection(e.into()))?;
908
909                Ok(client.into_dyn())
910            }
911        }
912    }
913}
914
915fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
916    match jsonrpc_error {
917        JsonRpcClientError::Call(error_object) => {
918            let error = anyhow!(error_object.message().to_owned());
919            match ErrorCode::from(error_object.code()) {
920                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
921                    PeerError::InvalidRequest(error)
922                }
923                ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
924                ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
925                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
926                    PeerError::ServerError(error)
927                }
928            }
929        }
930        JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
931        JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
932        JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
933        JsonRpcClientError::InvalidSubscriptionId => {
934            PeerError::Transport(anyhow!("Invalid subscription id"))
935        }
936        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
937            PeerError::InvalidRequest(anyhow!(invalid_request_id))
938        }
939        JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
940        JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
941        JsonRpcClientError::HttpNotImplemented => {
942            PeerError::ServerError(anyhow!("Http not implemented"))
943        }
944        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
945            PeerError::InvalidRequest(anyhow!(empty_batch_request))
946        }
947        JsonRpcClientError::RegisterMethod(register_method_error) => {
948            PeerError::InvalidResponse(anyhow!(register_method_error))
949        }
950    }
951}
952
953#[async_trait]
954impl IClientConnection for WsClient {
955    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
956        let method = match method {
957            ApiMethod::Core(method) => method,
958            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
959        };
960
961        Ok(ClientT::request(self, &method, [request.to_json()])
962            .await
963            .map_err(jsonrpc_error_to_peer_error)?)
964    }
965
966    async fn await_disconnection(&self) {
967        self.on_disconnect().await;
968    }
969}
970
971pub type DynClientConnector = Arc<dyn IClientConnector>;
972
973/// Allows to connect to peers. Connections are request based and should be
974/// authenticated and encrypted for production deployments.
975#[async_trait]
976pub trait IClientConnector: Send + Sync + 'static {
977    fn peers(&self) -> BTreeSet<PeerId>;
978
979    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
980
981    fn into_dyn(self) -> DynClientConnector
982    where
983        Self: Sized,
984    {
985        Arc::new(self)
986    }
987}
988
989pub type DynClientConnection = Arc<dyn IClientConnection>;
990
991#[async_trait]
992pub trait IClientConnection: Debug + Send + Sync + 'static {
993    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
994
995    async fn await_disconnection(&self);
996
997    fn into_dyn(self) -> DynClientConnection
998    where
999        Self: Sized,
1000    {
1001        Arc::new(self)
1002    }
1003}
1004
1005#[derive(Clone, Debug)]
1006pub struct ReconnectFederationApi {
1007    peers: BTreeSet<PeerId>,
1008    admin_id: Option<PeerId>,
1009    module_id: Option<ModuleInstanceId>,
1010    connections: ReconnectClientConnections,
1011}
1012
1013impl ReconnectFederationApi {
1014    fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1015        Self {
1016            peers: connector.peers(),
1017            admin_id,
1018            module_id: None,
1019            connections: ReconnectClientConnections::new(connector),
1020        }
1021    }
1022
1023    pub async fn new_admin(
1024        peer: PeerId,
1025        url: SafeUrl,
1026        api_secret: &Option<String>,
1027    ) -> anyhow::Result<Self> {
1028        Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1029    }
1030
1031    pub async fn from_endpoints(
1032        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1033        api_secret: &Option<String>,
1034        admin_id: Option<PeerId>,
1035    ) -> anyhow::Result<Self> {
1036        let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1037
1038        let scheme = peers
1039            .values()
1040            .next()
1041            .expect("Federation api has been initialized with no peers")
1042            .scheme();
1043
1044        let connector = match scheme {
1045            "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1046            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1047            "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1048            "iroh" => {
1049                let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1050                    .ok()
1051                    .and_then(|dns| dns.parse().ok());
1052                iroh::IrohConnector::new(peers, iroh_dns).await?.into_dyn()
1053            }
1054            scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1055        };
1056
1057        Ok(ReconnectFederationApi::new(&connector, admin_id))
1058    }
1059}
1060
1061impl IModuleFederationApi for ReconnectFederationApi {}
1062
1063#[apply(async_trait_maybe_send!)]
1064impl IRawFederationApi for ReconnectFederationApi {
1065    fn all_peers(&self) -> &BTreeSet<PeerId> {
1066        &self.peers
1067    }
1068
1069    fn self_peer(&self) -> Option<PeerId> {
1070        self.admin_id
1071    }
1072
1073    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1074        ReconnectFederationApi {
1075            peers: self.peers.clone(),
1076            admin_id: self.admin_id,
1077            module_id: Some(id),
1078            connections: self.connections.clone(),
1079        }
1080        .into()
1081    }
1082
1083    #[instrument(
1084        target = LOG_NET_API,
1085        skip_all,
1086        fields(
1087            peer_id = %peer_id,
1088            method = %method,
1089            params = %params.params,
1090        )
1091    )]
1092    async fn request_raw(
1093        &self,
1094        peer_id: PeerId,
1095        method: &str,
1096        params: &ApiRequestErased,
1097    ) -> PeerResult<Value> {
1098        let method = match self.module_id {
1099            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1100            None => ApiMethod::Core(method.to_string()),
1101        };
1102
1103        self.connections
1104            .request(peer_id, method, params.clone())
1105            .await
1106    }
1107}
1108
1109#[derive(Clone, Debug)]
1110pub struct ReconnectClientConnections {
1111    connections: BTreeMap<PeerId, ClientConnection>,
1112}
1113
1114impl ReconnectClientConnections {
1115    pub fn new(connector: &DynClientConnector) -> Self {
1116        ReconnectClientConnections {
1117            connections: connector
1118                .peers()
1119                .into_iter()
1120                .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1121                .collect(),
1122        }
1123    }
1124
1125    async fn request(
1126        &self,
1127        peer: PeerId,
1128        method: ApiMethod,
1129        request: ApiRequestErased,
1130    ) -> PeerResult<Value> {
1131        trace!(target: LOG_NET_API, %method, "Api request");
1132        let res = self
1133            .connections
1134            .get(&peer)
1135            .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1136            .connection()
1137            .await
1138            .context("Failed to connect to peer")
1139            .map_err(PeerError::Connection)?
1140            .request(method.clone(), request)
1141            .await;
1142
1143        trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1144
1145        res
1146    }
1147}
1148
1149#[derive(Clone, Debug)]
1150struct ClientConnection {
1151    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1152}
1153
1154impl ClientConnection {
1155    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1156        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1157
1158        fedimint_core::task::spawn(
1159            "peer-api-connection",
1160            async move {
1161                let mut backoff = api_networking_backoff();
1162
1163                while let Ok(sender) = receiver.recv().await {
1164                    let mut senders = vec![sender];
1165
1166                    // Drain the queue, so we everyone that already joined fail or succeed
1167                    // together.
1168                    while let Ok(sender) = receiver.try_recv() {
1169                        senders.push(sender);
1170                    }
1171
1172                    match connector.connect(peer).await {
1173                        Ok(connection) => {
1174                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1175
1176                            for sender in senders {
1177                                sender.send(connection.clone()).ok();
1178                            }
1179
1180                            loop {
1181                                tokio::select! {
1182                                    sender = receiver.recv() => {
1183                                        match sender.ok() {
1184                                            Some(sender) => sender.send(connection.clone()).ok(),
1185                                            None => break,
1186                                        };
1187                                    }
1188                                    () = connection.await_disconnection() => break,
1189                                }
1190                            }
1191
1192                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1193
1194                            backoff = api_networking_backoff();
1195                        }
1196                        Err(e) => {
1197                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1198
1199                            fedimint_core::task::sleep(
1200                                backoff.next().expect("No limit to the number of retries"),
1201                            )
1202                            .await;
1203                        }
1204                    }
1205                }
1206
1207                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1208            }
1209            .instrument(trace_span!("peer-api-connection", ?peer)),
1210        );
1211
1212        ClientConnection { sender }
1213    }
1214
1215    async fn connection(&self) -> Option<DynClientConnection> {
1216        let (sender, receiver) = oneshot::channel();
1217
1218        self.sender
1219            .send(sender)
1220            .await
1221            .inspect_err(|err| {
1222                warn!(
1223                    target: LOG_CLIENT_NET_API,
1224                    err = %err.fmt_compact(),
1225                    "Api connection request channel closed unexpectedly"
1226                );
1227            })
1228            .ok()?;
1229
1230        receiver.await.ok()
1231    }
1232}
1233
1234/// The status of a server, including how it views its peers
1235#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1236pub struct LegacyFederationStatus {
1237    pub session_count: u64,
1238    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1239    pub peers_online: u64,
1240    pub peers_offline: u64,
1241    /// This should always be 0 if everything is okay, so a monitoring tool
1242    /// should generate an alert if this is not the case.
1243    pub peers_flagged: u64,
1244    pub scheduled_shutdown: Option<u64>,
1245}
1246
1247#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1248pub struct LegacyPeerStatus {
1249    pub last_contribution: Option<u64>,
1250    pub connection_status: LegacyP2PConnectionStatus,
1251    /// Indicates that this peer needs attention from the operator since
1252    /// it has not contributed to the consensus in a long time
1253    pub flagged: bool,
1254}
1255
1256#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1257#[serde(rename_all = "snake_case")]
1258pub enum LegacyP2PConnectionStatus {
1259    #[default]
1260    Disconnected,
1261    Connected,
1262}
1263
1264#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1265pub struct StatusResponse {
1266    pub server: ServerStatusLegacy,
1267    pub federation: Option<LegacyFederationStatus>,
1268}
1269
1270/// Archive of all the guardian config files that can be used to recover a lost
1271/// guardian node.
1272#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1273pub struct GuardianConfigBackup {
1274    #[serde(with = "fedimint_core::hex::serde")]
1275    pub tar_archive_bytes: Vec<u8>,
1276}
1277
1278#[cfg(test)]
1279mod tests;