fedimint_client_module/
oplog.rs

1use std::fmt::Debug;
2use std::future;
3use std::time::SystemTime;
4
5use fedimint_core::core::OperationId;
6use fedimint_core::db::{Database, DatabaseTransaction};
7use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use fedimint_core::task::{MaybeSend, MaybeSync};
10use fedimint_core::util::BoxStream;
11use fedimint_core::{apply, async_trait_maybe_send};
12use futures::{StreamExt, stream};
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15
16/// Json value using string representation as db encoding.
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(transparent)]
19pub struct JsonStringed(pub serde_json::Value);
20
21impl Encodable for JsonStringed {
22    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
23        let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
24        json_str.consensus_encode(writer)
25    }
26}
27
28impl Decodable for JsonStringed {
29    fn consensus_decode_partial<R: std::io::Read>(
30        r: &mut R,
31        modules: &ModuleDecoderRegistry,
32    ) -> Result<Self, DecodeError> {
33        let json_str = String::consensus_decode_partial(r, modules)?;
34        let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
35        Ok(JsonStringed(value))
36    }
37}
38
39#[apply(async_trait_maybe_send!)]
40pub trait IOperationLog {
41    async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry>;
42
43    async fn get_operation_dbtx(
44        &self,
45        dbtx: &mut DatabaseTransaction<'_>,
46        operation_id: OperationId,
47    ) -> Option<OperationLogEntry>;
48
49    async fn add_operation_log_entry_dbtx(
50        &self,
51        dbtx: &mut DatabaseTransaction<'_>,
52        operation_id: OperationId,
53        operation_type: &str,
54        operation_meta: serde_json::Value,
55    );
56
57    fn outcome_or_updates(
58        &self,
59        db: &Database,
60        operation_id: OperationId,
61        operation_log_entry: OperationLogEntry,
62        stream_gen: Box<dyn FnOnce() -> BoxStream<'static, serde_json::Value>>,
63    ) -> UpdateStreamOrOutcome<serde_json::Value>;
64}
65
66/// Represents the outcome of an operation, combining both the outcome value and
67/// its timestamp
68#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
69pub struct OperationOutcome {
70    pub time: SystemTime,
71    pub outcome: JsonStringed,
72}
73
74/// Represents an operation triggered by a user, typically related to sending or
75/// receiving money.
76///
77/// There are three levels of introspection possible for `OperationLogEntry`s:
78///   1. The [`OperationLogEntry::operation_module_kind`] function returns the
79///      kind of the module that created the operation.
80///   2. The [`OperationLogEntry::meta`] function returns static meta data that
81///      was associated with the operation when it was created. Modules define
82///      their own meta structures, so the module kind has to be used to
83///      determine the structure of the meta data.
84///   3. To find out the current state of the operation there is a two-step
85///      process:
86///      * First, the [`OperationLogEntry::outcome`] function returns the
87///        outcome if the operation finished **and** the update subscription
88///        stream has been processed till its end at least once.
89///      * If that isn't the case, the [`OperationLogEntry::outcome`] method
90///        will return `None` and the appropriate update subscription function
91///        has to be called. See the respective client extension trait for these
92///        functions.
93#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
94pub struct OperationLogEntry {
95    pub(crate) operation_module_kind: String,
96    pub(crate) meta: JsonStringed,
97    // TODO: probably change all that JSON to Dyn-types
98    pub(crate) outcome: Option<OperationOutcome>,
99}
100
101impl OperationLogEntry {
102    pub fn new(
103        operation_module_kind: String,
104        meta: JsonStringed,
105        outcome: Option<OperationOutcome>,
106    ) -> Self {
107        Self {
108            operation_module_kind,
109            meta,
110            outcome,
111        }
112    }
113
114    /// Returns the kind of the module that generated the operation
115    pub fn operation_module_kind(&self) -> &str {
116        &self.operation_module_kind
117    }
118
119    /// Returns the meta data of the operation. This is a JSON value that can be
120    /// either returned as a [`serde_json::Value`] or deserialized into a
121    /// specific type. The specific type should be named `<Module>OperationMeta`
122    /// in the module's client crate. The module can be determined by calling
123    /// [`OperationLogEntry::operation_module_kind`].
124    pub fn meta<M: DeserializeOwned>(&self) -> M {
125        self.try_meta()
126            .expect("JSON deserialization should not fail")
127    }
128
129    /// Fallible version of [`OperationLogEntry::meta`]. Used to avoid panics in
130    /// the case of failed past migrations, resulting in invalid encodings in
131    /// the DB.
132    pub fn try_meta<M: DeserializeOwned>(&self) -> Result<M, serde_json::Error> {
133        serde_json::from_value(self.meta.0.clone())
134    }
135
136    /// Returns the last state update of the operation, if any was cached yet.
137    /// If this hasn't been the case yet and `None` is returned subscribe to the
138    /// appropriate update stream.
139    ///
140    /// ## Determining the return type
141    /// [`OperationLogEntry::meta`] should tell you the which operation type of
142    /// a given module the outcome belongs to. The operation type will have a
143    /// corresponding `async fn subscribe_type(&self, operation_id:
144    /// OperationId) -> anyhow::Result<UpdateStreamOrOutcome<TypeState>>;`
145    /// function that returns a `UpdateStreamOrOutcome<S>` where `S` is the
146    /// high-level state the operation is in. If this state is terminal, i.e.
147    /// the stream closes after returning it, it will be cached as the `outcome`
148    /// of the operation.
149    ///
150    /// This means the type to be used for deserializing the outcome is `S`,
151    /// often called `<OperationType>State`. Alternatively one can also use
152    /// [`serde_json::Value`] to get the unstructured data.
153    pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
154        self.try_outcome()
155            .expect("JSON deserialization should not fail")
156    }
157
158    /// Fallible version of [`OperationLogEntry::outcome`]. Used to avoid panics
159    /// in the case of failed past migrations, resulting in invalid encodings in
160    /// the DB.
161    pub fn try_outcome<D: DeserializeOwned>(&self) -> Result<Option<D>, serde_json::Error> {
162        self.outcome
163            .as_ref()
164            .map(|outcome| serde_json::from_value(outcome.outcome.0.clone()))
165            .transpose()
166    }
167
168    /// Returns the time when the outcome was cached.
169    pub fn outcome_time(&self) -> Option<SystemTime> {
170        self.outcome.as_ref().map(|o| o.time)
171    }
172
173    pub fn set_outcome(&mut self, outcome: impl Into<Option<OperationOutcome>>) {
174        self.outcome = outcome.into();
175    }
176}
177
178/// Either a stream of operation updates if the operation hasn't finished yet or
179/// its outcome otherwise.
180pub enum UpdateStreamOrOutcome<U> {
181    UpdateStream(BoxStream<'static, U>),
182    Outcome(U),
183}
184
185impl<U> UpdateStreamOrOutcome<U>
186where
187    U: MaybeSend + MaybeSync + 'static,
188{
189    /// Returns a stream no matter if the operation is finished. If there
190    /// already is a cached outcome the stream will only return that, otherwise
191    /// all updates will be returned until the operation finishes.
192    pub fn into_stream(self) -> BoxStream<'static, U> {
193        match self {
194            UpdateStreamOrOutcome::UpdateStream(stream) => stream,
195            UpdateStreamOrOutcome::Outcome(outcome) => {
196                Box::pin(stream::once(future::ready(outcome)))
197            }
198        }
199    }
200
201    /// Awaits the outcome of the operation update stream, either by returning
202    /// the cached value or by consuming the entire stream and returning the
203    /// last update.
204    pub async fn await_outcome(self) -> Option<U> {
205        match self {
206            UpdateStreamOrOutcome::Outcome(outcome) => Some(outcome),
207            UpdateStreamOrOutcome::UpdateStream(mut stream) => {
208                let mut last_update = None;
209                while let Some(update) = stream.next().await {
210                    last_update = Some(update);
211                }
212                last_update
213            }
214        }
215    }
216}
217
#[cfg(test)]
mod tests {
    use futures::stream;
    use serde_json::{Value, json};

    use super::*;

    /// A cached outcome is returned as-is.
    #[tokio::test]
    async fn test_await_outcome_cached() {
        let expected = json!({"status": "completed", "amount": 100});
        let cached = UpdateStreamOrOutcome::Outcome(expected.clone());
        assert_eq!(cached.await_outcome().await, Some(expected));
    }

    /// With several updates only the last one is reported.
    #[tokio::test]
    async fn test_await_outcome_uncached_with_updates() {
        let updates = Box::pin(stream::iter(vec![
            Value::from(0),
            Value::from(1),
            Value::from(2),
        ]));
        let pending = UpdateStreamOrOutcome::UpdateStream(updates);
        assert_eq!(pending.await_outcome().await, Some(Value::from(2)));
    }

    /// A stream that ends without any update yields no outcome.
    #[tokio::test]
    async fn test_await_outcome_uncached_empty_stream() {
        let no_updates = Box::pin(stream::empty::<Value>());
        let pending = UpdateStreamOrOutcome::UpdateStream(no_updates);
        assert_eq!(pending.await_outcome().await, None);
    }

    /// A single update is both the first and the last one.
    #[tokio::test]
    async fn test_await_outcome_uncached_single_update() {
        let single_update = Box::pin(stream::once(async { Value::from(0) }));
        let pending = UpdateStreamOrOutcome::UpdateStream(single_update);
        assert_eq!(pending.await_outcome().await, Some(Value::from(0)));
    }
}