1#![allow(clippy::needless_lifetimes)]
2
3use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fedimint_core::core::{ModuleInstanceId, ModuleKind};
19use fedimint_core::db::{
20 Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
21 NonCommittable,
22};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::task::{MaybeSend, MaybeSync};
25use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use itertools::Itertools;
29use serde::{Deserialize, Serialize};
30use tokio::sync::{broadcast, watch};
31use tracing::{debug, trace};
32
/// Database key prefix of the not-yet-ordered event log records
/// ([`UnordedEventLogId`] -> [`UnorderedEventLogEntry`]).
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
/// Database key prefix of the ordered event log records
/// ([`EventLogId`] -> [`EventLogEntry`]).
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
43
/// A typed event that can be written to the event log.
///
/// `DBTransactionEventLogExt::log_event` serializes the implementing value
/// with `serde_json` and records it together with the constants below.
pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
    /// Kind of the module that emits this event; `None` is presumably meant
    /// for events that do not belong to any module.
    const MODULE: Option<ModuleKind>;
    /// Identifier stored alongside every logged event of this type.
    const KIND: EventKind;
    /// Passed as the `persist` flag when logging: `true` (the default) makes
    /// the ordering task store the event in the ordered log, `false` makes it
    /// only broadcast the event as a transient one.
    const PERSIST: bool = true;
}
49
/// Process-wide counter; `UnordedEventLogId::new` takes (and increments) its
/// value for every id it creates.
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
53
/// Database key of an event that was logged but has not yet been assigned an
/// [`EventLogId`] by the ordering task.
#[derive(Debug, Encodable, Decodable)]
pub struct UnordedEventLogId {
    /// Creation time in microseconds since the epoch (clamped to `u64::MAX`).
    ts_usecs: u64,
    /// Value of the process-wide counter at creation time, so two ids created
    /// within the same microsecond by one process still differ.
    counter: u64,
}
65
66impl UnordedEventLogId {
67 fn new() -> Self {
68 Self {
69 ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
70 .unwrap_or(u64::MAX),
72 counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
73 }
74 }
75}
76
/// Position of an event in the ordered event log.
///
/// The ordering task hands out ids sequentially; the first id of an empty log
/// is the default value (`0`, same as [`EventLogId::LOG_START`]).
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogId(u64);
94
95impl EventLogId {
96 pub const LOG_START: EventLogId = EventLogId(0);
97
98 fn next(self) -> EventLogId {
99 Self(self.0 + 1)
100 }
101
102 pub fn saturating_add(self, rhs: u64) -> EventLogId {
103 Self(self.0.saturating_add(rhs))
104 }
105
106 pub fn saturating_sub(self, rhs: u64) -> EventLogId {
107 Self(self.0.saturating_sub(rhs))
108 }
109}
110
111impl FromStr for EventLogId {
112 type Err = <u64 as FromStr>::Err;
113
114 fn from_str(s: &str) -> Result<Self, Self::Err> {
115 u64::from_str(s).map(Self)
116 }
117}
118
/// Name identifying the type of an event.
///
/// Holds either a borrowed `'static` string (see `EventKind::from_static`) or
/// an owned `String` (see the `From` implementations).
#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventKind(Cow<'static, str>);
121
122impl EventKind {
123 pub const fn from_static(value: &'static str) -> Self {
124 Self(Cow::Borrowed(value))
125 }
126}
127
128impl<'s> From<&'s str> for EventKind {
129 fn from(value: &'s str) -> Self {
130 Self(Cow::Owned(value.to_owned()))
131 }
132}
133
134impl From<String> for EventKind {
135 fn from(value: String) -> Self {
136 Self(Cow::Owned(value))
137 }
138}
139
/// Database value stored for a logged event until the ordering task has
/// processed it.
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct UnorderedEventLogEntry {
    /// `true`: the ordering task stores `inner` in the ordered log.
    /// `false`: the ordering task only broadcasts `inner` once its
    /// transaction commits.
    pub persist: bool,
    /// The logged event itself.
    pub inner: EventLogEntry,
}
145
/// A single logged event, as stored in the ordered event log and as sent to
/// subscribers of transient events.
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct EventLogEntry {
    /// Type of the event.
    pub kind: EventKind,

    /// Kind and instance id of the module the event belongs to, if any.
    pub module: Option<(ModuleKind, ModuleInstanceId)>,

    /// Time the event was logged, in microseconds since the epoch (copied
    /// from the `UnordedEventLogId` created for it).
    ts_usecs: u64,

    /// Raw event payload. `log_event` stores the `serde_json` encoding of the
    /// event here; `log_event_raw` stores whatever bytes it is given.
    pub payload: Vec<u8>,
}
172
/// An ordered log entry with its payload parsed as JSON, as returned by
/// `DBTransactionEventLogExt::get_event_log`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedLogEntry {
    /// Position of the event in the ordered log.
    pub event_id: EventLogId,
    /// Type of the event.
    pub event_kind: EventKind,
    /// Kind and instance id of the module the event belongs to, if any.
    pub module: Option<(ModuleKind, u16)>,
    /// Time the event was logged, in microseconds since the epoch.
    pub timestamp: u64,
    /// Payload parsed as JSON; the default `serde_json::Value` when the
    /// stored bytes are not valid JSON.
    pub value: serde_json::Value,
}
182
// Declares the unordered event log table: `UnordedEventLogId` keys mapping to
// `UnorderedEventLogEntry` values under `DB_KEY_PREFIX_UNORDERED_EVENT_LOG`.
impl_db_record!(
    key = UnordedEventLogId,
    value = UnorderedEventLogEntry,
    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
);
188
/// Prefix query selecting every record of the unordered event log; used by
/// the ordering task to fetch all pending events.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct UnorderedEventLogIdPrefixAll;
191
// Registers `UnorderedEventLogIdPrefixAll` as a query prefix for
// `UnordedEventLogId` keys (needed for `find_by_prefix` on that table).
impl_db_lookup!(
    key = UnordedEventLogId,
    query_prefix = UnorderedEventLogIdPrefixAll
);
196
/// Prefix query selecting every record of the ordered event log; used to find
/// the most recently assigned [`EventLogId`].
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefixAll;
199
/// Prefix query over the ordered event log built from a concrete
/// [`EventLogId`].
// NOTE(review): not used by the code visible in this file; presumably kept
// for callers elsewhere — confirm before removing.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefix(EventLogId);
202
// Declares the ordered event log table: `EventLogId` keys mapping to
// `EventLogEntry` values under `DB_KEY_PREFIX_EVENT_LOG`.
impl_db_record!(
    key = EventLogId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
);

// Registers the query prefixes usable with `EventLogId` keys.
impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
212
213#[apply(async_trait_maybe_send!)]
214pub trait DBTransactionEventLogExt {
215 async fn log_event_raw(
216 &mut self,
217 log_ordering_wakeup_tx: watch::Sender<()>,
218 kind: EventKind,
219 module_kind: Option<ModuleKind>,
220 module_id: Option<ModuleInstanceId>,
221 payload: Vec<u8>,
222 persist: bool,
223 );
224
225 async fn log_event<E>(
230 &mut self,
231 log_ordering_wakeup_tx: watch::Sender<()>,
232 module_id: Option<ModuleInstanceId>,
233 event: E,
234 ) where
235 E: Event + Send,
236 {
237 self.log_event_raw(
238 log_ordering_wakeup_tx,
239 E::KIND,
240 E::MODULE,
241 module_id,
242 serde_json::to_vec(&event).expect("Serialization can't fail"),
243 <E as Event>::PERSIST,
244 )
245 .await;
246 }
247
248 async fn get_next_event_log_id(&mut self) -> EventLogId;
253
254 async fn get_event_log(
256 &mut self,
257 pos: Option<EventLogId>,
258 limit: u64,
259 ) -> Vec<PersistedLogEntry>;
260}
261
262#[apply(async_trait_maybe_send!)]
263impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
264where
265 Cap: Send,
266{
267 async fn log_event_raw(
268 &mut self,
269 log_ordering_wakeup_tx: watch::Sender<()>,
270 kind: EventKind,
271 module_kind: Option<ModuleKind>,
272 module_id: Option<ModuleInstanceId>,
273 payload: Vec<u8>,
274 persist: bool,
275 ) {
276 assert_eq!(
277 module_kind.is_some(),
278 module_id.is_some(),
279 "Events of modules must have module_id set"
280 );
281
282 let unordered_id = UnordedEventLogId::new();
283 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
284
285 if self
286 .insert_entry(
287 &unordered_id,
288 &UnorderedEventLogEntry {
289 persist,
290 inner: EventLogEntry {
291 kind,
292 module: module_kind.map(|kind| (kind, module_id.unwrap())),
293 ts_usecs: unordered_id.ts_usecs,
294 payload,
295 },
296 },
297 )
298 .await
299 .is_some()
300 {
301 panic!("Trying to overwrite event in the client event log");
302 }
303 self.on_commit(move || {
304 let _ = log_ordering_wakeup_tx.send(());
305 });
306 }
307
308 async fn get_next_event_log_id(&mut self) -> EventLogId {
309 self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
310 .await
311 .next()
312 .await
313 .map(|(k, _v)| k.next())
314 .unwrap_or_default()
315 }
316
317 async fn get_event_log(
318 &mut self,
319 pos: Option<EventLogId>,
320 limit: u64,
321 ) -> Vec<PersistedLogEntry> {
322 let pos = pos.unwrap_or_default();
323 self.find_by_range(pos..pos.saturating_add(limit))
324 .await
325 .map(|(k, v)| PersistedLogEntry {
326 event_id: k,
327 event_kind: v.kind,
328 module: v.module,
329 timestamp: v.ts_usecs,
330 value: serde_json::from_slice(&v.payload).unwrap_or_default(),
331 })
332 .collect()
333 .await
334 }
335}
336
337pub async fn run_event_log_ordering_task(
340 db: Database,
341 mut log_ordering_task_wakeup: watch::Receiver<()>,
342 log_event_added: watch::Sender<()>,
343 log_event_added_transient: broadcast::Sender<EventLogEntry>,
344) {
345 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
346 let mut next_entry_id = db
347 .begin_transaction_nc()
348 .await
349 .get_next_event_log_id()
350 .await;
351
352 loop {
353 let mut dbtx = db.begin_transaction().await;
354
355 let unordered_events = dbtx
356 .find_by_prefix(&UnorderedEventLogIdPrefixAll)
357 .await
358 .collect::<Vec<_>>()
359 .await;
360 trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
361
362 for (unordered_id, entry) in &unordered_events {
363 assert!(
364 dbtx.remove_entry(unordered_id).await.is_some(),
365 "Must never fail to remove entry"
366 );
367 if entry.persist {
368 assert!(
369 dbtx.insert_entry(&next_entry_id, &entry.inner)
370 .await
371 .is_none(),
372 "Must never overwrite existing event"
373 );
374 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
375 next_entry_id = next_entry_id.next();
376 } else {
377 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
378 dbtx.on_commit({
379 let log_event_added_transient = log_event_added_transient.clone();
380 let entry = entry.inner.clone();
381
382 move || {
383 let _ = log_event_added_transient.send(entry);
385 }
386 });
387 }
388 }
389
390 dbtx.commit_tx().await;
394 if !unordered_events.is_empty() {
395 let _ = log_event_added.send(());
396 }
397
398 trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
399 if log_ordering_task_wakeup.changed().await.is_err() {
400 break;
401 }
402 }
403
404 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
405}
406
407pub async fn handle_events<F, R, K>(
408 db: Database,
409 pos_key: &K,
410 mut log_event_added: watch::Receiver<()>,
411 call_fn: F,
412) -> anyhow::Result<()>
413where
414 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
415 K: DatabaseRecord<Value = EventLogId>,
416 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
417 R: Future<Output = anyhow::Result<()>>,
418{
419 let mut next_key: EventLogId = db
420 .begin_transaction_nc()
421 .await
422 .get_value(pos_key)
423 .await
424 .unwrap_or_default();
425
426 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
427
428 loop {
429 let mut dbtx = db.begin_transaction().await;
430
431 match dbtx.get_value(&next_key).await {
432 Some(event) => {
433 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
434
435 next_key = next_key.next();
436 dbtx.insert_entry(pos_key, &next_key).await;
437
438 dbtx.commit_tx().await;
439 }
440 _ => {
441 if log_event_added.changed().await.is_err() {
442 break Ok(());
443 }
444 }
445 }
446 }
447}
448
449pub fn filter_events_by_kind<'a, I>(
452 all_events: I,
453 module_kind: ModuleKind,
454 event_kind: EventKind,
455) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
456where
457 I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
458{
459 all_events.into_iter().filter(move |e| {
460 if let Some((m, _)) = &e.module {
461 e.event_kind == event_kind && *m == module_kind
462 } else {
463 false
464 }
465 })
466}
467
468pub fn join_events<'a, L, R, Res>(
479 events_l: &'a [&PersistedLogEntry],
480 events_r: &'a [&PersistedLogEntry],
481 predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
482) -> impl Iterator<Item = Res> + 'a
483where
484 L: Event,
485 R: Event,
486{
487 events_l
488 .iter()
489 .cartesian_product(events_r)
490 .filter_map(move |(l, r)| {
491 if let Some(latency) = r.timestamp.checked_sub(l.timestamp) {
492 let event_l: L =
493 serde_json::from_value(l.value.clone()).expect("could not parse JSON");
494 let event_r: R =
495 serde_json::from_value(r.value.clone()).expect("could not parse JSON");
496 predicate(event_l, event_r, latency)
497 } else {
498 None
499 }
500 })
501}
502
/// Aggregated payment statistics.
///
/// Each vector is kept sorted in ascending order on its own, so the same
/// index in `latencies` and `fees` does not necessarily refer to the same
/// payment.
#[derive(Debug, Default)]
pub struct StructuredPaymentEvents {
    /// Latencies of successful payments.
    pub latencies: Vec<u64>,
    /// Fees of successful payments.
    pub fees: Vec<Amount>,
    /// Latencies of failed payments.
    pub latencies_failure: Vec<u64>,
}
511
512impl StructuredPaymentEvents {
513 pub fn new(
514 success_stats: &[(u64, Amount)],
515 failure_stats: Vec<u64>,
516 ) -> StructuredPaymentEvents {
517 let mut events = StructuredPaymentEvents {
518 latencies: success_stats.iter().map(|(l, _)| *l).collect(),
519 fees: success_stats.iter().map(|(_, f)| *f).collect(),
520 latencies_failure: failure_stats,
521 };
522 events.sort();
523 events
524 }
525
526 pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
529 self.latencies.append(&mut other.latencies);
530 self.fees.append(&mut other.fees);
531 self.latencies_failure.append(&mut other.latencies_failure);
532 self.sort();
533 }
534
535 fn sort(&mut self) {
538 self.latencies.sort_unstable();
539 self.fees.sort_unstable();
540 self.latencies_failure.sort_unstable();
541 }
542}
543
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU8;

    use anyhow::bail;
    use fedimint_core::db::IRawDatabaseExt as _;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::impl_db_record;
    use fedimint_core::task::TaskGroup;
    use tokio::sync::{broadcast, watch};
    use tokio::try_join;
    use tracing::info;

    use super::{
        DBTransactionEventLogExt as _, EventLogId, handle_events, run_event_log_ordering_task,
    };
    use crate::EventKind;

    /// Key under which the `handle_events` call of the test stores its
    /// position in the event log.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub struct TestLogIdKey;

    impl_db_record!(key = TestLogIdKey, value = EventLogId, db_prefix = 0x00,);

    /// Logs five persistent events (kinds "0" to "4") while `handle_events`
    /// consumes them, and checks that the handler sees them in logging order.
    #[test_log::test(tokio::test)]
    async fn sanity_handle_events() {
        let db = MemDatabase::new().into_database();
        let tg = TaskGroup::new();

        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
            broadcast::channel(1024);

        // The ordering task turns the unordered entries written below into
        // ordered ones that `handle_events` can read.
        tg.spawn_cancellable(
            "event log ordering task",
            run_event_log_ordering_task(
                db.clone(),
                log_ordering_wakeup_rx,
                log_event_added_tx,
                log_event_added_transient_tx,
            ),
        );

        // Number of events the handler has completed so far.
        let counter = Arc::new(AtomicU8::new(0));

        // The handler's error on the last event is what ends `try_join!`;
        // the result is intentionally discarded.
        let _ = try_join!(
            handle_events(
                db.clone(),
                &TestLogIdKey,
                log_event_added_rx,
                move |_dbtx, event| {
                    let counter = counter.clone();
                    Box::pin(async move {
                        info!("{event:?}");

                        // Events must arrive in the order they were logged.
                        assert_eq!(
                            event.kind,
                            EventKind::from(format!(
                                "{}",
                                counter.load(std::sync::atomic::Ordering::Relaxed)
                            ))
                        );

                        // Stop after the fifth event by returning an error.
                        if counter.load(std::sync::atomic::Ordering::Relaxed) == 4 {
                            bail!("Time to wrap up");
                        }
                        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        Ok(())
                    })
                },
            ),
            async {
                // Producer: one committed transaction per event.
                for i in 0..=4 {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.log_event_raw(
                        log_ordering_wakeup_tx.clone(),
                        EventKind::from(format!("{i}")),
                        None,
                        None,
                        vec![],
                        true,
                    )
                    .await;

                    dbtx.commit_tx().await;
                }

                Ok(())
            }
        );
    }
}