1use std::collections::HashSet;
2use std::fmt::Debug;
3use std::ops::Range;
4use std::time::Duration;
5
6use fedimint_client_module::oplog::{
7 IOperationLog, JsonStringed, OperationLogEntry, OperationOutcome, UpdateStreamOrOutcome,
8};
9use fedimint_core::core::OperationId;
10use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
11use fedimint_core::task::{MaybeSend, MaybeSync};
12use fedimint_core::time::now;
13use fedimint_core::util::BoxStream;
14use fedimint_core::{apply, async_trait_maybe_send};
15use fedimint_logging::LOG_CLIENT;
16use futures::StreamExt as _;
17use serde::Serialize;
18use serde::de::DeserializeOwned;
19use tokio::sync::OnceCell;
20use tracing::{error, instrument, warn};
21
22use crate::db::{ChronologicalOperationLogKey, OperationLogKey};
23
24#[cfg(test)]
25mod tests;
26
27#[derive(Debug, Clone)]
28pub struct OperationLog {
29 db: Database,
30 oldest_entry: tokio::sync::OnceCell<ChronologicalOperationLogKey>,
31}
32
33impl OperationLog {
34 pub fn new(db: Database) -> Self {
35 Self {
36 db,
37 oldest_entry: OnceCell::new(),
38 }
39 }
40
41 async fn get_oldest_operation_log_key(&self) -> Option<ChronologicalOperationLogKey> {
45 let mut dbtx = self.db.begin_transaction_nc().await;
46 self.oldest_entry
47 .get_or_try_init(move || async move {
48 dbtx.find_by_prefix(&crate::db::ChronologicalOperationLogKeyPrefix)
49 .await
50 .map(|(key, ())| key)
51 .next()
52 .await
53 .ok_or(())
54 })
55 .await
56 .ok()
57 .copied()
58 }
59
60 pub async fn add_operation_log_entry_dbtx(
61 &self,
62 dbtx: &mut DatabaseTransaction<'_>,
63 operation_id: OperationId,
64 operation_type: &str,
65 operation_meta: impl serde::Serialize,
66 ) {
67 dbtx.insert_new_entry(
68 &OperationLogKey { operation_id },
69 &OperationLogEntry::new(
70 operation_type.to_string(),
71 JsonStringed(
72 serde_json::to_value(operation_meta)
73 .expect("Can only fail if meta is not serializable"),
74 ),
75 None,
76 ),
77 )
78 .await;
79 dbtx.insert_new_entry(
80 &ChronologicalOperationLogKey {
81 creation_time: now(),
82 operation_id,
83 },
84 &(),
85 )
86 .await;
87 }
88
89 #[deprecated(since = "0.6.0", note = "Use `paginate_operations_rev` instead")]
90 pub async fn list_operations(
91 &self,
92 limit: usize,
93 last_seen: Option<ChronologicalOperationLogKey>,
94 ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
95 self.paginate_operations_rev(limit, last_seen).await
96 }
97
98 pub async fn paginate_operations_rev(
101 &self,
102 limit: usize,
103 last_seen: Option<ChronologicalOperationLogKey>,
104 ) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
105 const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7);
106
107 let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey {
108 creation_time: now() + Duration::from_secs(30),
112 operation_id: OperationId([0; 32]),
113 });
114
115 let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else {
116 return vec![];
117 };
118
119 let mut dbtx = self.db.begin_transaction_nc().await;
120 let mut operation_log_keys = Vec::with_capacity(limit);
121
122 'outer: for key_range_rev in
127 rev_epoch_ranges(start_after_key, oldest_entry_key, EPOCH_DURATION)
128 {
129 let epoch_operation_log_keys_rev = dbtx
130 .find_by_range(key_range_rev)
131 .await
132 .map(|(key, ())| key)
133 .collect::<Vec<_>>()
134 .await;
135
136 for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() {
137 operation_log_keys.push(operation_log_key);
138 if operation_log_keys.len() >= limit {
139 break 'outer;
140 }
141 }
142 }
143
144 debug_assert!(
145 operation_log_keys.iter().collect::<HashSet<_>>().len() == operation_log_keys.len(),
146 "Operation log keys returned are not unique"
147 );
148
149 let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len());
150 for operation_log_key in operation_log_keys {
151 let operation_log_entry = dbtx
152 .get_value(&OperationLogKey {
153 operation_id: operation_log_key.operation_id,
154 })
155 .await
156 .expect("Inconsistent DB");
157 operation_log_entries.push((operation_log_key, operation_log_entry));
158 }
159
160 operation_log_entries
161 }
162
163 pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
164 Self::get_operation_dbtx(
165 &mut self.db.begin_transaction_nc().await.into_nc(),
166 operation_id,
167 )
168 .await
169 }
170
171 pub async fn get_operation_dbtx(
172 dbtx: &mut DatabaseTransaction<'_>,
173 operation_id: OperationId,
174 ) -> Option<OperationLogEntry> {
175 dbtx.get_value(&OperationLogKey { operation_id }).await
176 }
177
178 #[instrument(target = LOG_CLIENT, skip(db), level = "debug")]
180 pub async fn set_operation_outcome(
181 db: &Database,
182 operation_id: OperationId,
183 outcome: &(impl Serialize + Debug),
184 ) -> anyhow::Result<()> {
185 let outcome_json =
186 JsonStringed(serde_json::to_value(outcome).expect("Outcome is not serializable"));
187
188 let mut dbtx = db.begin_transaction().await;
189 let mut operation = Self::get_operation_dbtx(&mut dbtx.to_ref_nc(), operation_id)
190 .await
191 .expect("Operation exists");
192 operation.set_outcome(OperationOutcome {
193 time: fedimint_core::time::now(),
194 outcome: outcome_json,
195 });
196 dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
197 .await;
198 dbtx.commit_tx_result().await?;
199
200 Ok(())
201 }
202
203 pub fn outcome_or_updates<U, S>(
208 db: &Database,
209 operation_id: OperationId,
210 operation_log_entry: OperationLogEntry,
211 stream_gen: impl FnOnce() -> S,
212 ) -> UpdateStreamOrOutcome<U>
213 where
214 U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
215 S: futures::Stream<Item = U> + MaybeSend + 'static,
216 {
217 match operation_log_entry.outcome::<U>() {
218 Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
219 None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
220 db.clone(),
221 operation_id,
222 stream_gen(),
223 )),
224 }
225 }
226
227 pub async fn optimistically_set_operation_outcome(
232 db: &Database,
233 operation_id: OperationId,
234 outcome: &(impl Serialize + Debug),
235 ) {
236 if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
237 warn!(
238 target: LOG_CLIENT,
239 "Error setting operation outcome: {e}"
240 );
241 }
242 }
243}
244
245#[apply(async_trait_maybe_send!)]
246impl IOperationLog for OperationLog {
247 async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
248 OperationLog::get_operation(self, operation_id).await
249 }
250
251 async fn get_operation_dbtx(
252 &self,
253 dbtx: &mut DatabaseTransaction<'_>,
254 operation_id: OperationId,
255 ) -> Option<OperationLogEntry> {
256 OperationLog::get_operation_dbtx(dbtx, operation_id).await
257 }
258
259 async fn add_operation_log_entry_dbtx(
260 &self,
261 dbtx: &mut DatabaseTransaction<'_>,
262 operation_id: OperationId,
263 operation_type: &str,
264 operation_meta: serde_json::Value,
265 ) {
266 OperationLog::add_operation_log_entry_dbtx(
267 self,
268 dbtx,
269 operation_id,
270 operation_type,
271 operation_meta,
272 )
273 .await
274 }
275
276 fn outcome_or_updates(
277 &self,
278 db: &Database,
279 operation_id: OperationId,
280 operation: OperationLogEntry,
281 stream_gen: Box<dyn FnOnce() -> BoxStream<'static, serde_json::Value>>,
282 ) -> UpdateStreamOrOutcome<serde_json::Value> {
283 match OperationLog::outcome_or_updates(db, operation_id, operation, stream_gen) {
284 UpdateStreamOrOutcome::UpdateStream(pin) => UpdateStreamOrOutcome::UpdateStream(pin),
285 UpdateStreamOrOutcome::Outcome(o) => {
286 UpdateStreamOrOutcome::Outcome(serde_json::from_value(o).expect("Can't fail"))
287 }
288 }
289 }
290}
291fn rev_epoch_ranges(
301 start_after: ChronologicalOperationLogKey,
302 last_entry: ChronologicalOperationLogKey,
303 epoch_duration: Duration,
304) -> impl Iterator<Item = Range<ChronologicalOperationLogKey>> {
305 (0..)
310 .map(move |epoch| start_after.creation_time - epoch * epoch_duration)
311 .take_while(move |&start_time| start_time >= last_entry.creation_time)
315 .map(move |start_time| {
316 let end_time = start_time - epoch_duration;
317
318 let start_key = if start_time == start_after.creation_time {
322 start_after
323 } else {
324 ChronologicalOperationLogKey {
325 creation_time: start_time,
326 operation_id: OperationId([0; 32]),
327 }
328 };
329
330 let end_key = ChronologicalOperationLogKey {
335 creation_time: end_time,
336 operation_id: OperationId([0; 32]),
337 };
338
339 Range {
343 start: end_key,
344 end: start_key,
345 }
346 })
347}
348
349pub fn caching_operation_update_stream<'a, U, S>(
352 db: Database,
353 operation_id: OperationId,
354 stream: S,
355) -> BoxStream<'a, U>
356where
357 U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
358 S: futures::Stream<Item = U> + MaybeSend + 'a,
359{
360 let mut stream = Box::pin(stream);
361 Box::pin(async_stream::stream! {
362 let mut last_update = None;
363 while let Some(update) = stream.next().await {
364 yield update.clone();
365 last_update = Some(update);
366 }
367
368 let Some(last_update) = last_update else {
369 error!(
370 target: LOG_CLIENT,
371 "Stream ended without any updates, this should not happen!"
372 );
373 return;
374 };
375
376 OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
377 })
378}