fedimint_client/
oplog.rs

use std::collections::HashSet;
use std::fmt::Debug;
use std::ops::Range;
use std::time::Duration;

use fedimint_client_module::oplog::{
    IOperationLog, JsonStringed, OperationLogEntry, OperationOutcome, UpdateStreamOrOutcome,
};
use fedimint_core::core::OperationId;
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::time::now;
use fedimint_core::util::BoxStream;
use fedimint_core::{apply, async_trait_maybe_send};
use fedimint_logging::LOG_CLIENT;
use futures::StreamExt as _;
use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::sync::OnceCell;
use tracing::{error, instrument, warn};

use crate::db::{ChronologicalOperationLogKey, OperationLogKey};

#[cfg(test)]
mod tests;

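/// Client-side operation log. Each operation is stored twice: under an
/// [`OperationLogKey`] (indexed by [`OperationId`]) and under a
/// [`ChronologicalOperationLogKey`] that serves as a time-ordered index for
/// pagination.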
#[derive(Debug, Clone)]
pub struct OperationLog {
    db: Database,
    oldest_entry: tokio::sync::OnceCell<ChronologicalOperationLogKey>,
}

impl OperationLog {
    pub fn new(db: Database) -> Self {
        Self {
            db,
            oldest_entry: OnceCell::new(),
        }
    }

    /// Returns the oldest operation log key in the database and caches the
    /// result. If no entry exists yet, the DB will be queried on each call
    /// until an entry is present.
    async fn get_oldest_operation_log_key(&self) -> Option<ChronologicalOperationLogKey> {
        let mut dbtx = self.db.begin_transaction_nc().await;
        self.oldest_entry
            .get_or_try_init(move || async move {
                dbtx.find_by_prefix(&crate::db::ChronologicalOperationLogKeyPrefix)
                    .await
                    .map(|(key, ())| key)
                    .next()
                    .await
                    // Map an empty DB to `Err` so `get_or_try_init` caches
                    // nothing and retries on the next call.
                    .ok_or(())
            })
            .await
            .ok()
            .copied()
    }

    /// Adds a new operation log entry, writing both the [`OperationLogKey`]
    /// entry itself and the [`ChronologicalOperationLogKey`] index entry used
    /// for pagination.
    pub async fn add_operation_log_entry_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta: impl serde::Serialize,
    ) {
        dbtx.insert_new_entry(
            &OperationLogKey { operation_id },
            &OperationLogEntry::new(
                operation_type.to_string(),
                JsonStringed(
                    serde_json::to_value(operation_meta)
                        .expect("Can only fail if meta is not serializable"),
                ),
                None,
            ),
        )
        .await;
        dbtx.insert_new_entry(
            &ChronologicalOperationLogKey {
                creation_time: now(),
                operation_id,
            },
            &(),
        )
        .await;
    }

    #[deprecated(since = "0.6.0", note = "Use `paginate_operations_rev` instead")]
    pub async fn list_operations(
        &self,
        limit: usize,
        last_seen: Option<ChronologicalOperationLogKey>,
    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
        self.paginate_operations_rev(limit, last_seen).await
    }

    /// Returns the last `limit` operations. To fetch the next page, pass the
    /// last operation's [`ChronologicalOperationLogKey`] as `last_seen`.
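    ///
    /// A rough pagination sketch (illustrative only, not compiled; assumes an
    /// `op_log: &OperationLog` handle):
    ///
    /// ```ignore
    /// let mut last_seen = None;
    /// loop {
    ///     let page = op_log.paginate_operations_rev(100, last_seen).await;
    ///     let Some((last_key, _)) = page.last() else {
    ///         break; // no more entries
    ///     };
    ///     last_seen = Some(*last_key);
    ///     // ... process the `(key, entry)` pairs in `page` ...
    /// }
    /// ```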
    pub async fn paginate_operations_rev(
        &self,
        limit: usize,
        last_seen: Option<ChronologicalOperationLogKey>,
    ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
        const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7);

        let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey {
            // We don't expect any operations from the future to exist, but since SystemTime
            // isn't monotone and CI can be overloaded at times, we add a small buffer to
            // avoid flakiness in tests.
            creation_time: now() + Duration::from_secs(30),
            operation_id: OperationId([0; 32]),
        });

        let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else {
            return vec![];
        };

        let mut dbtx = self.db.begin_transaction_nc().await;
        let mut operation_log_keys = Vec::with_capacity(limit);

        // Find all the operation log keys in the requested window. Since we decided not
        // to introduce a find_by_range_rev function we have to jump through some hoops;
        // see also the comments in rev_epoch_ranges.
        // TODO: Implement using find_by_range_rev if ever introduced
        'outer: for key_range_rev in
            rev_epoch_ranges(start_after_key, oldest_entry_key, EPOCH_DURATION)
        {
            let epoch_operation_log_keys_rev = dbtx
                .find_by_range(key_range_rev)
                .await
                .map(|(key, ())| key)
                .collect::<Vec<_>>()
                .await;

            for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() {
                operation_log_keys.push(operation_log_key);
                if operation_log_keys.len() >= limit {
                    break 'outer;
                }
            }
        }

        debug_assert!(
            operation_log_keys.iter().collect::<HashSet<_>>().len() == operation_log_keys.len(),
            "Operation log keys returned are not unique"
        );

        let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len());
        for operation_log_key in operation_log_keys {
            let operation_log_entry = dbtx
                .get_value(&OperationLogKey {
                    operation_id: operation_log_key.operation_id,
                })
                .await
                .expect("Inconsistent DB");
            operation_log_entries.push((operation_log_key, operation_log_entry));
        }

        operation_log_entries
    }

    pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
        Self::get_operation_dbtx(
            &mut self.db.begin_transaction_nc().await.into_nc(),
            operation_id,
        )
        .await
    }

    pub async fn get_operation_dbtx(
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
    ) -> Option<OperationLogEntry> {
        dbtx.get_value(&OperationLogKey { operation_id }).await
    }

    /// Sets the outcome of an operation
    #[instrument(target = LOG_CLIENT, skip(db), level = "debug")]
    pub async fn set_operation_outcome(
        db: &Database,
        operation_id: OperationId,
        outcome: &(impl Serialize + Debug),
    ) -> anyhow::Result<()> {
        let outcome_json =
            JsonStringed(serde_json::to_value(outcome).expect("Outcome is not serializable"));

        let mut dbtx = db.begin_transaction().await;
        let mut operation = Self::get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
            .await
            .expect("Operation exists");
        operation.set_outcome(OperationOutcome {
            time: fedimint_core::time::now(),
            outcome: outcome_json,
        });
        dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
            .await;
        dbtx.commit_tx_result().await?;

        Ok(())
    }

    /// Returns an [`UpdateStreamOrOutcome`] enum that can be converted into
    /// an update stream for easier handling using
    /// [`UpdateStreamOrOutcome::into_stream`], but can also be matched over to
    /// shortcut the handling of final outcomes.
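    ///
    /// A minimal consumption sketch (illustrative only, not compiled;
    /// `op_log`, `db`, `MyUpdate` and `make_stream` are assumed caller-side
    /// names):
    ///
    /// ```ignore
    /// let entry = op_log.get_operation(operation_id).await.expect("operation exists");
    /// let updates =
    ///     OperationLog::outcome_or_updates::<MyUpdate, _>(&db, operation_id, entry, make_stream);
    /// // needs `futures::StreamExt` in scope for `next()`
    /// let mut stream = updates.into_stream();
    /// while let Some(update) = stream.next().await {
    ///     // handle each update; the last one is cached as the operation's outcome
    /// }
    /// ```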
    pub fn outcome_or_updates<U, S>(
        db: &Database,
        operation_id: OperationId,
        operation_log_entry: OperationLogEntry,
        stream_gen: impl FnOnce() -> S,
    ) -> UpdateStreamOrOutcome<U>
    where
        U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
        S: futures::Stream<Item = U> + MaybeSend + 'static,
    {
        match operation_log_entry.outcome::<U>() {
            Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
            None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
                db.clone(),
                operation_id,
                stream_gen(),
            )),
        }
    }

    /// Tries to set the outcome of an operation, but only logs an error if it
    /// fails and does not return it. Since the outcome can always be recomputed
    /// from an update stream, failing to save it isn't a problem in cases where
    /// we do this merely for caching.
    pub async fn optimistically_set_operation_outcome(
        db: &Database,
        operation_id: OperationId,
        outcome: &(impl Serialize + Debug),
    ) {
        if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
            warn!(
                target: LOG_CLIENT,
                "Error setting operation outcome: {e}"
            );
        }
    }
}

#[apply(async_trait_maybe_send!)]
impl IOperationLog for OperationLog {
    async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
        OperationLog::get_operation(self, operation_id).await
    }

    async fn get_operation_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
    ) -> Option<OperationLogEntry> {
        OperationLog::get_operation_dbtx(dbtx, operation_id).await
    }

    async fn add_operation_log_entry_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta: serde_json::Value,
    ) {
        OperationLog::add_operation_log_entry_dbtx(
            self,
            dbtx,
            operation_id,
            operation_type,
            operation_meta,
        )
        .await
    }

    fn outcome_or_updates(
        &self,
        db: &Database,
        operation_id: OperationId,
        operation: OperationLogEntry,
        stream_gen: Box<dyn FnOnce() -> BoxStream<'static, serde_json::Value>>,
    ) -> UpdateStreamOrOutcome<serde_json::Value> {
        match OperationLog::outcome_or_updates(db, operation_id, operation, stream_gen) {
            UpdateStreamOrOutcome::UpdateStream(pin) => UpdateStreamOrOutcome::UpdateStream(pin),
            UpdateStreamOrOutcome::Outcome(o) => {
                UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
            }
        }
    }
}

/// Returns an iterator over ranges of operation log keys, starting from the
/// most recent range and going backwards in time until `last_entry` is
/// covered (the final range may start slightly before it).
///
/// Simplifying keys to integers and assuming a `start_after` of 100, a
/// `last_entry` of 55 and an `epoch_duration` of 10, the ranges would be:
/// ```text
/// [90..100, 80..90, 70..80, 60..70, 50..60]
/// ```
fn rev_epoch_ranges(
    start_after: ChronologicalOperationLogKey,
    last_entry: ChronologicalOperationLogKey,
    epoch_duration: Duration,
) -> impl Iterator<Item = Range<ChronologicalOperationLogKey>> {
    // We want to fetch all operations that were created before `start_after`, going
    // backwards in time. This means "start" generally refers to a later time than
    // "end". Only when creating a Rust `Range` do we have to swap the terminology (see
    // the comment there).
    (0..)
        .map(move |epoch| start_after.creation_time - epoch * epoch_duration)
        // We want to get all operation log keys in the range [last_entry, start_after). So as
        // long as the start time is not earlier than the last entry's creation time, we have
        // to keep going.
        .take_while(move |&start_time| start_time >= last_entry.creation_time)
        .map(move |start_time| {
            let end_time = start_time - epoch_duration;

            // In the edge case that two events were logged at exactly the same time,
            // we need to specify the correct operation_id for the first key. Otherwise,
            // we could miss entries.
            let start_key = if start_time == start_after.creation_time {
                start_after
            } else {
                ChronologicalOperationLogKey {
                    creation_time: start_time,
                    operation_id: OperationId([0; 32]),
                }
            };

            // We could also special-case the last key here, clamping it to `last_entry` if
            // end_time < last_entry.creation_time, but that isn't necessary: we know there
            // are no entries older than `last_entry`, so the range query is equivalent
            // either way.
            let end_key = ChronologicalOperationLogKey {
                creation_time: end_time,
                operation_id: OperationId([0; 32]),
            };

            // We want to go backwards using a forward range query. This means we have to
            // swap the start and end keys and then reverse the vector returned by the
            // query.
            Range {
                start: end_key,
                end: start_key,
            }
        })
}

/// Wraps an operation update stream such that, once the stream closes, its
/// last update is written to the operation log entry as its outcome on a
/// best-effort basis.
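///
/// This is roughly how [`OperationLog::outcome_or_updates`] uses it
/// (simplified from the code above):
///
/// ```ignore
/// UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
///     db.clone(),
///     operation_id,
///     stream_gen(),
/// ))
/// ```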
pub fn caching_operation_update_stream<'a, U, S>(
    db: Database,
    operation_id: OperationId,
    stream: S,
) -> BoxStream<'a, U>
where
    U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
    S: futures::Stream<Item = U> + MaybeSend + 'a,
{
    let mut stream = Box::pin(stream);
    Box::pin(async_stream::stream! {
        let mut last_update = None;
        while let Some(update) = stream.next().await {
            yield update.clone();
            last_update = Some(update);
        }

        let Some(last_update) = last_update else {
            error!(
                target: LOG_CLIENT,
                "Stream ended without any updates, this should not happen!"
            );
            return;
        };

        OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
    })
}