1use core::panic;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::fmt::Debug;
4use std::iter::once;
5use std::pin::Pin;
6use std::result;
7use std::sync::Arc;
8
9use anyhow::{Context, anyhow};
10#[cfg(all(feature = "tor", not(target_family = "wasm")))]
11use arti_client::{TorAddr, TorClient, TorClientConfig};
12use async_channel::bounded;
13use async_trait::async_trait;
14use base64::Engine as _;
15use bitcoin::hashes::sha256;
16use bitcoin::secp256k1;
17pub use error::{FederationError, OutputOutcomeError, PeerError};
18use fedimint_core::admin_client::{
19 ConfigGenConnectionsRequest, PeerServerParams, ServerStatus, ServerStatusLegacy,
20};
21use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
22use fedimint_core::core::backup::SignedBackupRequest;
23use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
30};
31use fedimint_core::net::api_announcement::SignedApiAnnouncement;
32use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
33use fedimint_core::task::{MaybeSend, MaybeSync};
34use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
35use fedimint_core::util::backoff_util::api_networking_backoff;
36use fedimint_core::util::{FmtCompact as _, SafeUrl};
37use fedimint_core::{
38 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
39};
40use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
41use futures::channel::oneshot;
42use futures::future::pending;
43use futures::stream::FuturesUnordered;
44use futures::{Future, StreamExt};
45use global_api::with_cache::GlobalFederationApiWithCache;
46use jsonrpsee_core::DeserializeOwned;
47use jsonrpsee_core::client::ClientT;
48pub use jsonrpsee_core::client::Error as JsonRpcClientError;
49use jsonrpsee_types::ErrorCode;
50#[cfg(target_family = "wasm")]
51use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
52#[cfg(not(target_family = "wasm"))]
53use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
54#[cfg(not(target_family = "wasm"))]
55use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
56use serde::{Deserialize, Serialize};
57use serde_json::Value;
58#[cfg(not(target_family = "wasm"))]
59use tokio_rustls::rustls::RootCertStore;
60#[cfg(all(feature = "tor", not(target_family = "wasm")))]
61use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
62use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
63
64use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
65mod error;
66pub mod global_api;
67pub mod net;
68
69pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
70
71pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
72 ApiVersion { major: 0, minor: 1 };
73
74pub type PeerResult<T> = Result<T, PeerError>;
75pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
76pub type FederationResult<T> = Result<T, FederationError>;
77pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
78
79pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
80
81#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
85pub struct ApiVersionSet {
86 pub core: ApiVersion,
87 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
88}
89
90#[apply(async_trait_maybe_send!)]
92pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
93 fn all_peers(&self) -> &BTreeSet<PeerId>;
101
102 fn self_peer(&self) -> Option<PeerId>;
107
108 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
109
110 async fn request_raw(
112 &self,
113 peer_id: PeerId,
114 method: &str,
115 params: &ApiRequestErased,
116 ) -> PeerResult<Value>;
117}
118
119#[apply(async_trait_maybe_send!)]
122pub trait FederationApiExt: IRawFederationApi {
123 async fn request_single_peer<Ret>(
124 &self,
125 method: String,
126 params: ApiRequestErased,
127 peer: PeerId,
128 ) -> PeerResult<Ret>
129 where
130 Ret: DeserializeOwned,
131 {
132 self.request_raw(peer, &method, ¶ms)
133 .await
134 .and_then(|v| {
135 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
136 })
137 }
138
139 async fn request_single_peer_federation<FedRet>(
140 &self,
141 method: String,
142 params: ApiRequestErased,
143 peer_id: PeerId,
144 ) -> FederationResult<FedRet>
145 where
146 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
147 {
148 self.request_raw(peer_id, &method, ¶ms)
149 .await
150 .and_then(|v| {
151 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
152 })
153 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
154 }
155
156 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
159 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
160 &self,
161 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
162 method: String,
163 params: ApiRequestErased,
164 ) -> FederationResult<FR> {
165 #[cfg(not(target_family = "wasm"))]
169 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
170 #[cfg(target_family = "wasm")]
171 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
172
173 for peer in self.all_peers() {
174 futures.push(Box::pin({
175 let method = &method;
176 let params = ¶ms;
177 async move {
178 let result = self
179 .request_single_peer(method.clone(), params.clone(), *peer)
180 .await;
181
182 (*peer, result)
183 }
184 }));
185 }
186
187 let mut peer_errors = BTreeMap::new();
188 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
189
190 loop {
191 let (peer, result) = futures
192 .next()
193 .await
194 .expect("Query strategy ran out of peers to query without returning a result");
195
196 match result {
197 Ok(response) => match strategy.process(peer, response) {
198 QueryStep::Retry(peers) => {
199 for peer in peers {
200 futures.push(Box::pin({
201 let method = &method;
202 let params = ¶ms;
203 async move {
204 let result = self
205 .request_single_peer(method.clone(), params.clone(), peer)
206 .await;
207
208 (peer, result)
209 }
210 }));
211 }
212 }
213 QueryStep::Success(response) => return Ok(response),
214 QueryStep::Failure(e) => {
215 peer_errors.insert(peer, e);
216 }
217 QueryStep::Continue => {}
218 },
219 Err(e) => {
220 e.report_if_unusual(peer, "RequestWithStrategy");
221 peer_errors.insert(peer, e);
222 }
223 }
224
225 if peer_errors.len() == peer_error_threshold {
226 return Err(FederationError::peer_errors(
227 method.clone(),
228 params.params.clone(),
229 peer_errors,
230 ));
231 }
232 }
233 }
234
235 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
236 &self,
237 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
238 method: String,
239 params: ApiRequestErased,
240 ) -> FR {
241 #[cfg(not(target_family = "wasm"))]
245 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
246 #[cfg(target_family = "wasm")]
247 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
248
249 for peer in self.all_peers() {
250 futures.push(Box::pin({
251 let method = &method;
252 let params = ¶ms;
253 async move {
254 let response = util::retry(
255 "api-request-{method}-{peer}",
256 api_networking_backoff(),
257 || async {
258 self.request_single_peer(method.clone(), params.clone(), *peer)
259 .await
260 .inspect_err(|e| {
261 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
262 })
263 .map_err(|e| anyhow!(e.to_string()))
264 },
265 )
266 .await
267 .expect("Number of retries has no limit");
268
269 (*peer, response)
270 }
271 }));
272 }
273
274 loop {
275 let (peer, response) = match futures.next().await {
276 Some(t) => t,
277 None => pending().await,
278 };
279
280 match strategy.process(peer, response) {
281 QueryStep::Retry(peers) => {
282 for peer in peers {
283 futures.push(Box::pin({
284 let method = &method;
285 let params = ¶ms;
286 async move {
287 let response = util::retry(
288 "api-request-{method}-{peer}",
289 api_networking_backoff(),
290 || async {
291 self.request_single_peer(
292 method.clone(),
293 params.clone(),
294 peer,
295 )
296 .await
297 .inspect_err(|err| {
298 if err.is_unusual() {
299 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
300 }
301 })
302 .map_err(|e| anyhow!(e.to_string()))
303 },
304 )
305 .await
306 .expect("Number of retries has no limit");
307
308 (peer, response)
309 }
310 }));
311 }
312 }
313 QueryStep::Success(response) => return response,
314 QueryStep::Failure(e) => {
315 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
316 }
317 QueryStep::Continue => {}
318 }
319 }
320 }
321
322 async fn request_current_consensus<Ret>(
323 &self,
324 method: String,
325 params: ApiRequestErased,
326 ) -> FederationResult<Ret>
327 where
328 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
329 {
330 self.request_with_strategy(
331 ThresholdConsensus::new(self.all_peers().to_num_peers()),
332 method,
333 params,
334 )
335 .await
336 }
337
338 async fn request_current_consensus_retry<Ret>(
339 &self,
340 method: String,
341 params: ApiRequestErased,
342 ) -> Ret
343 where
344 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
345 {
346 self.request_with_strategy_retry(
347 ThresholdConsensus::new(self.all_peers().to_num_peers()),
348 method,
349 params,
350 )
351 .await
352 }
353
354 async fn request_admin<Ret>(
355 &self,
356 method: &str,
357 params: ApiRequestErased,
358 auth: ApiAuth,
359 ) -> FederationResult<Ret>
360 where
361 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
362 {
363 let Some(self_peer_id) = self.self_peer() else {
364 return Err(FederationError::general(
365 method,
366 params,
367 anyhow::format_err!("Admin peer_id not set"),
368 ));
369 };
370
371 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
372 .await
373 }
374
375 async fn request_admin_no_auth<Ret>(
376 &self,
377 method: &str,
378 params: ApiRequestErased,
379 ) -> FederationResult<Ret>
380 where
381 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
382 {
383 let Some(self_peer_id) = self.self_peer() else {
384 return Err(FederationError::general(
385 method,
386 params,
387 anyhow::format_err!("Admin peer_id not set"),
388 ));
389 };
390
391 self.request_single_peer_federation(method.into(), params, self_peer_id)
392 .await
393 }
394}
395
396#[apply(async_trait_maybe_send!)]
397impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
398
399pub trait IModuleFederationApi: IRawFederationApi {}
401
402dyn_newtype_define! {
403 #[derive(Clone)]
404 pub DynModuleApi(Arc<IModuleFederationApi>)
405}
406
407dyn_newtype_define! {
408 #[derive(Clone)]
409 pub DynGlobalApi(Arc<IGlobalFederationApi>)
410}
411
412impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
413 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
414 self.inner.as_ref()
415 }
416}
417
418impl DynGlobalApi {
419 pub async fn new_admin(
420 peer: PeerId,
421 url: SafeUrl,
422 api_secret: &Option<String>,
423 ) -> anyhow::Result<DynGlobalApi> {
424 Ok(GlobalFederationApiWithCache::new(
425 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
426 .await?,
427 )
428 .into())
429 }
430
431 pub async fn from_pre_peer_id_admin_endpoint(
435 url: SafeUrl,
436 api_secret: &Option<String>,
437 ) -> anyhow::Result<Self> {
438 Self::new_admin(PeerId::from(1024), url, api_secret).await
442 }
443
444 pub async fn from_endpoints(
445 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
446 api_secret: &Option<String>,
447 ) -> anyhow::Result<Self> {
448 Ok(GlobalFederationApiWithCache::new(
449 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
450 )
451 .into())
452 }
453}
454
455#[apply(async_trait_maybe_send!)]
457pub trait IGlobalFederationApi: IRawFederationApi {
458 async fn submit_transaction(
459 &self,
460 tx: Transaction,
461 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
462
463 async fn await_block(
464 &self,
465 block_index: u64,
466 decoders: &ModuleDecoderRegistry,
467 ) -> anyhow::Result<SessionOutcome>;
468
469 async fn get_session_status(
470 &self,
471 block_index: u64,
472 decoders: &ModuleDecoderRegistry,
473 core_api_version: ApiVersion,
474 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
475 ) -> anyhow::Result<SessionStatus>;
476
477 async fn session_count(&self) -> FederationResult<u64>;
478
479 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
480
481 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
482
483 async fn download_backup(
484 &self,
485 id: &secp256k1::PublicKey,
486 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
487
488 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
492
493 async fn server_status(&self, auth: ApiAuth) -> FederationResult<ServerStatus>;
494
495 async fn set_local_params(
496 &self,
497 name: String,
498 federation_name: Option<String>,
499 auth: ApiAuth,
500 ) -> FederationResult<String>;
501
502 async fn add_peer_connection_info(
503 &self,
504 info: String,
505 auth: ApiAuth,
506 ) -> FederationResult<String>;
507
508 async fn set_config_gen_connections(
513 &self,
514 info: ConfigGenConnectionsRequest,
515 auth: ApiAuth,
516 ) -> FederationResult<()>;
517
518 async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;
526
527 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;
532
533 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
537
538 async fn get_verify_config_hash(
541 &self,
542 auth: ApiAuth,
543 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
544
545 async fn verified_configs(
548 &self,
549 auth: ApiAuth,
550 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
551
552 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
558
559 async fn status(&self) -> FederationResult<StatusResponse>;
561
562 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
564
565 async fn guardian_config_backup(&self, auth: ApiAuth)
567 -> FederationResult<GuardianConfigBackup>;
568
569 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
571
572 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
573
574 async fn submit_api_announcement(
576 &self,
577 peer_id: PeerId,
578 announcement: SignedApiAnnouncement,
579 ) -> FederationResult<()>;
580
581 async fn api_announcements(
582 &self,
583 guardian: PeerId,
584 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
585
586 async fn sign_api_announcement(
587 &self,
588 api_url: SafeUrl,
589 auth: ApiAuth,
590 ) -> FederationResult<SignedApiAnnouncement>;
591
592 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
593
594 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
596
597 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
599}
600
601pub fn deserialize_outcome<R>(
602 outcome: &SerdeOutputOutcome,
603 module_decoder: &Decoder,
604) -> OutputOutcomeResult<R>
605where
606 R: OutputOutcome + MaybeSend,
607{
608 let dyn_outcome = outcome
609 .try_into_inner_known_module_kind(module_decoder)
610 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
611
612 let source_instance = dyn_outcome.module_instance_id();
613
614 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
615 let target_type = std::any::type_name::<R>();
616 OutputOutcomeError::ResponseDeserialization(anyhow!(
617 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
618 ))
619 })
620}
621
622#[derive(Debug, Clone)]
623pub struct WebsocketConnector {
624 peers: BTreeMap<PeerId, SafeUrl>,
625 api_secret: Option<String>,
626
627 pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
633}
634
635impl WebsocketConnector {
636 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
637 let mut s = Self::new_no_overrides(peers, api_secret);
638
639 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
640 s = s.with_connection_override(k, v);
641 }
642
643 Ok(s)
644 }
645 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
646 self.connection_overrides.insert(peer_id, url);
647 self
648 }
649 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
650 Self {
651 peers,
652 api_secret,
653 connection_overrides: BTreeMap::default(),
654 }
655 }
656}
657
658#[async_trait]
659impl IClientConnector for WebsocketConnector {
660 fn peers(&self) -> BTreeSet<PeerId> {
661 self.peers.keys().copied().collect()
662 }
663
664 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
665 let api_endpoint = match self.connection_overrides.get(&peer_id) {
666 Some(url) => {
667 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
668 url
669 }
670 None => self.peers.get(&peer_id).ok_or_else(|| {
671 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
672 })?,
673 };
674
675 #[cfg(not(target_family = "wasm"))]
676 let mut client = {
677 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
678 let mut root_certs = RootCertStore::empty();
679 root_certs.extend(webpki_roots);
680
681 let tls_cfg = CustomCertStore::builder()
682 .with_root_certificates(root_certs)
683 .with_no_client_auth();
684
685 WsClientBuilder::default()
686 .max_concurrent_requests(u16::MAX as usize)
687 .with_custom_cert_store(tls_cfg)
688 };
689
690 #[cfg(target_family = "wasm")]
691 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
692
693 if let Some(api_secret) = &self.api_secret {
694 #[cfg(not(target_family = "wasm"))]
695 {
696 let mut headers = HeaderMap::new();
699
700 let auth = base64::engine::general_purpose::STANDARD
701 .encode(format!("fedimint:{api_secret}"));
702
703 headers.insert(
704 "Authorization",
705 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
706 );
707
708 client = client.set_headers(headers);
709 }
710 #[cfg(target_family = "wasm")]
711 {
712 let mut url = api_endpoint.clone();
715 url.set_username("fedimint")
716 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
717 url.set_password(Some(&api_secret))
718 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
719
720 let client = client
721 .build(url.as_str())
722 .await
723 .map_err(|err| PeerError::InternalClientError(err.into()))?;
724
725 return Ok(client.into_dyn());
726 }
727 }
728
729 let client = client
730 .build(api_endpoint.as_str())
731 .await
732 .map_err(|err| PeerError::InternalClientError(err.into()))?;
733
734 Ok(client.into_dyn())
735 }
736}
737
738#[cfg(all(feature = "tor", not(target_family = "wasm")))]
739#[derive(Debug, Clone)]
740pub struct TorConnector {
741 peers: BTreeMap<PeerId, SafeUrl>,
742 api_secret: Option<String>,
743}
744
745#[cfg(all(feature = "tor", not(target_family = "wasm")))]
746impl TorConnector {
747 pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
748 Self { peers, api_secret }
749 }
750}
751
752#[cfg(all(feature = "tor", not(target_family = "wasm")))]
753#[async_trait]
754impl IClientConnector for TorConnector {
755 fn peers(&self) -> BTreeSet<PeerId> {
756 self.peers.keys().copied().collect()
757 }
758
759 #[allow(clippy::too_many_lines)]
760 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
761 let api_endpoint = self
762 .peers
763 .get(&peer_id)
764 .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;
765
766 let tor_config = TorClientConfig::default();
767 let tor_client = TorClient::create_bootstrapped(tor_config)
768 .await
769 .map_err(|err| PeerError::InternalClientError(err.into()))?
770 .isolated_client();
771
772 debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
773
774 let addr = (
777 api_endpoint
778 .host_str()
779 .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
780 api_endpoint
781 .port_or_known_default()
782 .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
783 );
784 let tor_addr = TorAddr::from(addr).map_err(|e| {
785 PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
786 })?;
787
788 let tor_addr_clone = tor_addr.clone();
789
790 debug!(
791 ?tor_addr,
792 ?addr,
793 "Successfully created `TorAddr` for given address (i.e. host and port)"
794 );
795
796 let anonymized_stream = if api_endpoint.is_onion_address() {
799 let mut stream_prefs = arti_client::StreamPrefs::default();
800 stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
801
802 let anonymized_stream = tor_client
803 .connect_with_prefs(tor_addr, &stream_prefs)
804 .await
805 .map_err(|e| PeerError::Connection(e.into()))?;
806
807 debug!(
808 ?tor_addr_clone,
809 "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
810 );
811 anonymized_stream
812 } else {
813 let anonymized_stream = tor_client
814 .connect(tor_addr)
815 .await
816 .map_err(|e| PeerError::Connection(e.into()))?;
817
818 debug!(
819 ?tor_addr_clone,
820 "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
821 );
822 anonymized_stream
823 };
824
825 let is_tls = match api_endpoint.scheme() {
826 "wss" => true,
827 "ws" => false,
828 unexpected_scheme => {
829 return Err(PeerError::InvalidEndpoint(anyhow!(
830 "Unsupported scheme: {unexpected_scheme}"
831 )));
832 }
833 };
834
835 let tls_connector = if is_tls {
836 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
837 let mut root_certs = RootCertStore::empty();
838 root_certs.extend(webpki_roots);
839
840 let tls_config = TlsClientConfig::builder()
841 .with_root_certificates(root_certs)
842 .with_no_client_auth();
843 let tls_connector = TlsConnector::from(Arc::new(tls_config));
844 Some(tls_connector)
845 } else {
846 None
847 };
848
849 let mut ws_client_builder =
850 WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
851
852 if let Some(api_secret) = &self.api_secret {
853 let mut headers = HeaderMap::new();
856
857 let auth =
858 base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
859
860 headers.insert(
861 "Authorization",
862 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
863 );
864
865 ws_client_builder = ws_client_builder.set_headers(headers);
866 }
867
868 match tls_connector {
869 None => {
870 let client = ws_client_builder
871 .build_with_stream(api_endpoint.as_str(), anonymized_stream)
872 .await
873 .map_err(|e| PeerError::Connection(e.into()))?;
874
875 Ok(client.into_dyn())
876 }
877 Some(tls_connector) => {
878 let host = api_endpoint
879 .host_str()
880 .map(ToOwned::to_owned)
881 .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;
882
883 let server_name = rustls_pki_types::ServerName::try_from(host)
886 .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;
887
888 let anonymized_tls_stream = tls_connector
889 .connect(server_name, anonymized_stream)
890 .await
891 .map_err(|e| PeerError::Connection(e.into()))?;
892
893 let client = ws_client_builder
894 .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
895 .await
896 .map_err(|e| PeerError::Connection(e.into()))?;
897
898 Ok(client.into_dyn())
899 }
900 }
901 }
902}
903
904fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
905 match jsonrpc_error {
906 JsonRpcClientError::Call(error_object) => {
907 let error = anyhow!(error_object.message().to_owned());
908 match ErrorCode::from(error_object.code()) {
909 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
910 PeerError::InvalidRequest(error)
911 }
912 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
913 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
914 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
915 PeerError::ServerError(error)
916 }
917 }
918 }
919 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
920 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
921 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
922 JsonRpcClientError::InvalidSubscriptionId => todo!(),
923 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
924 PeerError::InvalidRequest(anyhow!(invalid_request_id))
925 }
926 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
927 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
928 JsonRpcClientError::HttpNotImplemented => {
929 PeerError::ServerError(anyhow!("Http not implemented"))
930 }
931 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
932 PeerError::InvalidRequest(anyhow!(empty_batch_request))
933 }
934 JsonRpcClientError::RegisterMethod(register_method_error) => {
935 PeerError::InvalidResponse(anyhow!(register_method_error))
936 }
937 }
938}
939
940#[async_trait]
941impl IClientConnection for WsClient {
942 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
943 let method = match method {
944 ApiMethod::Core(method) => method,
945 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
946 };
947
948 Ok(ClientT::request(self, &method, [request.to_json()])
949 .await
950 .map_err(jsonrpc_error_to_peer_error)?)
951 }
952
953 async fn await_disconnection(&self) {
954 self.on_disconnect().await;
955 }
956}
957
958pub type DynClientConnector = Arc<dyn IClientConnector>;
959
960#[async_trait]
963pub trait IClientConnector: Send + Sync + 'static {
964 fn peers(&self) -> BTreeSet<PeerId>;
965
966 async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
967
968 fn into_dyn(self) -> DynClientConnector
969 where
970 Self: Sized,
971 {
972 Arc::new(self)
973 }
974}
975
976pub type DynClientConnection = Arc<dyn IClientConnection>;
977
978#[async_trait]
979pub trait IClientConnection: Debug + Send + Sync + 'static {
980 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
981
982 async fn await_disconnection(&self);
983
984 fn into_dyn(self) -> DynClientConnection
985 where
986 Self: Sized,
987 {
988 Arc::new(self)
989 }
990}
991
992#[derive(Clone, Debug)]
993pub struct ReconnectFederationApi {
994 peers: BTreeSet<PeerId>,
995 admin_id: Option<PeerId>,
996 module_id: Option<ModuleInstanceId>,
997 connections: ReconnectClientConnections,
998}
999
1000impl ReconnectFederationApi {
1001 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1002 Self {
1003 peers: connector.peers(),
1004 admin_id,
1005 module_id: None,
1006 connections: ReconnectClientConnections::new(connector),
1007 }
1008 }
1009
1010 pub async fn new_admin(
1011 peer: PeerId,
1012 url: SafeUrl,
1013 api_secret: &Option<String>,
1014 ) -> anyhow::Result<Self> {
1015 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1016 }
1017
1018 pub async fn from_endpoints(
1019 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1020 api_secret: &Option<String>,
1021 admin_id: Option<PeerId>,
1022 ) -> anyhow::Result<Self> {
1023 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1024
1025 let scheme = peers
1026 .values()
1027 .next()
1028 .expect("Federation api has been initialized with no peers")
1029 .scheme();
1030
1031 let connector = match scheme {
1032 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1033 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1034 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1035 #[cfg(all(feature = "iroh", not(target_family = "wasm")))]
1036 "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1037 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1038 };
1039
1040 Ok(ReconnectFederationApi::new(&connector, admin_id))
1041 }
1042}
1043
1044impl IModuleFederationApi for ReconnectFederationApi {}
1045
1046#[apply(async_trait_maybe_send!)]
1047impl IRawFederationApi for ReconnectFederationApi {
1048 fn all_peers(&self) -> &BTreeSet<PeerId> {
1049 &self.peers
1050 }
1051
1052 fn self_peer(&self) -> Option<PeerId> {
1053 self.admin_id
1054 }
1055
1056 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1057 ReconnectFederationApi {
1058 peers: self.peers.clone(),
1059 admin_id: self.admin_id,
1060 module_id: Some(id),
1061 connections: self.connections.clone(),
1062 }
1063 .into()
1064 }
1065
1066 async fn request_raw(
1067 &self,
1068 peer_id: PeerId,
1069 method: &str,
1070 params: &ApiRequestErased,
1071 ) -> PeerResult<Value> {
1072 let method = match self.module_id {
1073 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1074 None => ApiMethod::Core(method.to_string()),
1075 };
1076
1077 self.connections
1078 .request(peer_id, method, params.clone())
1079 .await
1080 }
1081}
1082
1083#[derive(Clone, Debug)]
1084pub struct ReconnectClientConnections {
1085 connections: BTreeMap<PeerId, ClientConnection>,
1086}
1087
1088impl ReconnectClientConnections {
1089 pub fn new(connector: &DynClientConnector) -> Self {
1090 ReconnectClientConnections {
1091 connections: connector
1092 .peers()
1093 .into_iter()
1094 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1095 .collect(),
1096 }
1097 }
1098
1099 async fn request(
1100 &self,
1101 peer: PeerId,
1102 method: ApiMethod,
1103 request: ApiRequestErased,
1104 ) -> PeerResult<Value> {
1105 trace!(target: LOG_NET_API, %method, "Api request");
1106 let res = self
1107 .connections
1108 .get(&peer)
1109 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1110 .connection()
1111 .await
1112 .context("Failed to connect to peer")
1113 .map_err(PeerError::Connection)?
1114 .request(method.clone(), request)
1115 .await;
1116
1117 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1118
1119 res
1120 }
1121}
1122
1123#[derive(Clone, Debug)]
1124struct ClientConnection {
1125 sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1126}
1127
1128impl ClientConnection {
1129 fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1130 let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1131
1132 fedimint_core::task::spawn(
1133 "peer-api-connection",
1134 async move {
1135 let mut backoff = api_networking_backoff();
1136
1137 while let Ok(sender) = receiver.recv().await {
1138 let mut senders = vec![sender];
1139
1140 while let Ok(sender) = receiver.try_recv() {
1143 senders.push(sender);
1144 }
1145
1146 match connector.connect(peer).await {
1147 Ok(connection) => {
1148 trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1149
1150 for sender in senders {
1151 sender.send(connection.clone()).ok();
1152 }
1153
1154 loop {
1155 tokio::select! {
1156 sender = receiver.recv() => {
1157 match sender.ok() {
1158 Some(sender) => sender.send(connection.clone()).ok(),
1159 None => break,
1160 };
1161 }
1162 () = connection.await_disconnection() => break,
1163 }
1164 }
1165
1166 trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1167
1168 backoff = api_networking_backoff();
1169 }
1170 Err(e) => {
1171 trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1172
1173 fedimint_core::task::sleep(
1174 backoff.next().expect("No limit to the number of retries"),
1175 )
1176 .await;
1177 }
1178 }
1179 }
1180
1181 trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1182 }
1183 .instrument(trace_span!("peer-api-connection", ?peer)),
1184 );
1185
1186 ClientConnection { sender }
1187 }
1188
1189 async fn connection(&self) -> Option<DynClientConnection> {
1190 let (sender, receiver) = oneshot::channel();
1191
1192 self.sender
1193 .send(sender)
1194 .await
1195 .expect("Api connection request channel closed unexpectedly");
1196
1197 receiver.await.ok()
1198 }
1199}
1200
1201#[cfg(feature = "iroh")]
1202mod iroh {
1203 use std::collections::{BTreeMap, BTreeSet};
1204 use std::str::FromStr;
1205
1206 use anyhow::Context;
1207 use async_trait::async_trait;
1208 use fedimint_core::PeerId;
1209 use fedimint_core::envs::parse_kv_list_from_env;
1210 use fedimint_core::module::{
1211 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
1212 };
1213 use fedimint_core::util::SafeUrl;
1214 use fedimint_logging::LOG_NET_IROH;
1215 use iroh::endpoint::Connection;
1216 use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
1217 use iroh_base::ticket::NodeTicket;
1218 use serde_json::Value;
1219 use tracing::{trace, warn};
1220
1221 use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
1222
1223 #[derive(Debug, Clone)]
1224 pub struct IrohConnector {
1225 node_ids: BTreeMap<PeerId, NodeId>,
1226 endpoint: Endpoint,
1227
1228 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
1234 }
1235
1236 impl IrohConnector {
1237 pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1238 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
1239 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
1240 let mut s = Self::new_no_overrides(peers).await?;
1241
1242 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
1243 s = s.with_connection_override(k, v.into());
1244 }
1245
1246 Ok(s)
1247 }
1248
1249 pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1250 let node_ids = peers
1251 .into_iter()
1252 .map(|(peer, url)| {
1253 let host = url.host_str().context("Url is missing host")?;
1254
1255 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
1256
1257 Ok((peer, node_id))
1258 })
1259 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
1260
1261 Ok(Self {
1262 node_ids,
1263 endpoint: Endpoint::builder()
1264 .discovery_n0()
1265 .discovery_dht()
1266 .bind()
1267 .await?,
1268 connection_overrides: BTreeMap::new(),
1269 })
1270 }
1271
1272 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
1273 self.connection_overrides.insert(node, addr);
1274 self
1275 }
1276 }
1277
1278 #[async_trait]
1279 impl IClientConnector for IrohConnector {
1280 fn peers(&self) -> BTreeSet<PeerId> {
1281 self.node_ids.keys().copied().collect()
1282 }
1283
1284 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
1285 let node_id = *self
1286 .node_ids
1287 .get(&peer_id)
1288 .ok_or(PeerError::InvalidPeerId { peer_id })?;
1289
1290 let connection = match self.connection_overrides.get(&node_id) {
1291 Some(node_addr) => {
1292 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
1293 self.endpoint
1294 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
1295 .await
1296 }
1297 None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
1298 }.map_err(PeerError::Connection)?;
1299
1300 Ok(connection.into_dyn())
1301 }
1302 }
1303
1304 #[async_trait]
1305 impl IClientConnection for Connection {
1306 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1307 let json = serde_json::to_vec(&IrohApiRequest { method, request })
1308 .expect("Serialization to vec can't fail");
1309
1310 let (mut sink, mut stream) = self
1311 .open_bi()
1312 .await
1313 .map_err(|e| PeerError::Transport(e.into()))?;
1314
1315 sink.write_all(&json)
1316 .await
1317 .map_err(|e| PeerError::Transport(e.into()))?;
1318
1319 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
1320
1321 let response = stream
1322 .read_to_end(1_000_000)
1323 .await
1324 .map_err(|e| PeerError::Transport(e.into()))?;
1325
1326 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
1328 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
1329
1330 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
1331 }
1332
1333 async fn await_disconnection(&self) {
1334 self.closed().await;
1335 }
1336 }
1337}
1338
1339#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1341pub struct FederationStatus {
1342 pub session_count: u64,
1343 pub status_by_peer: HashMap<PeerId, PeerStatus>,
1344 pub peers_online: u64,
1345 pub peers_offline: u64,
1346 pub peers_flagged: u64,
1349 pub scheduled_shutdown: Option<u64>,
1350}
1351
1352#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1353pub struct PeerStatus {
1354 pub last_contribution: Option<u64>,
1355 pub connection_status: P2PConnectionStatus,
1356 pub flagged: bool,
1359}
1360
1361#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1362#[serde(rename_all = "snake_case")]
1363pub enum P2PConnectionStatus {
1364 #[default]
1365 Disconnected,
1366 Connected,
1367}
1368
1369#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1370pub struct StatusResponse {
1371 pub server: ServerStatusLegacy,
1372 pub federation: Option<FederationStatus>,
1373}
1374
1375#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1378pub struct GuardianConfigBackup {
1379 #[serde(with = "fedimint_core::hex::serde")]
1380 pub tar_archive_bytes: Vec<u8>,
1381}
1382
1383#[cfg(test)]
1384mod tests {
1385 use std::str::FromStr as _;
1386
1387 use fedimint_core::config::FederationId;
1388 use fedimint_core::invite_code::InviteCode;
1389
1390 use super::*;
1391
1392 #[test]
1393 fn converts_invite_code() {
1394 let connect = InviteCode::new(
1395 "ws://test1".parse().unwrap(),
1396 PeerId::from(1),
1397 FederationId::dummy(),
1398 Some("api_secret".into()),
1399 );
1400
1401 let bech32 = connect.to_string();
1402 let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
1403 assert_eq!(connect, connect_parsed);
1404
1405 let json = serde_json::to_string(&connect).unwrap();
1406 let connect_as_string: String = serde_json::from_str(&json).unwrap();
1407 assert_eq!(connect_as_string, bech32);
1408 let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
1409 assert_eq!(connect_parsed_json, connect_parsed);
1410 }
1411
1412 #[test]
1413 fn creates_essential_guardians_invite_code() {
1414 let mut peer_to_url_map = BTreeMap::new();
1415 peer_to_url_map.insert(PeerId::from(0), "ws://test1".parse().expect("URL fail"));
1416 peer_to_url_map.insert(PeerId::from(1), "ws://test2".parse().expect("URL fail"));
1417 peer_to_url_map.insert(PeerId::from(2), "ws://test3".parse().expect("URL fail"));
1418 peer_to_url_map.insert(PeerId::from(3), "ws://test4".parse().expect("URL fail"));
1419 let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;
1420
1421 let code =
1422 InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
1423
1424 assert_eq!(FederationId::dummy(), code.federation_id());
1425
1426 let expected_map: BTreeMap<PeerId, SafeUrl> =
1427 peer_to_url_map.into_iter().take(max_size).collect();
1428 assert_eq!(expected_map, code.peers());
1429 }
1430}