Skip to main content

fedimint_connectors/
ws.rs

1use std::sync::Arc;
2
3#[allow(unused)]
4use anyhow::anyhow;
5use async_trait::async_trait;
6use fedimint_core::module::{ApiMethod, ApiRequestErased};
7#[cfg(not(target_family = "wasm"))]
8use fedimint_core::rustls::install_crypto_provider;
9use fedimint_core::util::SafeUrl;
10use fedimint_core::{apply, async_trait_maybe_send};
11use fedimint_logging::LOG_NET_WS;
12use jsonrpsee_core::client::ClientT;
13pub use jsonrpsee_core::client::Error as JsonRpcClientError;
14use jsonrpsee_types::ErrorCode;
15#[cfg(target_family = "wasm")]
16use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
17#[allow(unused)]
18#[cfg(not(target_family = "wasm"))]
19use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
20use serde_json::Value;
21use tracing::trace;
22pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
23
24use super::Connector;
25use crate::{
26    Connectivity, DynGatewayConnection, DynGuaridianConnection, IConnection, IGuardianConnection,
27    ServerError, ServerResult,
28};
29
30#[derive(Debug, Clone)]
31pub struct WebsocketConnector {}
32
33impl WebsocketConnector {
34    pub fn new() -> Self {
35        Self {}
36    }
37
38    async fn make_new_connection(
39        &self,
40        url: &SafeUrl,
41        api_secret: Option<&str>,
42    ) -> ServerResult<Arc<WsClient>> {
43        trace!(target: LOG_NET_WS, %url, "Creating new websocket connection");
44
45        #[cfg(not(target_family = "wasm"))]
46        let mut client = {
47            use jsonrpsee_ws_client::{CustomCertStore, WsClientBuilder};
48            use tokio_rustls::rustls::RootCertStore;
49
50            install_crypto_provider().await;
51            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
52            let mut root_certs = RootCertStore::empty();
53            root_certs.extend(webpki_roots);
54
55            let tls_cfg = CustomCertStore::builder()
56                .with_root_certificates(root_certs)
57                .with_no_client_auth();
58
59            WsClientBuilder::default()
60                .max_concurrent_requests(u16::MAX as usize)
61                .with_custom_cert_store(tls_cfg)
62        };
63
64        #[cfg(target_family = "wasm")]
65        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
66
67        if let Some(api_secret) = api_secret {
68            #[cfg(not(target_family = "wasm"))]
69            {
70                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
71                // but we can set up the headers manually
72
73                use base64::Engine as _;
74                use jsonrpsee_ws_client::{HeaderMap, HeaderValue};
75                let mut headers = HeaderMap::new();
76
77                let auth = base64::engine::general_purpose::STANDARD
78                    .encode(format!("fedimint:{api_secret}"));
79
80                headers.insert(
81                    "Authorization",
82                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
83                );
84
85                client = client.set_headers(headers);
86            }
87            #[cfg(target_family = "wasm")]
88            {
89                // on wasm, url will be handled by the browser, which should take care of
90                // `user:pass@...`
91                let mut url = url.clone();
92                url.set_username("fedimint")
93                    .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid username")))?;
94                url.set_password(Some(&api_secret))
95                    .map_err(|_| ServerError::InvalidEndpoint(anyhow!("invalid secret")))?;
96
97                let client = client
98                    .build(url.as_str())
99                    .await
100                    .map_err(|err| ServerError::InternalClientError(err.into()))?;
101
102                return Ok(Arc::new(client));
103            }
104        }
105
106        let client = client
107            .build(url.as_str())
108            .await
109            .map_err(|err| ServerError::InternalClientError(err.into()))?;
110
111        Ok(Arc::new(client))
112    }
113}
114
115impl Default for WebsocketConnector {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121#[async_trait::async_trait]
122impl Connector for WebsocketConnector {
123    async fn connect_guardian(
124        &self,
125        url: &SafeUrl,
126        api_secret: Option<&str>,
127    ) -> ServerResult<DynGuaridianConnection> {
128        let client = self.make_new_connection(url, api_secret).await?;
129        Ok(client.into_dyn())
130    }
131
132    async fn connect_gateway(&self, _url: &SafeUrl) -> anyhow::Result<DynGatewayConnection> {
133        Err(anyhow!("Unsupported transport method"))
134    }
135
136    fn connectivity(&self, _url: &SafeUrl) -> Connectivity {
137        Connectivity::Direct
138    }
139}
140
141#[apply(async_trait_maybe_send!)]
142impl IConnection for WsClient {
143    async fn await_disconnection(&self) {
144        self.on_disconnect().await;
145    }
146
147    fn is_connected(&self) -> bool {
148        WsClient::is_connected(self)
149    }
150}
151
152#[async_trait]
153impl IGuardianConnection for WsClient {
154    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
155        let method = match method {
156            ApiMethod::Core(method) => method,
157            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
158        };
159
160        Ok(ClientT::request(self, &method, [request.to_json()])
161            .await
162            .map_err(jsonrpc_error_to_peer_error)?)
163    }
164}
165
166#[apply(async_trait_maybe_send!)]
167impl IConnection for Arc<WsClient> {
168    async fn await_disconnection(&self) {
169        self.on_disconnect().await;
170    }
171
172    fn is_connected(&self) -> bool {
173        WsClient::is_connected(self)
174    }
175}
176
177#[async_trait]
178impl IGuardianConnection for Arc<WsClient> {
179    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> ServerResult<Value> {
180        let method = match method {
181            ApiMethod::Core(method) => method,
182            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
183        };
184
185        Ok(
186            ClientT::request(self.as_ref(), &method, [request.to_json()])
187                .await
188                .map_err(jsonrpc_error_to_peer_error)?,
189        )
190    }
191}
192
193fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> ServerError {
194    match jsonrpc_error {
195        JsonRpcClientError::Call(error_object) => {
196            let error = anyhow!(error_object.message().to_owned());
197            match ErrorCode::from(error_object.code()) {
198                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
199                    ServerError::InvalidRequest(error)
200                }
201                ErrorCode::MethodNotFound => ServerError::InvalidRpcId(error),
202                ErrorCode::InvalidParams => ServerError::InvalidRequest(error),
203                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
204                    ServerError::ServerError(error)
205                }
206            }
207        }
208        JsonRpcClientError::Transport(error) => ServerError::Transport(anyhow!(error)),
209        JsonRpcClientError::RestartNeeded(arc) => ServerError::Transport(anyhow!(arc)),
210        JsonRpcClientError::ParseError(error) => ServerError::InvalidResponse(anyhow!(error)),
211        JsonRpcClientError::InvalidSubscriptionId => {
212            ServerError::Transport(anyhow!("Invalid subscription id"))
213        }
214        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
215            ServerError::InvalidRequest(anyhow!(invalid_request_id))
216        }
217        JsonRpcClientError::RequestTimeout => ServerError::Transport(anyhow!("Request timeout")),
218        JsonRpcClientError::Custom(e) => ServerError::Transport(anyhow!(e)),
219        JsonRpcClientError::HttpNotImplemented => {
220            ServerError::ServerError(anyhow!("Http not implemented"))
221        }
222        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
223            ServerError::InvalidRequest(anyhow!(empty_batch_request))
224        }
225        JsonRpcClientError::RegisterMethod(register_method_error) => {
226            ServerError::InvalidResponse(anyhow!(register_method_error))
227        }
228    }
229}