fedimint_client_module/
api.rs1use std::collections::{BTreeMap, BTreeSet};
2use std::string::ToString;
3
4use fedimint_api_client::api::{DynModuleApi, IRawFederationApi, ServerResult};
5use fedimint_core::core::ModuleInstanceId;
6use fedimint_core::db::{Database, DatabaseTransaction};
7use fedimint_core::module::ApiRequestErased;
8use fedimint_core::task::{MaybeSend, MaybeSync};
9use fedimint_core::{PeerId, apply, async_trait_maybe_send};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use tokio::sync::watch;
13
14#[derive(Serialize, Deserialize, Debug, Clone)]
19pub struct ApiCallStarted {
20 method: String,
21 peer_id: PeerId,
22}
23
24impl Event for ApiCallStarted {
25 const MODULE: Option<fedimint_core::core::ModuleKind> = None;
26 const KIND: EventKind = EventKind::from_static("api-call-started");
27 const PERSISTENCE: EventPersistence = EventPersistence::Transient;
30}
31
32#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct ApiCallDone {
40 method: String,
41 peer_id: PeerId,
42 duration_ms: u64,
43 success: bool,
44 #[serde(skip_serializing_if = "Option::is_none")]
45 error_str: Option<String>,
46}
47
48impl Event for ApiCallDone {
49 const MODULE: Option<fedimint_core::core::ModuleKind> = None;
50 const KIND: EventKind = EventKind::from_static("api-call-done");
51 const PERSISTENCE: EventPersistence = EventPersistence::Transient;
52}
53
54use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
55use futures::stream::BoxStream;
56
57pub trait ClientRawFederationApiExt
60where
61 Self: Sized,
62{
63 fn with_client_ext(
64 self,
65 db: Database,
66 log_ordering_wakeup_tx: watch::Sender<()>,
67 ) -> ClientRawFederationApi<Self>;
68}
69
70impl<T> ClientRawFederationApiExt for T
71where
72 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
73{
74 fn with_client_ext(
75 self,
76 db: Database,
77 log_ordering_wakeup_tx: watch::Sender<()>,
78 ) -> ClientRawFederationApi<T> {
79 db.ensure_global().expect("Must be given global db");
80 ClientRawFederationApi {
81 inner: self,
82 db,
83 log_ordering_wakeup_tx,
84 }
85 }
86}
87
88#[derive(Debug)]
92pub struct ClientRawFederationApi<I> {
93 inner: I,
94 db: Database,
95 log_ordering_wakeup_tx: watch::Sender<()>,
96}
97
98impl<I> ClientRawFederationApi<I> {
99 pub async fn log_event<E>(&self, event: E)
100 where
101 E: Event + Send,
102 {
103 let mut dbtx = self.db.begin_transaction().await;
104 self.log_event_dbtx(&mut dbtx, event).await;
105 dbtx.commit_tx().await;
106 }
107
108 pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
109 where
110 E: Event + Send,
111 Cap: Send,
112 {
113 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
114 .await;
115 }
116}
117
118#[apply(async_trait_maybe_send!)]
119impl<I> IRawFederationApi for ClientRawFederationApi<I>
120where
121 I: IRawFederationApi,
122{
123 fn all_peers(&self) -> &BTreeSet<PeerId> {
124 self.inner.all_peers()
125 }
126
127 fn self_peer(&self) -> Option<PeerId> {
128 self.inner.self_peer()
129 }
130
131 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
132 self.inner.with_module(id)
133 }
134
135 async fn request_raw(
136 &self,
137 peer_id: PeerId,
138 method: &str,
139 params: &ApiRequestErased,
140 ) -> ServerResult<Value> {
141 self.log_event(ApiCallStarted {
142 method: method.to_string(),
143 peer_id,
144 })
145 .await;
146
147 let start = fedimint_core::time::now();
148 let res = self.inner.request_raw(peer_id, method, params).await;
149 let end = fedimint_core::time::now();
150
151 self.log_event(ApiCallDone {
152 method: method.to_string(),
153 peer_id,
154 duration_ms: end
155 .duration_since(start)
156 .unwrap_or_default()
157 .as_millis()
158 .try_into()
159 .unwrap_or(u64::MAX),
160 success: res.is_ok(),
161 error_str: res.as_ref().err().map(ToString::to_string),
162 })
163 .await;
164
165 res
166 }
167
168 fn connection_status_stream(&self) -> BoxStream<'static, BTreeMap<PeerId, bool>> {
169 self.inner.connection_status_stream()
170 }
171
172 async fn wait_for_initialized_connections(&self) {
173 self.inner.wait_for_initialized_connections().await;
174 }
175}