fedimint_connectors/
ws.rs1use std::sync::Arc;
2
3#[allow(unused)]
4use anyhow::anyhow;
5use async_trait::async_trait;
6use fedimint_core::module::{ApiMethod, ApiRequestErased};
7#[cfg(not(target_family = "wasm"))]
8use fedimint_core::rustls::install_crypto_provider;
9use fedimint_core::util::SafeUrl;
10use fedimint_core::{apply, async_trait_maybe_send};
11use fedimint_logging::LOG_NET_WS;
12use jsonrpsee_core::client::ClientT;
13pub use jsonrpsee_core::client::Error as JsonRpcClientError;
14use jsonrpsee_types::ErrorCode;
15#[cfg(target_family = "wasm")]
16use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
17#[allow(unused)]
18#[cfg(not(target_family = "wasm"))]
19use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
20use serde_json::Value;
21use tracing::trace;
22pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
23
24use super::Connector;
25use crate::{
26 Connectivity, DynGatewayConnection, DynGuaridianConnection, IConnection, IGuardianConnection,
27 ServerError, ServerResult,
28};
29
/// Connector that reaches guardians over websocket JSON-RPC.
///
/// The type carries no state; every connection is created from scratch.
#[derive(Clone, Debug)]
pub struct WebsocketConnector {}
32
33impl WebsocketConnector {
34 pub fn new() -> Self {
35 Self {}
36 }
37
38 async fn make_new_connection(
39 &self,
40 url: &SafeUrl,
41 api_secret: Option<&str>,
42 ) -> ServerResult<Arc<WsClient>> {
43 trace!(target: LOG_NET_WS, %url, "Creating new websocket connection");
44
45 #[cfg(not(target_family = "wasm"))]
46 let mut client = {
47 use jsonrpsee_ws_client::{CustomCertStore, WsClientBuilder};
48 use tokio_rustls::rustls::RootCertStore;
49
50 install_crypto_provider().await;
51 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
52 let mut root_certs = RootCertStore::empty();
53 root_certs.extend(webpki_roots);
54
55 let tls_cfg = CustomCertStore::builder()
56 .with_root_certificates(root_certs)
57 .with_no_client_auth();
58
59 WsClientBuilder::default()
60 .max_concurrent_requests(u16::MAX as usize)
61 .with_custom_cert_store(tls_cfg)
62 };
63
64 #[cfg(target_family = "wasm")]
65 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
66
67 if let Some(api_secret) = api_secret {
68 #[cfg(not(target_family = "wasm"))]
69 {
70 use base64::Engine as _;
74 use jsonrpsee_ws_client::{HeaderMap, HeaderValue};
75 let mut headers = HeaderMap::new();
76
77 let auth = base64::engine::general_purpose::STANDARD
78 .encode(format!("fedimint:{api_secret}"));
79
80 headers.insert(
81 "Authorization",
82 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
83 );
84
85 client = client.set_headers(headers);
86 }
87 #[cfg(target_family = "wasm")]
88 {
89 let mut url = url.clone();
92 url.set_username("fedimint")
93 .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid username")))?;
94 url.set_password(Some(&api_secret))
95 .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid secret")))?;
96
97 let client = client
98 .build(url.as_str())
99 .await
100 .map_err(|err| ServerError::InternalClientError(err.into()))?;
101
102 return Ok(Arc::new(client));
103 }
104 }
105
106 let client = client
107 .build(url.as_str())
108 .await
109 .map_err(|err| ServerError::InternalClientError(err.into()))?;
110
111 Ok(Arc::new(client))
112 }
113}
114
115impl Default for WebsocketConnector {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121#[async_trait::async_trait]
122impl Connector for WebsocketConnector {
123 async fn connect_guardian(
124 &self,
125 url: &SafeUrl,
126 api_secret: Option<&str>,
127 ) -> ServerResult<DynGuaridianConnection> {
128 let client = self.make_new_connection(url, api_secret).await?;
129 Ok(client.into_dyn())
130 }
131
132 async fn connect_gateway(&self, _url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
133 Err(anyhow!("Unsupported transport method"))
134 }
135
136 fn connectivity(&self, _url: &SafeUrl) -> Connectivity {
137 Connectivity::Direct
138 }
139}
140
141#[apply(async_trait_maybe_send!)]
142impl IConnection for WsClient {
143 async fn await_disconnection(&self) {
144 self.on_disconnect().await;
145 }
146
147 fn is_connected(&self) -> bool {
148 WsClient::is_connected(self)
149 }
150}
151
152#[async_trait]
153impl IGuardianConnection for WsClient {
154 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
155 let method = match method {
156 ApiMethod::Core(method) => method,
157 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
158 };
159
160 Ok(ClientT::request(self, &method, [request.to_json()])
161 .await
162 .map_err(jsonrpc_error_to_peer_error)?)
163 }
164}
165
166#[apply(async_trait_maybe_send!)]
167impl IConnection for Arc<WsClient> {
168 async fn await_disconnection(&self) {
169 self.on_disconnect().await;
170 }
171
172 fn is_connected(&self) -> bool {
173 WsClient::is_connected(self)
174 }
175}
176
177#[async_trait]
178impl IGuardianConnection for Arc<WsClient> {
179 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
180 let method = match method {
181 ApiMethod::Core(method) => method,
182 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
183 };
184
185 Ok(
186 ClientT::request(self.as_ref(), &method, [request.to_json()])
187 .await
188 .map_err(jsonrpc_error_to_peer_error)?,
189 )
190 }
191}
192
193fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> ServerError {
194 match jsonrpc_error {
195 JsonRpcClientError::Call(error_object) => {
196 let error = anyhow!(error_object.message().to_owned());
197 match ErrorCode::from(error_object.code()) {
198 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
199 ServerError::InvalidRequest(error)
200 }
201 ErrorCode::MethodNotFound => ServerError::InvalidRpcId(error),
202 ErrorCode::InvalidParams => ServerError::InvalidRequest(error),
203 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
204 ServerError::ServerError(error)
205 }
206 }
207 }
208 JsonRpcClientError::Transport(error) => ServerError::Transport(anyhow!(error)),
209 JsonRpcClientError::RestartNeeded(arc) => ServerError::Transport(anyhow!(arc)),
210 JsonRpcClientError::ParseError(error) => ServerError::InvalidResponse(anyhow!(error)),
211 JsonRpcClientError::InvalidSubscriptionId => {
212 ServerError::Transport(anyhow!("Invalid subscription id"))
213 }
214 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
215 ServerError::InvalidRequest(anyhow!(invalid_request_id))
216 }
217 JsonRpcClientError::RequestTimeout => ServerError::Transport(anyhow!("Request timeout")),
218 JsonRpcClientError::Custom(e) => ServerError::Transport(anyhow!(e)),
219 JsonRpcClientError::HttpNotImplemented => {
220 ServerError::ServerError(anyhow!("Http not implemented"))
221 }
222 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
223 ServerError::InvalidRequest(anyhow!(empty_batch_request))
224 }
225 JsonRpcClientError::RegisterMethod(register_method_error) => {
226 ServerError::InvalidResponse(anyhow!(register_method_error))
227 }
228 }
229}