use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::Debug;
use std::pin::Pin;
use std::result;
use std::sync::Arc;
use anyhow::anyhow;
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
use arti_client::{TorAddr, TorClient, TorClientConfig};
use base64::Engine as _;
use bitcoin::hashes::sha256;
use bitcoin::secp256k1;
pub use error::{FederationError, OutputOutcomeError, PeerError};
use fedimint_core::admin_client::{
ConfigGenConnectionsRequest, ConfigGenParamsRequest, ConfigGenParamsResponse, PeerServerParams,
ServerStatus,
};
use fedimint_core::backup::ClientBackupSnapshot;
use fedimint_core::core::backup::SignedBackupRequest;
use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::invite_code::InviteCode;
use fedimint_core::module::audit::AuditSummary;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding};
use fedimint_core::net::api_announcement::SignedApiAnnouncement;
use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
use fedimint_core::util::backoff_util::api_networking_backoff;
use fedimint_core::util::SafeUrl;
use fedimint_core::{
apply, async_trait_maybe_send, dyn_newtype_define, util, NumPeersExt, PeerId, TransactionId,
};
use fedimint_logging::LOG_CLIENT_NET_API;
use futures::future::pending;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use itertools::Itertools;
use jsonrpsee_core::client::ClientT;
pub use jsonrpsee_core::client::Error as JsonRpcClientError;
use jsonrpsee_core::DeserializeOwned;
#[cfg(target_family = "wasm")]
use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
#[cfg(not(target_family = "wasm"))]
use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
#[cfg(not(target_family = "wasm"))]
use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
use net::Connector;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[cfg(not(target_family = "wasm"))]
use tokio_rustls::rustls::RootCertStore;
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
use tokio_rustls::{rustls::ClientConfig as TlsClientConfig, TlsConnector};
use tracing::{debug, instrument, trace, warn};
use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
mod error;
mod global_api;
pub mod net;
mod peer;
pub use global_api::{GlobalFederationApiWithCache, GlobalFederationApiWithCacheExt};
use peer::FederationPeer;
/// Result whose error type is [`PeerError`]; returned by single-peer requests.
pub type PeerResult<T> = Result<T, PeerError>;
/// Result whose error type is [`JsonRpcClientError`]; returned by raw JSON-RPC calls.
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
/// Result whose error type is [`FederationError`]; returned by federation-level requests.
pub type FederationResult<T> = Result<T, FederationError>;
/// A [`DynOutputOutcome`] wrapped in [`SerdeModuleEncoding`].
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
/// Result whose error type is [`OutputOutcomeError`]; returned by [`deserialize_outcome`].
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
/// A set of API versions: one for the core API and one per module instance.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
/// API version of the core API.
pub core: ApiVersion,
/// API versions keyed by module instance id.
pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
/// Low-level federation API: exposes the known peers and sends a raw JSON-RPC
/// request to a single peer. Higher-level helpers are layered on top of it by
/// [`FederationApiExt`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
/// Returns the ids of all peers known to this API.
fn all_peers(&self) -> &BTreeSet<PeerId>;
/// Returns the peer id designated as "self", if one was set. The admin request
/// helpers of [`FederationApiExt`] fail when this is `None`.
fn self_peer(&self) -> Option<PeerId>;
/// Returns an API handle scoped to the module instance `id`.
fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
/// Sends `method` with `params` to `peer_id` and returns the raw JSON response.
async fn request_raw(
&self,
peer_id: PeerId,
method: &str,
params: &[Value],
) -> result::Result<Value, JsonRpcClientError>;
}
#[apply(async_trait_maybe_send!)]
pub trait FederationApiExt: IRawFederationApi {
/// Sends `method` with `params` to `peer` and deserializes the JSON response into `Ret`.
///
/// # Errors
///
/// Returns [`PeerError::Rpc`] when the raw request fails and
/// [`PeerError::ResponseDeserialization`] when the response cannot be decoded as `Ret`.
async fn request_single_peer<Ret>(
    &self,
    method: String,
    params: ApiRequestErased,
    peer: PeerId,
) -> PeerResult<Ret>
where
    Ret: DeserializeOwned,
{
    let raw_response = self
        .request_raw(peer, &method, &[params.to_json()])
        .await
        .map_err(PeerError::Rpc)?;

    serde_json::from_value(raw_response)
        .map_err(|err| PeerError::ResponseDeserialization(err.into()))
}
async fn request_single_peer_federation<FedRet>(
&self,
method: String,
params: ApiRequestErased,
peer_id: PeerId,
) -> FederationResult<FedRet>
where
FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
self.request_raw(peer_id, &method, &[params.to_json()])
.await
.map_err(PeerError::Rpc)
.and_then(|v| {
serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
})
.map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
}
/// Sends `method` with `params` to every peer concurrently and feeds each
/// successful response into `strategy` until it yields a final result.
///
/// Peers the strategy asks to retry are queried again. Request errors and
/// responses the strategy rejects are collected per peer.
///
/// # Errors
///
/// Returns `FederationError::peer_errors` once the number of peers with a
/// recorded error reaches the `one_honest()` threshold of the peer set.
///
/// # Panics
///
/// Panics if all in-flight requests complete without the strategy succeeding
/// and without the error threshold being reached.
#[instrument(target = "fm::api", skip_all, fields(method=method))]
async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
    &self,
    mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
    method: String,
    params: ApiRequestErased,
) -> FederationResult<FR> {
    // The boxed future type differs per target: it is only `Send` off wasm.
    #[cfg(not(target_family = "wasm"))]
    let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
    #[cfg(target_family = "wasm")]
    let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();

    for peer in self.all_peers() {
        futures.push(Box::pin({
            // Borrow so the `async move` block captures references, not the owned values.
            let method = &method;
            let params = &params;
            async move {
                let result = self
                    .request_single_peer(method.clone(), params.clone(), *peer)
                    .await;
                (*peer, result)
            }
        }));
    }

    let mut peer_errors = BTreeMap::new();
    let peer_error_threshold = self.all_peers().to_num_peers().one_honest();

    loop {
        let (peer, result) = futures
            .next()
            .await
            .expect("Query strategy ran out of peers to query without returning a result");

        match result {
            Ok(response) => match strategy.process(peer, response) {
                QueryStep::Retry(peers) => {
                    for peer in peers {
                        futures.push(Box::pin({
                            let method = &method;
                            let params = &params;
                            async move {
                                let result = self
                                    .request_single_peer(method.clone(), params.clone(), peer)
                                    .await;
                                (peer, result)
                            }
                        }));
                    }
                }
                QueryStep::Success(response) => return Ok(response),
                QueryStep::Failure(e) => {
                    peer_errors.insert(peer, PeerError::InvalidResponse(e.to_string()));
                }
                QueryStep::Continue => {}
            },
            Err(e) => {
                e.report_if_important(peer);
                peer_errors.insert(peer, e);
            }
        }

        // Errors are keyed by peer, so a peer that fails repeatedly counts once.
        if peer_errors.len() == peer_error_threshold {
            return Err(FederationError::peer_errors(
                method.clone(),
                params.params.clone(),
                peer_errors,
            ));
        }
    }
}
/// Sends `method` with `params` to every peer concurrently, retrying each
/// peer's request with `api_networking_backoff()` until it succeeds, and
/// feeds the responses into `strategy` until it yields a final result.
///
/// Unlike `request_with_strategy` this returns `FR` directly: request errors
/// are retried, and strategy failures are only logged. If every in-flight
/// request has completed without the strategy succeeding, this waits forever.
///
/// # Panics
///
/// Panics if `util::retry` gives up, which the code treats as impossible.
async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
    &self,
    mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
    method: String,
    params: ApiRequestErased,
) -> FR {
    // The boxed future type differs per target: it is only `Send` off wasm.
    #[cfg(not(target_family = "wasm"))]
    let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
    #[cfg(target_family = "wasm")]
    let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();

    for peer in self.all_peers() {
        futures.push(Box::pin({
            // Borrow so the `async move` block captures references, not the owned values.
            let method = &method;
            let params = &params;
            async move {
                let response = util::retry(
                    // Interpolate method and peer so the operation name identifies the request.
                    format!("api-request-{method}-{peer}"),
                    api_networking_backoff(),
                    || async {
                        self.request_single_peer(method.clone(), params.clone(), *peer)
                            .await
                            .inspect_err(|e| e.report_if_important(*peer))
                            .map_err(|e| anyhow!(e.to_string()))
                    },
                )
                .await
                .expect("Number of retries has no limit");
                (*peer, response)
            }
        }));
    }

    loop {
        let (peer, response) = match futures.next().await {
            Some(t) => t,
            // Nothing left in flight: park forever instead of returning.
            None => pending().await,
        };

        match strategy.process(peer, response) {
            QueryStep::Retry(peers) => {
                for peer in peers {
                    futures.push(Box::pin({
                        let method = &method;
                        let params = &params;
                        async move {
                            let response = util::retry(
                                format!("api-request-{method}-{peer}"),
                                api_networking_backoff(),
                                || async {
                                    self.request_single_peer(
                                        method.clone(),
                                        params.clone(),
                                        peer,
                                    )
                                    .await
                                    .inspect_err(|e| e.report_if_important(peer))
                                    .map_err(|e| anyhow!(e.to_string()))
                                },
                            )
                            .await
                            .expect("Number of retries has no limit");
                            (peer, response)
                        }
                    }));
                }
            }
            QueryStep::Success(response) => return response,
            QueryStep::Failure(e) => {
                warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
            }
            QueryStep::Continue => {}
        }
    }
}
/// Queries all peers for `method` using a [`ThresholdConsensus`] strategy
/// sized to the current peer set.
///
/// # Errors
///
/// Propagates the [`FederationError`] produced by `request_with_strategy`.
async fn request_current_consensus<Ret>(
    &self,
    method: String,
    params: ApiRequestErased,
) -> FederationResult<Ret>
where
    Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
    let consensus_strategy = ThresholdConsensus::new(self.all_peers().to_num_peers());
    self.request_with_strategy(consensus_strategy, method, params)
        .await
}
/// Retrying variant of the consensus query: uses a [`ThresholdConsensus`]
/// strategy sized to the current peer set and delegates to
/// `request_with_strategy_retry`, so it returns `Ret` directly.
async fn request_current_consensus_retry<Ret>(
    &self,
    method: String,
    params: ApiRequestErased,
) -> Ret
where
    Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
    let consensus_strategy = ThresholdConsensus::new(self.all_peers().to_num_peers());
    self.request_with_strategy_retry(consensus_strategy, method, params)
        .await
}
/// Sends an authenticated admin request to the peer returned by `self_peer()`.
///
/// `auth` is attached to `params` before the request is sent.
///
/// # Errors
///
/// Returns a general [`FederationError`] when no self peer id is set; otherwise
/// propagates the error of the single-peer request.
async fn request_admin<Ret>(
    &self,
    method: &str,
    params: ApiRequestErased,
    auth: ApiAuth,
) -> FederationResult<Ret>
where
    Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
    match self.self_peer() {
        Some(self_peer_id) => {
            self.request_single_peer_federation(
                method.into(),
                params.with_auth(auth),
                self_peer_id,
            )
            .await
        }
        None => Err(FederationError::general(
            method,
            params,
            anyhow::format_err!("Admin peer_id not set"),
        )),
    }
}
/// Sends an unauthenticated admin request to the peer returned by `self_peer()`.
///
/// # Errors
///
/// Returns a general [`FederationError`] when no self peer id is set; otherwise
/// propagates the error of the single-peer request.
async fn request_admin_no_auth<Ret>(
    &self,
    method: &str,
    params: ApiRequestErased,
) -> FederationResult<Ret>
where
    Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
{
    match self.self_peer() {
        Some(self_peer_id) => {
            self.request_single_peer_federation(method.into(), params, self_peer_id)
                .await
        }
        None => Err(FederationError::general(
            method,
            params,
            anyhow::format_err!("Admin peer_id not set"),
        )),
    }
}
}
// Blanket impl: every `IRawFederationApi` (including unsized trait objects)
// gets the `FederationApiExt` helper methods with their default bodies.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
/// Marker trait for a raw federation API used as a module API; it adds no
/// methods of its own and is the trait object behind `DynModuleApi`.
pub trait IModuleFederationApi: IRawFederationApi {}
// Cloneable newtype around `Arc<IModuleFederationApi>`; returned by
// `IRawFederationApi::with_module`.
dyn_newtype_define! {
#[derive(Clone)]
pub DynModuleApi(Arc<IModuleFederationApi>)
}
// Cloneable newtype around `Arc<IGlobalFederationApi>`; see the
// `impl DynGlobalApi` constructors for how instances are built.
dyn_newtype_define! {
#[derive(Clone)]
pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
/// Borrows the wrapped global API as a plain trait object.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        &*self.inner
    }
}
/// Constructors that wrap a [`WsFederationApi`] in a
/// [`GlobalFederationApiWithCache`] and erase it into a [`DynGlobalApi`].
impl DynGlobalApi {
    /// Builds an admin API for a single `peer` at `url`; `peer` is also set as
    /// the self peer id.
    pub fn new_admin(
        peer: PeerId,
        url: SafeUrl,
        api_secret: &Option<String>,
        connector: &Connector,
    ) -> DynGlobalApi {
        let raw_api = WsFederationApi::new(connector, vec![(peer, url)], api_secret)
            .with_self_peer_id(peer);
        GlobalFederationApiWithCache::new(raw_api).into()
    }

    /// Builds an admin API for `url` using the default connector and a fixed
    /// placeholder peer id (1024), which is also set as the self peer id.
    pub fn from_pre_peer_id_admin_endpoint(url: SafeUrl, api_secret: &Option<String>) -> Self {
        let peer_id = PeerId::from(1024);
        let connector = Connector::default();
        let raw_api = WsFederationApi::new(&connector, vec![(peer_id, url)], api_secret)
            .with_self_peer_id(peer_id);
        GlobalFederationApiWithCache::new(raw_api).into()
    }

    /// Builds an API talking to a single `peer` at `url`, without a self peer id.
    pub fn from_single_endpoint(
        peer: PeerId,
        url: SafeUrl,
        api_secret: &Option<String>,
        connector: &Connector,
    ) -> Self {
        let endpoints = vec![(peer, url)];
        let raw_api = WsFederationApi::new(connector, endpoints, api_secret);
        GlobalFederationApiWithCache::new(raw_api).into()
    }

    /// Builds an API talking to all the given `(peer id, url)` endpoints.
    pub fn from_endpoints(
        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
        api_secret: &Option<String>,
        connector: &Connector,
    ) -> Self {
        let raw_api = WsFederationApi::new(connector, peers, api_secret);
        GlobalFederationApiWithCache::new(raw_api).into()
    }

    /// Builds an API from the peers and API secret carried by `invite_code`.
    pub fn from_invite_code(connector: &Connector, invite_code: &InviteCode) -> Self {
        let endpoints = invite_code.peers().into_iter().collect_vec();
        let api_secret = invite_code.api_secret();
        let raw_api = WsFederationApi::new(connector, endpoints, &api_secret);
        GlobalFederationApiWithCache::new(raw_api).into()
    }
}
/// The federation-wide (non-module) API surface, layered on top of
/// [`IRawFederationApi`]. Methods taking an [`ApiAuth`] carry credentials with
/// the request; the remaining ones do not.
///
/// NOTE(review): only signatures are visible here; the per-method notes below
/// restate what the signatures show. Confirm semantics against the implementation.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
/// Submits `tx` and returns the serde-encoded submission outcome.
async fn submit_transaction(
&self,
tx: Transaction,
) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
/// Returns the [`SessionOutcome`] for `block_index`, decoded with `decoders`.
async fn await_block(
&self,
block_index: u64,
decoders: &ModuleDecoderRegistry,
) -> anyhow::Result<SessionOutcome>;
/// Returns the [`SessionStatus`] for `block_index`, decoded with `decoders`.
async fn get_session_status(
&self,
block_index: u64,
decoders: &ModuleDecoderRegistry,
) -> anyhow::Result<SessionStatus>;
async fn session_count(&self) -> FederationResult<u64>;
async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
async fn server_config_consensus_hash(&self) -> FederationResult<sha256::Hash>;
async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
/// Returns, per peer, the backup snapshot found for `id` (if any).
async fn download_backup(
&self,
id: &secp256k1::PublicKey,
) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
async fn set_config_gen_connections(
&self,
info: ConfigGenConnectionsRequest,
auth: ApiAuth,
) -> FederationResult<()>;
async fn add_config_gen_peer(&self, peer: PeerServerParams) -> FederationResult<()>;
async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParams>>;
async fn get_default_config_gen_params(
&self,
auth: ApiAuth,
) -> FederationResult<ConfigGenParamsRequest>;
async fn set_config_gen_params(
&self,
requested: ConfigGenParamsRequest,
auth: ApiAuth,
) -> FederationResult<()>;
async fn consensus_config_gen_params(&self) -> FederationResult<ConfigGenParamsResponse>;
async fn run_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
async fn get_verify_config_hash(
&self,
auth: ApiAuth,
) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
async fn verified_configs(
&self,
auth: ApiAuth,
) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
/// Returns the server status and, when available, the federation status.
async fn status(&self) -> FederationResult<StatusResponse>;
async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
async fn guardian_config_backup(&self, auth: ApiAuth)
-> FederationResult<GuardianConfigBackup>;
async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
async fn submit_api_announcement(
&self,
peer_id: PeerId,
announcement: SignedApiAnnouncement,
) -> FederationResult<()>;
/// Single-peer query (returns a [`PeerResult`]): the signed API announcements
/// known to `guardian`, keyed by peer id.
async fn api_announcements(
&self,
guardian: PeerId,
) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
async fn sign_api_announcement(
&self,
api_url: SafeUrl,
auth: ApiAuth,
) -> FederationResult<SignedApiAnnouncement>;
async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
/// Single-peer query (returns a [`PeerResult`]): the version string reported by `peer_id`.
async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
}
/// Decodes `outcome` with `module_decoder` and downcasts it to the concrete
/// outcome type `R`.
///
/// # Errors
///
/// Returns [`OutputOutcomeError::ResponseDeserialization`] when decoding fails
/// or when the decoded outcome is not of type `R`.
pub fn deserialize_outcome<R>(
    outcome: &SerdeOutputOutcome,
    module_decoder: &Decoder,
) -> OutputOutcomeResult<R>
where
    R: OutputOutcome + MaybeSend,
{
    let dyn_outcome = match outcome.try_into_inner_known_module_kind(module_decoder) {
        Ok(decoded) => decoded,
        Err(err) => return Err(OutputOutcomeError::ResponseDeserialization(err.into())),
    };
    let source_instance = dyn_outcome.module_instance_id();

    match dyn_outcome.as_any().downcast_ref::<R>() {
        Some(typed_outcome) => Ok(typed_outcome.clone()),
        None => {
            let target_type = std::any::type_name::<R>();
            Err(OutputOutcomeError::ResponseDeserialization(anyhow!(
                "Could not downcast output outcome with instance id {source_instance} to {target_type}"
            )))
        }
    }
}
/// Federation API client generic over the JSON-RPC client type `C`
/// (`WsClient` by default), holding one [`FederationPeer`] per peer.
#[derive(Debug, Clone)]
pub struct WsFederationApi<C = WsClient> {
/// Ids of all peers; returned by `all_peers()`.
peer_ids: BTreeSet<PeerId>,
/// Peer id used by the admin request helpers, if set.
self_peer_id: Option<PeerId>,
/// Per-peer connection handles, shared between clones of this API.
peers: Arc<Vec<FederationPeer<C>>>,
/// When set, request methods are prefixed with `module_{id}_` (see `request_raw`).
module_id: Option<ModuleInstanceId>,
}
// Marker impl so a `WsFederationApi` can be wrapped in `DynModuleApi`.
impl<C: JsonRpcClient + Debug + 'static> IModuleFederationApi for WsFederationApi<C> {}
#[apply(async_trait_maybe_send!)]
impl<C: JsonRpcClient + Debug + 'static> IRawFederationApi for WsFederationApi<C> {
fn all_peers(&self) -> &BTreeSet<PeerId> {
&self.peer_ids
}
fn self_peer(&self) -> Option<PeerId> {
self.self_peer_id
}
fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
WsFederationApi {
peer_ids: self.peer_ids.clone(),
peers: self.peers.clone(),
module_id: Some(id),
self_peer_id: self.self_peer_id,
}
.into()
}
async fn request_raw(
&self,
peer_id: PeerId,
method: &str,
params: &[Value],
) -> JsonRpcResult<Value> {
let peer = self
.peers
.iter()
.find(|m| m.peer_id == peer_id)
.ok_or_else(|| JsonRpcClientError::Custom(format!("Invalid peer_id: {peer_id}")))?;
let method = match self.module_id {
None => method.to_string(),
Some(id) => format!("module_{id}_{method}"),
};
peer.request(&method, params).await
}
}
/// A JSON-RPC client that can be constructed by connecting to a URL and can
/// report whether it is still connected.
#[apply(async_trait_maybe_send!)]
pub trait JsonRpcClient: ClientT + Sized + MaybeSend + MaybeSync {
/// Connects to `url`, optionally authenticating with `api_secret`.
async fn connect(
url: &SafeUrl,
api_secret: Option<String>,
) -> result::Result<Self, JsonRpcClientError>;
/// Connects to `url` over Tor, optionally authenticating with `api_secret`.
/// Only available with the `tor` feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
async fn connect_with_tor(
url: &SafeUrl,
api_secret: Option<String>,
) -> result::Result<Self, JsonRpcClientError>;
/// Returns whether the client currently considers itself connected.
fn is_connected(&self) -> bool;
}
#[apply(async_trait_maybe_send!)]
impl JsonRpcClient for WsClient {
/// Builds a websocket client for `url`.
///
/// On non-wasm targets the client trusts the `webpki_roots` certificates and,
/// when `api_secret` is given, sends it as HTTP basic auth for user
/// `fedimint`. On wasm the credentials are placed into the URL instead.
///
/// # Errors
///
/// Returns the builder's connection error, or (wasm only) a `Transport`
/// error when the credentials cannot be set on the URL.
async fn connect(
    url: &SafeUrl,
    api_secret: Option<String>,
) -> result::Result<Self, JsonRpcClientError> {
    #[cfg(not(target_family = "wasm"))]
    let mut builder = {
        let mut trusted_roots = RootCertStore::empty();
        trusted_roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
        let cert_store = CustomCertStore::builder()
            .with_root_certificates(trusted_roots)
            .with_no_client_auth();

        WsClientBuilder::default()
            .max_concurrent_requests(u16::MAX as usize)
            .with_custom_cert_store(cert_store)
    };

    #[cfg(target_family = "wasm")]
    let builder = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

    if let Some(api_secret) = api_secret {
        #[cfg(not(target_family = "wasm"))]
        {
            let auth = base64::engine::general_purpose::STANDARD
                .encode(format!("fedimint:{api_secret}"));
            let header_value =
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail");

            let mut auth_headers = HeaderMap::new();
            auth_headers.insert("Authorization", header_value);
            builder = builder.set_headers(auth_headers);
        }

        #[cfg(target_family = "wasm")]
        {
            // No header support here: carry the credentials in the URL itself.
            let mut authed_url = url.clone();
            authed_url.set_username("fedimint").map_err(|_| {
                JsonRpcClientError::Transport(anyhow::format_err!("invalid username").into())
            })?;
            authed_url.set_password(Some(&api_secret)).map_err(|_| {
                JsonRpcClientError::Transport(anyhow::format_err!("invalid secret").into())
            })?;
            return builder.build(authed_url.as_str()).await;
        }
    }

    builder.build(url.as_str()).await
}
/// Builds a websocket client for `url` whose traffic goes through a freshly
/// bootstrapped Tor client.
///
/// Onion addresses are connected with onion-service support explicitly
/// enabled. For `wss` URLs the Tor stream is additionally wrapped in TLS
/// using the `webpki_roots` certificates. When `api_secret` is given it is
/// sent as HTTP basic auth for user `fedimint`.
///
/// # Errors
///
/// Returns a `Transport` error when bootstrapping Tor, resolving the Tor
/// address, connecting, or the TLS handshake fails, or when the URL scheme is
/// neither `ws` nor `wss`; otherwise the builder's own error.
///
/// # Panics
///
/// Panics if `url` has no host or no (default) port.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
async fn connect_with_tor(
    url: &SafeUrl,
    api_secret: Option<String>,
) -> result::Result<Self, JsonRpcClientError> {
    let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
        .await
        .map_err(|e| JsonRpcClientError::Transport(e.into()))?
        .isolated_client();
    debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

    let host = url
        .host_str()
        .expect("It should've asserted for `host` on construction");
    let port = url
        .port_or_known_default()
        .expect("It should've asserted for `port`, or used a default one, on construction");
    let addr = (host, port);

    let tor_addr = TorAddr::from(addr).map_err(|e| JsonRpcClientError::Transport(e.into()))?;
    // `tor_addr` is consumed by the connect call; keep a copy for logging afterwards.
    let tor_addr_clone = tor_addr.clone();
    debug!(
        ?tor_addr,
        ?addr,
        "Successfully created `TorAddr` for given address (i.e. host and port)"
    );

    let anonymized_stream = if url.is_onion_address() {
        let mut stream_prefs = arti_client::StreamPrefs::default();
        stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

        let onion_stream = tor_client
            .connect_with_prefs(tor_addr, &stream_prefs)
            .await
            .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
        debug!(
            ?tor_addr_clone,
            "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
        );
        onion_stream
    } else {
        let clearnet_stream = tor_client
            .connect(tor_addr)
            .await
            .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
        debug!(?tor_addr_clone, "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`");
        clearnet_stream
    };

    // `wss` needs a TLS connector, `ws` does not; anything else is rejected.
    let tls_connector = match url.scheme() {
        "wss" => {
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();
            Some(TlsConnector::from(Arc::new(tls_config)))
        }
        "ws" => None,
        unexpected_scheme => {
            let error =
                format!("`{unexpected_scheme}` not supported, it's expected `ws` or `wss`!");
            return Err(JsonRpcClientError::Transport(anyhow!(error).into()));
        }
    };

    let mut ws_client_builder =
        WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

    if let Some(api_secret) = api_secret {
        let auth =
            base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
        let header_value = HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail");

        let mut headers = HeaderMap::new();
        headers.insert("Authorization", header_value);
        ws_client_builder = ws_client_builder.set_headers(headers);
    }

    let Some(tls_connector) = tls_connector else {
        return ws_client_builder
            .build_with_stream(url.as_str(), anonymized_stream)
            .await;
    };

    let host = url
        .host_str()
        .map(ToOwned::to_owned)
        .ok_or_else(|| JsonRpcClientError::Transport(anyhow!("Invalid host!").into()))?;
    let server_name = rustls_pki_types::ServerName::try_from(host)
        .map_err(|e| JsonRpcClientError::Transport(e.into()))?;
    let anonymized_tls_stream = tls_connector
        .connect(server_name, anonymized_stream)
        .await
        .map_err(|e| JsonRpcClientError::Transport(e.into()))?;

    ws_client_builder
        .build_with_stream(url.as_str(), anonymized_tls_stream)
        .await
}
/// Reports whether the underlying websocket client is connected.
// NOTE(review): this looks self-recursive; presumably the call resolves to an
// inherent `is_connected` on `WsClient`, which would take precedence over this
// trait method — confirm against the jsonrpsee client type.
fn is_connected(&self) -> bool {
self.is_connected()
}
}
impl WsFederationApi<WsClient> {
pub fn new(
connector: &Connector,
peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
api_secret: &Option<String>,
) -> Self {
Self::new_with_client(connector, peers, None, api_secret)
}
pub fn new_admin(
peer: PeerId,
url: SafeUrl,
api_secret: &Option<String>,
connector: &Connector,
) -> Self {
WsFederationApi::new(connector, vec![(peer, url)], api_secret).with_self_peer_id(peer)
}
pub fn from_endpoints(
peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
api_secret: &Option<String>,
connector: &Connector,
) -> Self {
WsFederationApi::new(connector, peers, api_secret)
}
pub fn with_self_peer_id(self, self_peer_id: PeerId) -> Self {
Self {
self_peer_id: Some(self_peer_id),
..self
}
}
}
impl<C> WsFederationApi<C>
where
C: JsonRpcClient + 'static,
{
pub fn peers(&self) -> Vec<PeerId> {
self.peers.iter().map(|peer| peer.peer_id).collect()
}
pub fn new_with_client(
connector: &Connector,
peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
self_peer_id: Option<PeerId>,
api_secret: &Option<String>,
) -> Self {
let (peer_connections, peer_ids) = peers
.into_iter()
.map(|(peer_id, url)| {
assert!(
url.port_or_known_default().is_some(),
"API client requires a port"
);
assert!(url.host().is_some(), "API client requires a target host");
(
FederationPeer::new(*connector, url, peer_id, api_secret.clone()),
peer_id,
)
})
.unzip();
WsFederationApi {
peer_ids,
self_peer_id,
peers: Arc::new(peer_connections),
module_id: None,
}
}
}
impl<C> FederationPeer<C>
where
C: JsonRpcClient + 'static,
{
/// Sends `method` with `params` to this peer over its shared client,
/// triggering a reconnect at most `RETRIES` times when the client is
/// unavailable or disconnected.
///
/// # Errors
///
/// Returns the error of the RPC call itself, or a `Transport` error when the
/// client is still unavailable or disconnected on the last allowed attempt.
#[instrument(level = "trace", fields(peer = %self.peer_id, %method), skip_all)]
pub async fn request(&self, method: &str, params: &[Value]) -> JsonRpcResult<Value> {
// Number of reconnection attempts allowed after the initial try.
const RETRIES: usize = 1;
for attempts in 0.. {
// Once `attempts` reaches `RETRIES`, every match arm below returns, so the
// loop body never runs with a larger value.
debug_assert!(attempts <= RETRIES);
let rclient = self.client.read().await;
match rclient.client.get_try().await {
// Fast path: a connected client exists, so issue the request under the read lock.
Ok(client) if client.is_connected() => {
return client.request::<_, _>(method, params).await;
}
// Obtaining the client failed: give up on the last attempt, otherwise reconnect.
Err(e) => {
if RETRIES <= attempts {
return Err(JsonRpcClientError::Transport(e.into()));
}
debug!(target: LOG_CLIENT_NET_API, err=%e, "Triggering reconnection after connection error");
}
// A client exists but reports itself as disconnected.
Ok(_client) => {
if RETRIES <= attempts {
return Err(JsonRpcClientError::Transport(
anyhow::format_err!("Disconnected").into(),
));
}
debug!(target: LOG_CLIENT_NET_API, "Triggering reconnection after disconnection");
}
};
// Release the read lock before acquiring the write lock below.
drop(rclient);
let mut wclient = self.client.write().await;
// Re-check under the write lock: another request may have reconnected in
// the meantime, in which case no second reconnect is triggered.
match wclient.client.get_try().await {
Ok(client) if client.is_connected() => {
trace!(target: LOG_CLIENT_NET_API, "Some other request reconnected client, retrying");
}
_ => {
wclient.reconnect(
self.connector,
self.peer_id,
self.url.clone(),
self.api_secret.clone(),
);
}
}
}
// The unbounded `for` loop only exits through the `return`s above.
unreachable!();
}
}
/// Federation-wide status: session count, per-peer status and aggregate peer
/// counters. Carried as the optional `federation` field of [`StatusResponse`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct FederationStatus {
pub session_count: u64,
/// Status of each peer, keyed by peer id.
pub status_by_peer: HashMap<PeerId, PeerStatus>,
pub peers_online: u64,
pub peers_offline: u64,
pub peers_flagged: u64,
// NOTE(review): unit of this value is not visible here — presumably a session index; confirm.
pub scheduled_shutdown: Option<u64>,
}
/// Status of a single peer, as stored in `FederationStatus::status_by_peer`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerStatus {
// NOTE(review): unit of this value is not visible here — confirm against the server.
pub last_contribution: Option<u64>,
/// Whether the peer is currently connected.
pub connection_status: PeerConnectionStatus,
pub flagged: bool,
}
/// Connection state of a peer; (de)serialized in `snake_case` and defaulting
/// to [`PeerConnectionStatus::Disconnected`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PeerConnectionStatus {
#[default]
Disconnected,
Connected,
}
/// Response of `IGlobalFederationApi::status`: the server status plus the
/// federation status when one is available.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
pub server: ServerStatus,
pub federation: Option<FederationStatus>,
}
/// Response of `IGlobalFederationApi::guardian_config_backup`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
/// Raw bytes of a tar archive, (de)serialized through `fedimint_core::hex::serde`.
#[serde(with = "fedimint_core::hex::serde")]
pub tar_archive_bytes: Vec<u8>,
}
#[cfg(test)]
mod tests {
use std::fmt;
use std::str::FromStr as _;
use fedimint_core::config::FederationId;
use jsonrpsee_core::client::BatchResponse;
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use super::*;
type Result<T = ()> = std::result::Result<T, JsonRpcClientError>;
/// Minimal test client: connects without arguments and answers requests with
/// a JSON string. Adapted to [`JsonRpcClient`] and `ClientT` through [`Client`].
#[apply(async_trait_maybe_send!)]
trait SimpleClient: Sized {
async fn connect() -> Result<Self>;
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
async fn connect_with_tor() -> Result<Self>;
/// Defaults to always connected.
fn is_connected(&self) -> bool {
true
}
/// Returns the response for `method` as a JSON string.
async fn request(&self, method: &str) -> Result<String>;
}
/// Adapter wrapping a [`SimpleClient`] so it can implement [`JsonRpcClient`] and `ClientT`.
struct Client<C: SimpleClient>(C);
/// Forwards connection handling to the wrapped [`SimpleClient`], ignoring the
/// URL and API secret.
#[apply(async_trait_maybe_send!)]
impl<C: SimpleClient + MaybeSend + MaybeSync> JsonRpcClient for Client<C> {
    async fn connect(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
        let inner = C::connect().await?;
        Ok(Self(inner))
    }

    #[cfg(all(feature = "tor", not(target_family = "wasm")))]
    async fn connect_with_tor(_url: &SafeUrl, _api_secret: Option<String>) -> Result<Self> {
        let inner = C::connect_with_tor().await?;
        Ok(Self(inner))
    }

    fn is_connected(&self) -> bool {
        let Self(inner) = self;
        inner.is_connected()
    }
}
/// Only `request` is supported; notifications and batch requests are unimplemented.
#[apply(async_trait_maybe_send!)]
impl<C: SimpleClient + MaybeSend + MaybeSync> ClientT for Client<C> {
    /// Asks the wrapped client for `method` (params are ignored) and decodes
    /// the returned JSON string into `R`, panicking if it does not parse.
    async fn request<R, P>(&self, method: &str, _params: P) -> Result<R>
    where
        R: jsonrpsee_core::DeserializeOwned,
        P: ToRpcParams + MaybeSend,
    {
        let raw_json = self.0.request(method).await?;
        let decoded = serde_json::from_str(&raw_json).unwrap();
        Ok(decoded)
    }

    async fn notification<P>(&self, _method: &str, _params: P) -> Result<()>
    where
        P: ToRpcParams + MaybeSend,
    {
        unimplemented!()
    }

    async fn batch_request<'a, R>(
        &self,
        _batch: BatchRequestBuilder<'a>,
    ) -> std::result::Result<BatchResponse<'a, R>, jsonrpsee_core::client::Error>
    where
        R: DeserializeOwned + fmt::Debug + 'a,
    {
        unimplemented!()
    }
}
/// An invite code round-trips through its string form and through JSON, and
/// its JSON form is exactly its string form.
#[test]
fn converts_invite_code() {
    let invite = InviteCode::new(
        "ws://test1".parse().unwrap(),
        PeerId::from(1),
        FederationId::dummy(),
        Some("api_secret".into()),
    );

    // String round-trip.
    let encoded = invite.to_string();
    let reparsed = InviteCode::from_str(&encoded).expect("parses");
    assert_eq!(invite, reparsed);

    // JSON round-trip: the JSON value is the encoded string itself.
    let json = serde_json::to_string(&invite).unwrap();
    let json_as_string: String = serde_json::from_str(&json).unwrap();
    assert_eq!(json_as_string, encoded);

    let from_json: InviteCode = serde_json::from_str(&json).unwrap();
    assert_eq!(from_json, reparsed);
}
/// With four guardians, the "essential" invite code keeps only the first
/// `max_evil() + 1` peers and preserves the federation id.
#[test]
fn creates_essential_guardians_invite_code() {
    let mut peer_to_url_map = BTreeMap::new();
    for (id, url) in [
        (0, "ws://test1"),
        (1, "ws://test2"),
        (2, "ws://test3"),
        (3, "ws://test4"),
    ] {
        peer_to_url_map.insert(PeerId::from(id), url.parse().expect("URL fail"));
    }
    let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;

    let code =
        InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
    assert_eq!(FederationId::dummy(), code.federation_id());

    let expected_map: BTreeMap<PeerId, SafeUrl> =
        peer_to_url_map.into_iter().take(max_size).collect();
    assert_eq!(expected_map, code.peers());
}
}