Skip to main content

fedimint_client_module/
api.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::string::ToString;
3
4use fedimint_api_client::api::{DynModuleApi, IRawFederationApi, ServerResult};
5use fedimint_connectors::DynGuaridianConnection;
6use fedimint_core::core::ModuleInstanceId;
7use fedimint_core::db::{Database, DatabaseTransaction};
8use fedimint_core::module::ApiRequestErased;
9use fedimint_core::task::{MaybeSend, MaybeSync};
10use fedimint_core::{PeerId, apply, async_trait_maybe_send};
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use tokio::sync::watch;
14
15/// Event log event right before making an api call
16///
17/// Notably there is no guarantee that a corresponding [`ApiCallDone`]
18/// is ever called, or that the api call actually reached the server.
19#[derive(Serialize, Deserialize, Debug, Clone)]
20pub struct ApiCallStarted {
21    method: String,
22    peer_id: PeerId,
23}
24
25impl Event for ApiCallStarted {
26    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
27    const KIND: EventKind = EventKind::from_static("api-call-started");
28    /// These were deemed heavy volume enough and mostly diagnostics, so they
29    /// are not persisted
30    const PERSISTENCE: EventPersistence = EventPersistence::Transient;
31}
32
33/// Event log event right after an api call
34///
35/// Notably there is no guarantee this event is always created. If the
36/// client completed the call, but was abruptly terminated before logging
37/// an event, the call might have completed on the server side, but never
38/// create this event.
39#[derive(Serialize, Deserialize, Debug, Clone)]
40pub struct ApiCallDone {
41    method: String,
42    peer_id: PeerId,
43    duration_ms: u64,
44    success: bool,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    error_str: Option<String>,
47}
48
49impl Event for ApiCallDone {
50    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
51    const KIND: EventKind = EventKind::from_static("api-call-done");
52    const PERSISTENCE: EventPersistence = EventPersistence::Transient;
53}
54
55use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
56use futures::stream::BoxStream;
57
58/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
59/// a [`ClientRawFederationApi`]
60pub trait ClientRawFederationApiExt
61where
62    Self: Sized,
63{
64    fn with_client_ext(
65        self,
66        db: Database,
67        log_ordering_wakeup_tx: watch::Sender<()>,
68    ) -> ClientRawFederationApi<Self>;
69}
70
71impl<T> ClientRawFederationApiExt for T
72where
73    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
74{
75    fn with_client_ext(
76        self,
77        db: Database,
78        log_ordering_wakeup_tx: watch::Sender<()>,
79    ) -> ClientRawFederationApi<T> {
80        db.ensure_global().expect("Must be given global db");
81        ClientRawFederationApi {
82            inner: self,
83            db,
84            log_ordering_wakeup_tx,
85        }
86    }
87}
88
89/// A wrapper over [`IRawFederationApi`] adding client side event logging
90///
91/// Create using [`ClientRawFederationApiExt::with_client_ext`]
92#[derive(Debug)]
93pub struct ClientRawFederationApi<I> {
94    inner: I,
95    db: Database,
96    log_ordering_wakeup_tx: watch::Sender<()>,
97}
98
99impl<I> ClientRawFederationApi<I> {
100    pub async fn log_event<E>(&self, event: E)
101    where
102        E: Event + Send,
103    {
104        let mut dbtx = self.db.begin_transaction().await;
105        self.log_event_dbtx(&mut dbtx, event).await;
106        dbtx.commit_tx().await;
107    }
108
109    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
110    where
111        E: Event + Send,
112        Cap: Send,
113    {
114        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
115            .await;
116    }
117}
118
119#[apply(async_trait_maybe_send!)]
120impl<I> IRawFederationApi for ClientRawFederationApi<I>
121where
122    I: IRawFederationApi,
123{
124    fn all_peers(&self) -> &BTreeSet<PeerId> {
125        self.inner.all_peers()
126    }
127
128    fn self_peer(&self) -> Option<PeerId> {
129        self.inner.self_peer()
130    }
131
132    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
133        self.inner.with_module(id)
134    }
135
136    async fn request_raw(
137        &self,
138        peer_id: PeerId,
139        method: &str,
140        params: &ApiRequestErased,
141    ) -> ServerResult<Value> {
142        self.log_event(ApiCallStarted {
143            method: method.to_string(),
144            peer_id,
145        })
146        .await;
147
148        let start = fedimint_core::time::now();
149        let res = self.inner.request_raw(peer_id, method, params).await;
150        let end = fedimint_core::time::now();
151
152        self.log_event(ApiCallDone {
153            method: method.to_string(),
154            peer_id,
155            duration_ms: end
156                .duration_since(start)
157                .unwrap_or_default()
158                .as_millis()
159                .try_into()
160                .unwrap_or(u64::MAX),
161            success: res.is_ok(),
162            error_str: res.as_ref().err().map(ToString::to_string),
163        })
164        .await;
165
166        res
167    }
168
169    fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
170        self.inner.connection_status_stream()
171    }
172
173    async fn wait_for_initialized_connections(&self) {
174        self.inner.wait_for_initialized_connections().await;
175    }
176
177    async fn get_peer_connection(&self, peer_id: PeerId) -> ServerResult<DynGuaridianConnection> {
178        self.inner.get_peer_connection(peer_id).await
179    }
180}