fedimint_connectors/
ws.rs1use std::sync::Arc;
2
3#[allow(unused)]
4use anyhow::anyhow;
5use async_trait::async_trait;
6use fedimint_core::module::{ApiMethod, ApiRequestErased};
7#[cfg(not(target_family = "wasm"))]
8use fedimint_core::rustls::install_crypto_provider;
9use fedimint_core::util::SafeUrl;
10use fedimint_core::{apply, async_trait_maybe_send};
11use fedimint_logging::LOG_NET_WS;
12use jsonrpsee_core::client::ClientT;
13pub use jsonrpsee_core::client::Error as JsonRpcClientError;
14use jsonrpsee_types::ErrorCode;
15#[cfg(target_family = "wasm")]
16use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
17#[allow(unused)]
18#[cfg(not(target_family = "wasm"))]
19use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
20use serde_json::Value;
21use tracing::trace;
22pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
23
24use super::Connector;
25use crate::{
26 DynGatewayConnection, DynGuaridianConnection, IConnection, IGuardianConnection, ServerError,
27 ServerResult,
28};
29
30#[derive(Debug, Clone)]
31pub struct WebsocketConnector {}
32
33impl WebsocketConnector {
34 pub fn new() -> Self {
35 Self {}
36 }
37
38 async fn make_new_connection(
39 &self,
40 url: &SafeUrl,
41 api_secret: Option<&str>,
42 ) -> ServerResult<Arc<WsClient>> {
43 trace!(target: LOG_NET_WS, %url, "Creating new websocket connection");
44
45 #[cfg(not(target_family = "wasm"))]
46 let mut client = {
47 use jsonrpsee_ws_client::{CustomCertStore, WsClientBuilder};
48 use tokio_rustls::rustls::RootCertStore;
49
50 install_crypto_provider().await;
51 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
52 let mut root_certs = RootCertStore::empty();
53 root_certs.extend(webpki_roots);
54
55 let tls_cfg = CustomCertStore::builder()
56 .with_root_certificates(root_certs)
57 .with_no_client_auth();
58
59 WsClientBuilder::default()
60 .max_concurrent_requests(u16::MAX as usize)
61 .with_custom_cert_store(tls_cfg)
62 };
63
64 #[cfg(target_family = "wasm")]
65 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
66
67 if let Some(api_secret) = api_secret {
68 #[cfg(not(target_family = "wasm"))]
69 {
70 use base64::Engine as _;
74 use jsonrpsee_ws_client::{HeaderMap, HeaderValue};
75 let mut headers = HeaderMap::new();
76
77 let auth = base64::engine::general_purpose::STANDARD
78 .encode(format!("fedimint:{api_secret}"));
79
80 headers.insert(
81 "Authorization",
82 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
83 );
84
85 client = client.set_headers(headers);
86 }
87 #[cfg(target_family = "wasm")]
88 {
89 let mut url = url.clone();
92 url.set_username("fedimint")
93 .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid username")))?;
94 url.set_password(Some(&api_secret))
95 .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid secret")))?;
96
97 let client = client
98 .build(url.as_str())
99 .await
100 .map_err(|err| ServerError::InternalClientError(err.into()))?;
101
102 return Ok(Arc::new(client));
103 }
104 }
105
106 let client = client
107 .build(url.as_str())
108 .await
109 .map_err(|err| ServerError::InternalClientError(err.into()))?;
110
111 Ok(Arc::new(client))
112 }
113}
114
115impl Default for WebsocketConnector {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121#[async_trait::async_trait]
122impl Connector for WebsocketConnector {
123 async fn connect_guardian(
124 &self,
125 url: &SafeUrl,
126 api_secret: Option<&str>,
127 ) -> ServerResult<DynGuaridianConnection> {
128 let client = self.make_new_connection(url, api_secret).await?;
129 Ok(client.into_dyn())
130 }
131
132 async fn connect_gateway(&self, _url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
133 Err(anyhow!("Unsupported transport method"))
134 }
135}
136
137#[apply(async_trait_maybe_send!)]
138impl IConnection for WsClient {
139 async fn await_disconnection(&self) {
140 self.on_disconnect().await;
141 }
142
143 fn is_connected(&self) -> bool {
144 WsClient::is_connected(self)
145 }
146}
147
148#[async_trait]
149impl IGuardianConnection for WsClient {
150 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
151 let method = match method {
152 ApiMethod::Core(method) => method,
153 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
154 };
155
156 Ok(ClientT::request(self, &method, [request.to_json()])
157 .await
158 .map_err(jsonrpc_error_to_peer_error)?)
159 }
160}
161
162#[apply(async_trait_maybe_send!)]
163impl IConnection for Arc<WsClient> {
164 async fn await_disconnection(&self) {
165 self.on_disconnect().await;
166 }
167
168 fn is_connected(&self) -> bool {
169 WsClient::is_connected(self)
170 }
171}
172
173#[async_trait]
174impl IGuardianConnection for Arc<WsClient> {
175 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
176 let method = match method {
177 ApiMethod::Core(method) => method,
178 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
179 };
180
181 Ok(
182 ClientT::request(self.as_ref(), &method, [request.to_json()])
183 .await
184 .map_err(jsonrpc_error_to_peer_error)?,
185 )
186 }
187}
188
189fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> ServerError {
190 match jsonrpc_error {
191 JsonRpcClientError::Call(error_object) => {
192 let error = anyhow!(error_object.message().to_owned());
193 match ErrorCode::from(error_object.code()) {
194 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
195 ServerError::InvalidRequest(error)
196 }
197 ErrorCode::MethodNotFound => ServerError::InvalidRpcId(error),
198 ErrorCode::InvalidParams => ServerError::InvalidRequest(error),
199 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
200 ServerError::ServerError(error)
201 }
202 }
203 }
204 JsonRpcClientError::Transport(error) => ServerError::Transport(anyhow!(error)),
205 JsonRpcClientError::RestartNeeded(arc) => ServerError::Transport(anyhow!(arc)),
206 JsonRpcClientError::ParseError(error) => ServerError::InvalidResponse(anyhow!(error)),
207 JsonRpcClientError::InvalidSubscriptionId => {
208 ServerError::Transport(anyhow!("Invalid subscription id"))
209 }
210 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
211 ServerError::InvalidRequest(anyhow!(invalid_request_id))
212 }
213 JsonRpcClientError::RequestTimeout => ServerError::Transport(anyhow!("Request timeout")),
214 JsonRpcClientError::Custom(e) => ServerError::Transport(anyhow!(e)),
215 JsonRpcClientError::HttpNotImplemented => {
216 ServerError::ServerError(anyhow!("Http not implemented"))
217 }
218 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
219 ServerError::InvalidRequest(anyhow!(empty_batch_request))
220 }
221 JsonRpcClientError::RegisterMethod(register_method_error) => {
222 ServerError::InvalidResponse(anyhow!(register_method_error))
223 }
224 }
225}