fedimint_client_module/
oplog.rs1use std::fmt::Debug;
2use std::future;
3use std::time::SystemTime;
4
5use fedimint_core::core::OperationId;
6use fedimint_core::db::{Database, DatabaseTransaction};
7use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use fedimint_core::task::{MaybeSend, MaybeSync};
10use fedimint_core::util::BoxStream;
11use fedimint_core::{apply, async_trait_maybe_send};
12use futures::{StreamExt, stream};
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15
16#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(transparent)]
19pub struct JsonStringed(pub serde_json::Value);
20
21impl Encodable for JsonStringed {
22 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
23 let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
24 json_str.consensus_encode(writer)
25 }
26}
27
28impl Decodable for JsonStringed {
29 fn consensus_decode_partial<R: std::io::Read>(
30 r: &mut R,
31 modules: &ModuleDecoderRegistry,
32 ) -> Result<Self, DecodeError> {
33 let json_str = String::consensus_decode_partial(r, modules)?;
34 let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
35 Ok(JsonStringed(value))
36 }
37}
38
39#[apply(async_trait_maybe_send!)]
40pub trait IOperationLog {
41 async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry>;
42
43 async fn get_operation_dbtx(
44 &self,
45 dbtx: &mut DatabaseTransaction<'_>,
46 operation_id: OperationId,
47 ) -> Option<OperationLogEntry>;
48
49 async fn add_operation_log_entry_dbtx(
50 &self,
51 dbtx: &mut DatabaseTransaction<'_>,
52 operation_id: OperationId,
53 operation_type: &str,
54 operation_meta: serde_json::Value,
55 );
56
57 fn outcome_or_updates(
58 &self,
59 db: &Database,
60 operation_id: OperationId,
61 operation_log_entry: OperationLogEntry,
62 stream_gen: Box<dyn FnOnce() -> BoxStream<'static, serde_json::Value>>,
63 ) -> UpdateStreamOrOutcome<serde_json::Value>;
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
69pub struct OperationOutcome {
70 pub time: SystemTime,
71 pub outcome: JsonStringed,
72}
73
74#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
94pub struct OperationLogEntry {
95 pub(crate) operation_module_kind: String,
96 pub(crate) meta: JsonStringed,
97 pub(crate) outcome: Option<OperationOutcome>,
99}
100
101impl OperationLogEntry {
102 pub fn new(
103 operation_module_kind: String,
104 meta: JsonStringed,
105 outcome: Option<OperationOutcome>,
106 ) -> Self {
107 Self {
108 operation_module_kind,
109 meta,
110 outcome,
111 }
112 }
113
114 pub fn operation_module_kind(&self) -> &str {
116 &self.operation_module_kind
117 }
118
119 pub fn meta<M: DeserializeOwned>(&self) -> M {
125 self.try_meta()
126 .expect("JSON deserialization should not fail")
127 }
128
129 pub fn try_meta<M: DeserializeOwned>(&self) -> Result<M, serde_json::Error> {
133 serde_json::from_value(self.meta.0.clone())
134 }
135
136 pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
154 self.try_outcome()
155 .expect("JSON deserialization should not fail")
156 }
157
158 pub fn try_outcome<D: DeserializeOwned>(&self) -> Result<Option<D>, serde_json::Error> {
162 self.outcome
163 .as_ref()
164 .map(|outcome| serde_json::from_value(outcome.outcome.0.clone()))
165 .transpose()
166 }
167
168 pub fn outcome_time(&self) -> Option<SystemTime> {
170 self.outcome.as_ref().map(|o| o.time)
171 }
172
173 pub fn set_outcome(&mut self, outcome: impl Into<Option<OperationOutcome>>) {
174 self.outcome = outcome.into();
175 }
176}
177
178pub enum UpdateStreamOrOutcome<U> {
181 UpdateStream(BoxStream<'static, U>),
182 Outcome(U),
183}
184
185impl<U: Debug> Debug for UpdateStreamOrOutcome<U> {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 match self {
188 UpdateStreamOrOutcome::UpdateStream(_) => write!(f, "UpdateStream"),
189 UpdateStreamOrOutcome::Outcome(o) => f.debug_tuple("Outcome").field(o).finish(),
190 }
191 }
192}
193
194impl<U> UpdateStreamOrOutcome<U>
195where
196 U: MaybeSend + MaybeSync + 'static,
197{
198 pub fn into_stream(self) -> BoxStream<'static, U> {
202 match self {
203 UpdateStreamOrOutcome::UpdateStream(stream) => stream,
204 UpdateStreamOrOutcome::Outcome(outcome) => {
205 Box::pin(stream::once(future::ready(outcome)))
206 }
207 }
208 }
209
210 pub async fn await_outcome(self) -> Option<U> {
214 match self {
215 UpdateStreamOrOutcome::Outcome(outcome) => Some(outcome),
216 UpdateStreamOrOutcome::UpdateStream(mut stream) => {
217 let mut last_update = None;
218 while let Some(update) = stream.next().await {
219 last_update = Some(update);
220 }
221 last_update
222 }
223 }
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use futures::stream;
230 use serde_json::Value;
231
232 use super::*;
233
234 #[tokio::test]
235 async fn test_await_outcome_cached() {
236 let test_value = serde_json::json!({"status": "completed", "amount": 100});
237 let cached_outcome = UpdateStreamOrOutcome::Outcome(test_value.clone());
238 let result = cached_outcome.await_outcome().await;
239 assert_eq!(result, Some(test_value));
240 }
241
242 #[tokio::test]
243 async fn test_await_outcome_uncached_with_updates() {
244 let update_stream = Box::pin(stream::iter(vec![
245 Value::from(0),
246 Value::from(1),
247 Value::from(2),
248 ]));
249 let uncached_outcome = UpdateStreamOrOutcome::UpdateStream(update_stream);
250 let result = uncached_outcome.await_outcome().await;
251 assert_eq!(result, Some(Value::from(2)));
252 }
253
254 #[tokio::test]
255 async fn test_await_outcome_uncached_empty_stream() {
256 let empty_stream = Box::pin(stream::empty::<serde_json::Value>());
257 let uncached_outcome = UpdateStreamOrOutcome::UpdateStream(empty_stream);
258 let result = uncached_outcome.await_outcome().await;
259 assert_eq!(result, None);
260 }
261
262 #[tokio::test]
263 async fn test_await_outcome_uncached_single_update() {
264 let update_stream = Box::pin(stream::once(async { Value::from(0) }));
265 let uncached_outcome = UpdateStreamOrOutcome::UpdateStream(update_stream);
266 let result = uncached_outcome.await_outcome().await;
267 assert_eq!(result, Some(Value::from(0)));
268 }
269}