fedimint_client_module/
api.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::string::ToString;
3
4use fedimint_api_client::api::{DynModuleApi, IRawFederationApi, ServerResult};
5use fedimint_core::core::ModuleInstanceId;
6use fedimint_core::db::{Database, DatabaseTransaction};
7use fedimint_core::module::ApiRequestErased;
8use fedimint_core::task::{MaybeSend, MaybeSync};
9use fedimint_core::{PeerId, apply, async_trait_maybe_send};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tokio::sync::watch;
13
14/// Event log event right before making an api call
15///
16/// Notably there is no guarantee that a corresponding [`ApiCallDone`]
17/// is ever called, or that the api call actually reached the server.
18#[derive(Serialize, Deserialize, Debug, Clone)]
19pub struct ApiCallStarted {
20    method: String,
21    peer_id: PeerId,
22}
23
24impl Event for ApiCallStarted {
25    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
26    const KIND: EventKind = EventKind::from_static("api-call-started");
27    /// These were deemed heavy volume enough and mostly diagnostics, so they
28    /// are not persisted
29    const PERSISTENCE: EventPersistence = EventPersistence::Transient;
30}
31
32/// Event log event right after an api call
33///
34/// Notably there is no guarantee this event is always created. If the
35/// client completed the call, but was abruptly terminated before logging
36/// an event, the call might have completed on the server side, but never
37/// create this event.
38#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct ApiCallDone {
40    method: String,
41    peer_id: PeerId,
42    duration_ms: u64,
43    success: bool,
44    #[serde(skip_serializing_if = "Option::is_none")]
45    error_str: Option<String>,
46}
47
48impl Event for ApiCallDone {
49    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
50    const KIND: EventKind = EventKind::from_static("api-call-done");
51    const PERSISTENCE: EventPersistence = EventPersistence::Transient;
52}
53
54use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
55use futures::stream::BoxStream;
56
57/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
58/// a [`ClientRawFederationApi`]
59pub trait ClientRawFederationApiExt
60where
61    Self: Sized,
62{
63    fn with_client_ext(
64        self,
65        db: Database,
66        log_ordering_wakeup_tx: watch::Sender<()>,
67    ) -> ClientRawFederationApi<Self>;
68}
69
70impl<T> ClientRawFederationApiExt for T
71where
72    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
73{
74    fn with_client_ext(
75        self,
76        db: Database,
77        log_ordering_wakeup_tx: watch::Sender<()>,
78    ) -> ClientRawFederationApi<T> {
79        db.ensure_global().expect("Must be given global db");
80        ClientRawFederationApi {
81            inner: self,
82            db,
83            log_ordering_wakeup_tx,
84        }
85    }
86}
87
88/// A wrapper over [`IRawFederationApi`] adding client side event logging
89///
90/// Create using [`ClientRawFederationApiExt::with_client_ext`]
91#[derive(Debug)]
92pub struct ClientRawFederationApi<I> {
93    inner: I,
94    db: Database,
95    log_ordering_wakeup_tx: watch::Sender<()>,
96}
97
98impl<I> ClientRawFederationApi<I> {
99    pub async fn log_event<E>(&self, event: E)
100    where
101        E: Event + Send,
102    {
103        let mut dbtx = self.db.begin_transaction().await;
104        self.log_event_dbtx(&mut dbtx, event).await;
105        dbtx.commit_tx().await;
106    }
107
108    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
109    where
110        E: Event + Send,
111        Cap: Send,
112    {
113        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
114            .await;
115    }
116}
117
118#[apply(async_trait_maybe_send!)]
119impl<I> IRawFederationApi for ClientRawFederationApi<I>
120where
121    I: IRawFederationApi,
122{
123    fn all_peers(&self) -> &BTreeSet<PeerId> {
124        self.inner.all_peers()
125    }
126
127    fn self_peer(&self) -> Option<PeerId> {
128        self.inner.self_peer()
129    }
130
131    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
132        self.inner.with_module(id)
133    }
134
135    async fn request_raw(
136        &self,
137        peer_id: PeerId,
138        method: &str,
139        params: &ApiRequestErased,
140    ) -> ServerResult<Value> {
141        self.log_event(ApiCallStarted {
142            method: method.to_string(),
143            peer_id,
144        })
145        .await;
146
147        let start = fedimint_core::time::now();
148        let res = self.inner.request_raw(peer_id, method, params).await;
149        let end = fedimint_core::time::now();
150
151        self.log_event(ApiCallDone {
152            method: method.to_string(),
153            peer_id,
154            duration_ms: end
155                .duration_since(start)
156                .unwrap_or_default()
157                .as_millis()
158                .try_into()
159                .unwrap_or(u64::MAX),
160            success: res.is_ok(),
161            error_str: res.as_ref().err().map(ToString::to_string),
162        })
163        .await;
164
165        res
166    }
167
168    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
169        self.inner.connection_status_stream()
170    }
171
172    async fn wait_for_initialized_connections(&self) {
173        self.inner.wait_for_initialized_connections().await;
174    }
175}