fedimint_server/metrics/
jsonrpsee.rs

1//! jsonrpsee/tower rpc layer that collects rpc stats
2//!
3//! Based on implementation of logger from:
4//!
5//! <https://github.com/paritytech/jsonrpsee/blob/bf5952fb663bdb8193b9f8a43182454c143b0e7d/server/src/middleware/rpc/layer/logger.rs#L1>
6
7use std::borrow::Cow;
8use std::pin::Pin;
9use std::task;
10use std::task::Poll;
11
12use fedimint_metrics::prometheus::HistogramTimer;
13use futures::Future;
14use jsonrpsee::MethodResponse;
15use jsonrpsee::server::middleware::rpc::RpcServiceT;
16use jsonrpsee::types::Request;
17use pin_project::pin_project;
18
19use super::{JSONRPC_API_REQUEST_DURATION_SECONDS, JSONRPC_API_REQUEST_RESPONSE_CODE};
20
21#[pin_project]
22pub struct ResponseFuture<F> {
23    #[pin]
24    method: String,
25    #[pin]
26    fut: F,
27    #[pin]
28    timer: Option<HistogramTimer>,
29}
30
31impl<F> std::fmt::Debug for ResponseFuture<F> {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        f.write_str("ResponseFuture")
34    }
35}
36
37impl<F: Future<Output = MethodResponse>> Future for ResponseFuture<F> {
38    type Output = F::Output;
39
40    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
41        let mut projected = self.project();
42        let res = projected.fut.poll(cx);
43        if let Poll::Ready(res) = &res {
44            if let Some(timer) = projected.timer.take() {
45                timer.observe_duration();
46
47                JSONRPC_API_REQUEST_RESPONSE_CODE
48                    .with_label_values(&[
49                        &projected.method,
50                        &if let Some(code) = res.as_error_code() {
51                            Cow::Owned(code.to_string())
52                        } else {
53                            Cow::Borrowed("0")
54                        },
55                        if res.is_subscription() {
56                            "subscription"
57                        } else if res.is_batch() {
58                            "batch"
59                        } else {
60                            "default"
61                        },
62                    ])
63                    .inc();
64            }
65        }
66        res
67    }
68}
69
/// Stateless [`tower::Layer`] that wraps a service in [`MetricsService`].
#[derive(Debug, Clone, Copy)]
pub struct MetricsLayer;
72
73impl<S> tower::Layer<S> for MetricsLayer {
74    type Service = MetricsService<S>;
75
76    fn layer(&self, service: S) -> Self::Service {
77        MetricsService { service }
78    }
79}
80
/// RPC service wrapper that records metrics for every call before
/// delegating to the wrapped service.
///
/// `Debug` is derived so this public type is printable like the other types
/// in this module; the impl only applies when `S: Debug`.
#[derive(Debug)]
pub struct MetricsService<S> {
    /// The wrapped service that requests are forwarded to.
    pub(crate) service: S,
}
84
85impl<'a, S> RpcServiceT<'a> for MetricsService<S>
86where
87    S: RpcServiceT<'a> + Send + Sync,
88{
89    type Future = ResponseFuture<S::Future>;
90
91    fn call(&self, req: Request<'a>) -> Self::Future {
92        let timer = JSONRPC_API_REQUEST_DURATION_SECONDS
93            .with_label_values(&[req.method_name()])
94            .start_timer();
95
96        ResponseFuture {
97            method: req.method.to_string(),
98            fut: self.service.call(req),
99            timer: Some(timer),
100        }
101    }
102}