fedimint_connectors/
ws.rs

1use std::sync::Arc;
2
3#[allow(unused)]
4use anyhow::anyhow;
5use async_trait::async_trait;
6use fedimint_core::module::{ApiMethod, ApiRequestErased};
7#[cfg(not(target_family = "wasm"))]
8use fedimint_core::rustls::install_crypto_provider;
9use fedimint_core::util::SafeUrl;
10use fedimint_core::{apply, async_trait_maybe_send};
11use fedimint_logging::LOG_NET_WS;
12use jsonrpsee_core::client::ClientT;
13pub use jsonrpsee_core::client::Error as JsonRpcClientError;
14use jsonrpsee_types::ErrorCode;
15#[cfg(target_family = "wasm")]
16use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
17#[allow(unused)]
18#[cfg(not(target_family = "wasm"))]
19use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
20use serde_json::Value;
21use tracing::trace;
22pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
23
24use super::Connector;
25use crate::{
26    DynGatewayConnection, DynGuaridianConnection, IConnection, IGuardianConnection, ServerError,
27    ServerResult,
28};
29
30#[derive(Debug, Clone)]
31pub struct WebsocketConnector {}
32
33impl WebsocketConnector {
34    pub fn new() -> Self {
35        Self {}
36    }
37
38    async fn make_new_connection(
39        &self,
40        url: &SafeUrl,
41        api_secret: Option<&str>,
42    ) -> ServerResult<Arc<WsClient>> {
43        trace!(target: LOG_NET_WS, %url, "Creating new websocket connection");
44
45        #[cfg(not(target_family = "wasm"))]
46        let mut client = {
47            use jsonrpsee_ws_client::{CustomCertStore, WsClientBuilder};
48            use tokio_rustls::rustls::RootCertStore;
49
50            install_crypto_provider().await;
51            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
52            let mut root_certs = RootCertStore::empty();
53            root_certs.extend(webpki_roots);
54
55            let tls_cfg = CustomCertStore::builder()
56                .with_root_certificates(root_certs)
57                .with_no_client_auth();
58
59            WsClientBuilder::default()
60                .max_concurrent_requests(u16::MAX as usize)
61                .with_custom_cert_store(tls_cfg)
62        };
63
64        #[cfg(target_family = "wasm")]
65        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
66
67        if let Some(api_secret) = api_secret {
68            #[cfg(not(target_family = "wasm"))]
69            {
70                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
71                // but we can set up the headers manually
72
73                use base64::Engine as _;
74                use jsonrpsee_ws_client::{HeaderMap, HeaderValue};
75                let mut headers = HeaderMap::new();
76
77                let auth = base64::engine::general_purpose::STANDARD
78                    .encode(format!("fedimint:{api_secret}"));
79
80                headers.insert(
81                    "Authorization",
82                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
83                );
84
85                client = client.set_headers(headers);
86            }
87            #[cfg(target_family = "wasm")]
88            {
89                // on wasm, url will be handled by the browser, which should take care of
90                // `user:pass@...`
91                let mut url = url.clone();
92                url.set_username("fedimint")
93                    .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid username")))?;
94                url.set_password(Some(&api_secret))
95                    .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid secret")))?;
96
97                let client = client
98                    .build(url.as_str())
99                    .await
100                    .map_err(|err| ServerError::InternalClientError(err.into()))?;
101
102                return Ok(Arc::new(client));
103            }
104        }
105
106        let client = client
107            .build(url.as_str())
108            .await
109            .map_err(|err| ServerError::InternalClientError(err.into()))?;
110
111        Ok(Arc::new(client))
112    }
113}
114
115impl Default for WebsocketConnector {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121#[async_trait::async_trait]
122impl Connector for WebsocketConnector {
123    async fn connect_guardian(
124        &self,
125        url: &SafeUrl,
126        api_secret: Option<&str>,
127    ) -> ServerResult<DynGuaridianConnection> {
128        let client = self.make_new_connection(url, api_secret).await?;
129        Ok(client.into_dyn())
130    }
131
132    async fn connect_gateway(&self, _url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
133        Err(anyhow!("Unsupported transport method"))
134    }
135}
136
137#[apply(async_trait_maybe_send!)]
138impl IConnection for WsClient {
139    async fn await_disconnection(&self) {
140        self.on_disconnect().await;
141    }
142
143    fn is_connected(&self) -> bool {
144        WsClient::is_connected(self)
145    }
146}
147
148#[async_trait]
149impl IGuardianConnection for WsClient {
150    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
151        let method = match method {
152            ApiMethod::Core(method) => method,
153            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
154        };
155
156        Ok(ClientT::request(self, &method, [request.to_json()])
157            .await
158            .map_err(jsonrpc_error_to_peer_error)?)
159    }
160}
161
162#[apply(async_trait_maybe_send!)]
163impl IConnection for Arc<WsClient> {
164    async fn await_disconnection(&self) {
165        self.on_disconnect().await;
166    }
167
168    fn is_connected(&self) -> bool {
169        WsClient::is_connected(self)
170    }
171}
172
173#[async_trait]
174impl IGuardianConnection for Arc<WsClient> {
175    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
176        let method = match method {
177            ApiMethod::Core(method) => method,
178            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
179        };
180
181        Ok(
182            ClientT::request(self.as_ref(), &method, [request.to_json()])
183                .await
184                .map_err(jsonrpc_error_to_peer_error)?,
185        )
186    }
187}
188
189fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> ServerError {
190    match jsonrpc_error {
191        JsonRpcClientError::Call(error_object) => {
192            let error = anyhow!(error_object.message().to_owned());
193            match ErrorCode::from(error_object.code()) {
194                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
195                    ServerError::InvalidRequest(error)
196                }
197                ErrorCode::MethodNotFound => ServerError::InvalidRpcId(error),
198                ErrorCode::InvalidParams => ServerError::InvalidRequest(error),
199                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
200                    ServerError::ServerError(error)
201                }
202            }
203        }
204        JsonRpcClientError::Transport(error) => ServerError::Transport(anyhow!(error)),
205        JsonRpcClientError::RestartNeeded(arc) => ServerError::Transport(anyhow!(arc)),
206        JsonRpcClientError::ParseError(error) => ServerError::InvalidResponse(anyhow!(error)),
207        JsonRpcClientError::InvalidSubscriptionId => {
208            ServerError::Transport(anyhow!("Invalid subscription id"))
209        }
210        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
211            ServerError::InvalidRequest(anyhow!(invalid_request_id))
212        }
213        JsonRpcClientError::RequestTimeout => ServerError::Transport(anyhow!("Request timeout")),
214        JsonRpcClientError::Custom(e) => ServerError::Transport(anyhow!(e)),
215        JsonRpcClientError::HttpNotImplemented => {
216            ServerError::ServerError(anyhow!("Http not implemented"))
217        }
218        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
219            ServerError::InvalidRequest(anyhow!(empty_batch_request))
220        }
221        JsonRpcClientError::RegisterMethod(register_method_error) => {
222            ServerError::InvalidResponse(anyhow!(register_method_error))
223        }
224    }
225}