1use std::fmt::Debug;
2use std::future;
3use std::time::SystemTime;
45use fedimint_core::core::OperationId;
6use fedimint_core::db::{Database, DatabaseTransaction};
7use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use fedimint_core::task::{MaybeSend, MaybeSync};
10use fedimint_core::util::BoxStream;
11use fedimint_core::{apply, async_trait_maybe_send};
12use futures::stream;
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
1516/// Json value using string representation as db encoding.
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(transparent)]
19pub struct JsonStringed(pub serde_json::Value);
2021impl Encodable for JsonStringed {
22fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
23let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
24 json_str.consensus_encode(writer)
25 }
26}
2728impl Decodable for JsonStringed {
29fn consensus_decode_partial<R: std::io::Read>(
30 r: &mut R,
31 modules: &ModuleDecoderRegistry,
32 ) -> Result<Self, DecodeError> {
33let json_str = String::consensus_decode_partial(r, modules)?;
34let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
35Ok(JsonStringed(value))
36 }
37}
3839#[apply(async_trait_maybe_send!)]
40pub trait IOperationLog {
41async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry>;
4243async fn get_operation_dbtx(
44&self,
45 dbtx: &mut DatabaseTransaction<'_>,
46 operation_id: OperationId,
47 ) -> Option<OperationLogEntry>;
4849async fn add_operation_log_entry_dbtx(
50&self,
51 dbtx: &mut DatabaseTransaction<'_>,
52 operation_id: OperationId,
53 operation_type: &str,
54 operation_meta: serde_json::Value,
55 );
5657fn outcome_or_updates(
58&self,
59 db: &Database,
60 operation_id: OperationId,
61 operation_log_entry: OperationLogEntry,
62 stream_gen: Box<dyn FnOnce() -> BoxStream<'static, serde_json::Value>>,
63 ) -> UpdateStreamOrOutcome<serde_json::Value>;
64}
6566/// Represents the outcome of an operation, combining both the outcome value and
67/// its timestamp
68#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
69pub struct OperationOutcome {
70pub time: SystemTime,
71pub outcome: JsonStringed,
72}
7374/// Represents an operation triggered by a user, typically related to sending or
75/// receiving money.
76///
77/// There are three levels of introspection possible for `OperationLogEntry`s:
78/// 1. The [`OperationLogEntry::operation_module_kind`] function returns the
79/// kind of the module that created the operation.
80/// 2. The [`OperationLogEntry::meta`] function returns static meta data that
81/// was associated with the operation when it was created. Modules define
82/// their own meta structures, so the module kind has to be used to
83/// determine the structure of the meta data.
84/// 3. To find out the current state of the operation there is a two-step
85/// process:
86/// * First, the [`OperationLogEntry::outcome`] function returns the
87/// outcome if the operation finished **and** the update subscription
88/// stream has been processed till its end at least once.
89/// * If that isn't the case, the [`OperationLogEntry::outcome`] method
90/// will return `None` and the appropriate update subscription function
91/// has to be called. See the respective client extension trait for these
92/// functions.
93#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
94pub struct OperationLogEntry {
95pub(crate) operation_module_kind: String,
96pub(crate) meta: JsonStringed,
97// TODO: probably change all that JSON to Dyn-types
98pub(crate) outcome: Option<OperationOutcome>,
99}
100101impl OperationLogEntry {
102pub fn new(
103 operation_module_kind: String,
104 meta: JsonStringed,
105 outcome: Option<OperationOutcome>,
106 ) -> Self {
107Self {
108 operation_module_kind,
109 meta,
110 outcome,
111 }
112 }
113114/// Returns the kind of the module that generated the operation
115pub fn operation_module_kind(&self) -> &str {
116&self.operation_module_kind
117 }
118119/// Returns the meta data of the operation. This is a JSON value that can be
120 /// either returned as a [`serde_json::Value`] or deserialized into a
121 /// specific type. The specific type should be named `<Module>OperationMeta`
122 /// in the module's client crate. The module can be determined by calling
123 /// [`OperationLogEntry::operation_module_kind`].
124pub fn meta<M: DeserializeOwned>(&self) -> M {
125 serde_json::from_value(self.meta.0.clone()).expect("JSON deserialization should not fail")
126 }
127128/// Returns the last state update of the operation, if any was cached yet.
129 /// If this hasn't been the case yet and `None` is returned subscribe to the
130 /// appropriate update stream.
131 ///
132 /// ## Determining the return type
133 /// [`OperationLogEntry::meta`] should tell you the which operation type of
134 /// a given module the outcome belongs to. The operation type will have a
135 /// corresponding `async fn subscribe_type(&self, operation_id:
136 /// OperationId) -> anyhow::Result<UpdateStreamOrOutcome<TypeState>>;`
137 /// function that returns a `UpdateStreamOrOutcome<S>` where `S` is the
138 /// high-level state the operation is in. If this state is terminal, i.e.
139 /// the stream closes after returning it, it will be cached as the `outcome`
140 /// of the operation.
141 ///
142 /// This means the type to be used for deserializing the outcome is `S`,
143 /// often called `<OperationType>State`. Alternatively one can also use
144 /// [`serde_json::Value`] to get the unstructured data.
145pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
146self.outcome.as_ref().map(|outcome| {
147 serde_json::from_value(outcome.outcome.0.clone())
148 .expect("JSON deserialization should not fail")
149 })
150 }
151152/// Returns the time when the outcome was cached.
153pub fn outcome_time(&self) -> Option<SystemTime> {
154self.outcome.as_ref().map(|o| o.time)
155 }
156157pub fn set_outcome(&mut self, outcome: impl Into<Option<OperationOutcome>>) {
158self.outcome = outcome.into();
159 }
160}
161162/// Either a stream of operation updates if the operation hasn't finished yet or
163/// its outcome otherwise.
164pub enum UpdateStreamOrOutcome<U> {
165 UpdateStream(BoxStream<'static, U>),
166 Outcome(U),
167}
168169impl<U> UpdateStreamOrOutcome<U>
170where
171U: MaybeSend + MaybeSync + 'static,
172{
173/// Returns a stream no matter if the operation is finished. If there
174 /// already is a cached outcome the stream will only return that, otherwise
175 /// all updates will be returned until the operation finishes.
176pub fn into_stream(self) -> BoxStream<'static, U> {
177match self {
178 UpdateStreamOrOutcome::UpdateStream(stream) => stream,
179 UpdateStreamOrOutcome::Outcome(outcome) => {
180 Box::pin(stream::once(future::ready(outcome)))
181 }
182 }
183 }
184}