1#![allow(clippy::needless_lifetimes)]
2
3use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fedimint_core::core::{ModuleInstanceId, ModuleKind};
19use fedimint_core::db::{
20 Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
21 NonCommittable,
22};
23use fedimint_core::encoding::{Decodable, Encodable};
24use fedimint_core::task::{MaybeSend, MaybeSync};
25use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use itertools::Itertools;
29use serde::{Deserialize, Serialize};
30use tokio::sync::{broadcast, watch};
31use tracing::{debug, trace};
32
33pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
42pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
43pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;
44
45const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
47const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
49const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
51
52pub enum EventPersistence {
53 Transient,
55 Trimable,
57 Persistent,
60}
61
62pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
63 const MODULE: Option<ModuleKind>;
64 const KIND: EventKind;
65 const PERSISTENCE: EventPersistence;
66}
67
68static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
71
72#[derive(Debug, Encodable, Decodable)]
79pub struct UnordedEventLogId {
80 ts_usecs: u64,
81 counter: u64,
82}
83
84impl UnordedEventLogId {
85 fn new() -> Self {
86 Self {
87 ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
88 .unwrap_or(u64::MAX),
90 counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
91 }
92 }
93}
94
95#[derive(
98 Copy,
99 Clone,
100 Debug,
101 Encodable,
102 Decodable,
103 Default,
104 PartialEq,
105 Eq,
106 PartialOrd,
107 Ord,
108 Serialize,
109 Deserialize,
110)]
111pub struct EventLogId(u64);
112
113impl EventLogId {
114 pub const LOG_START: EventLogId = EventLogId(0);
115
116 fn next(self) -> EventLogId {
117 Self(self.0 + 1)
118 }
119
120 pub fn saturating_add(self, rhs: u64) -> EventLogId {
121 Self(self.0.saturating_add(rhs))
122 }
123
124 pub fn saturating_sub(self, rhs: u64) -> EventLogId {
125 Self(self.0.saturating_sub(rhs))
126 }
127}
128
129impl From<EventLogId> for u64 {
130 fn from(value: EventLogId) -> Self {
131 value.0
132 }
133}
134
135impl FromStr for EventLogId {
136 type Err = <u64 as FromStr>::Err;
137
138 fn from_str(s: &str) -> Result<Self, Self::Err> {
139 u64::from_str(s).map(Self)
140 }
141}
142
143#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
144pub struct EventKind(Cow<'static, str>);
145
146impl EventKind {
147 pub const fn from_static(value: &'static str) -> Self {
148 Self(Cow::Borrowed(value))
149 }
150}
151
152impl<'s> From<&'s str> for EventKind {
153 fn from(value: &'s str) -> Self {
154 Self(Cow::Owned(value.to_owned()))
155 }
156}
157
158impl From<String> for EventKind {
159 fn from(value: String) -> Self {
160 Self(Cow::Owned(value))
161 }
162}
163
164#[derive(Debug, Encodable, Decodable, Clone)]
165pub struct UnorderedEventLogEntry {
166 pub flags: u8,
167 pub inner: EventLogEntry,
168}
169
170impl UnorderedEventLogEntry {
171 pub const FLAG_PERSIST: u8 = 1;
172 pub const FLAG_TRIMABLE: u8 = 2;
173
174 fn persist(&self) -> bool {
175 self.flags & Self::FLAG_PERSIST != 0
176 }
177
178 fn trimable(&self) -> bool {
179 self.flags & Self::FLAG_TRIMABLE != 0
180 }
181}
182
183#[derive(Debug, Encodable, Decodable, Clone)]
184pub struct EventLogEntry {
185 pub kind: EventKind,
192
193 pub module: Option<(ModuleKind, ModuleInstanceId)>,
201
202 ts_usecs: u64,
204
205 pub payload: Vec<u8>,
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct PersistedLogEntry {
213 pub event_id: EventLogId,
214 pub event_kind: EventKind,
215 pub module: Option<(ModuleKind, u16)>,
216 pub timestamp: u64,
217 pub value: serde_json::Value,
218}
219
220impl_db_record!(
221 key = UnordedEventLogId,
222 value = UnorderedEventLogEntry,
223 db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
224);
225
226#[derive(Clone, Debug, Encodable, Decodable)]
227pub struct UnorderedEventLogIdPrefixAll;
228
229impl_db_lookup!(
230 key = UnordedEventLogId,
231 query_prefix = UnorderedEventLogIdPrefixAll
232);
233
234#[derive(Clone, Debug, Encodable, Decodable)]
235pub struct EventLogIdPrefixAll;
236
237#[derive(Clone, Debug, Encodable, Decodable)]
238pub struct EventLogIdPrefix(EventLogId);
239
240impl_db_record!(
241 key = EventLogId,
242 value = EventLogEntry,
243 db_prefix = DB_KEY_PREFIX_EVENT_LOG,
244);
245
246impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
247
248impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
249
250#[derive(
251 Copy,
252 Clone,
253 Debug,
254 Encodable,
255 Decodable,
256 Default,
257 PartialEq,
258 Eq,
259 PartialOrd,
260 Ord,
261 Serialize,
262 Deserialize,
263)]
264pub struct EventLogTrimableId(EventLogId);
265
266impl EventLogTrimableId {
267 fn next(&self) -> Self {
268 Self(self.0.next())
269 }
270
271 pub fn saturating_add(self, rhs: u64) -> Self {
272 Self(self.0.saturating_add(rhs))
273 }
274}
275
276impl From<u64> for EventLogTrimableId {
277 fn from(value: u64) -> Self {
278 Self(EventLogId(value))
279 }
280}
281
282#[derive(Clone, Debug, Encodable, Decodable)]
283pub struct EventLogTrimableIdPrefixAll;
284
285#[derive(Clone, Debug, Encodable, Decodable)]
286pub struct EventLogTrimableIdPrefix(EventLogId);
287
288impl_db_record!(
289 key = EventLogTrimableId,
290 value = EventLogEntry,
291 db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
292);
293
294impl_db_lookup!(
295 key = EventLogTrimableId,
296 query_prefix = EventLogTrimableIdPrefixAll
297);
298
299impl_db_lookup!(
300 key = EventLogTrimableId,
301 query_prefix = EventLogTrimableIdPrefix
302);
303
304#[apply(async_trait_maybe_send!)]
305pub trait DBTransactionEventLogExt {
306 #[allow(clippy::too_many_arguments)]
307 async fn log_event_raw(
308 &mut self,
309 log_ordering_wakeup_tx: watch::Sender<()>,
310 kind: EventKind,
311 module_kind: Option<ModuleKind>,
312 module_id: Option<ModuleInstanceId>,
313 payload: Vec<u8>,
314 persist: EventPersistence,
315 );
316
317 async fn log_event<E>(
322 &mut self,
323 log_ordering_wakeup_tx: watch::Sender<()>,
324 module_id: Option<ModuleInstanceId>,
325 event: E,
326 ) where
327 E: Event + Send,
328 {
329 self.log_event_raw(
330 log_ordering_wakeup_tx,
331 E::KIND,
332 E::MODULE,
333 module_id,
334 serde_json::to_vec(&event).expect("Serialization can't fail"),
335 <E as Event>::PERSISTENCE,
336 )
337 .await;
338 }
339
340 async fn get_next_event_log_id(&mut self) -> EventLogId;
345
346 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
348
349 async fn get_event_log(
351 &mut self,
352 pos: Option<EventLogId>,
353 limit: u64,
354 ) -> Vec<PersistedLogEntry>;
355
356 async fn get_event_log_trimable(
357 &mut self,
358 pos: Option<EventLogTrimableId>,
359 limit: u64,
360 ) -> Vec<PersistedLogEntry>;
361}
362
363#[apply(async_trait_maybe_send!)]
364impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
365where
366 Cap: Send,
367{
368 async fn log_event_raw(
369 &mut self,
370 log_ordering_wakeup_tx: watch::Sender<()>,
371 kind: EventKind,
372 module_kind: Option<ModuleKind>,
373 module_id: Option<ModuleInstanceId>,
374 payload: Vec<u8>,
375 persist: EventPersistence,
376 ) {
377 assert_eq!(
378 module_kind.is_some(),
379 module_id.is_some(),
380 "Events of modules must have module_id set"
381 );
382
383 let unordered_id = UnordedEventLogId::new();
384 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
385
386 if self
387 .insert_entry(
388 &unordered_id,
389 &UnorderedEventLogEntry {
390 flags: match persist {
391 EventPersistence::Transient => 0,
392 EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
393 EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
394 },
395 inner: EventLogEntry {
396 kind,
397 module: module_kind.map(|kind| (kind, module_id.unwrap())),
398 ts_usecs: unordered_id.ts_usecs,
399 payload,
400 },
401 },
402 )
403 .await
404 .is_some()
405 {
406 panic!("Trying to overwrite event in the client event log");
407 }
408 self.on_commit(move || {
409 log_ordering_wakeup_tx.send_replace(());
410 });
411 }
412
413 async fn get_next_event_log_id(&mut self) -> EventLogId {
414 self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
415 .await
416 .next()
417 .await
418 .map(|(k, _v)| k.next())
419 .unwrap_or_default()
420 }
421
422 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
423 EventLogTrimableId(
424 self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
425 .await
426 .next()
427 .await
428 .map(|(k, _v)| k.0.next())
429 .unwrap_or_default(),
430 )
431 }
432
433 async fn get_event_log(
434 &mut self,
435 pos: Option<EventLogId>,
436 limit: u64,
437 ) -> Vec<PersistedLogEntry> {
438 let pos = pos.unwrap_or_default();
439 self.find_by_range(pos..pos.saturating_add(limit))
440 .await
441 .map(|(k, v)| PersistedLogEntry {
442 event_id: k,
443 event_kind: v.kind,
444 module: v.module,
445 timestamp: v.ts_usecs,
446 value: serde_json::from_slice(&v.payload).unwrap_or_default(),
447 })
448 .collect()
449 .await
450 }
451
452 async fn get_event_log_trimable(
453 &mut self,
454 pos: Option<EventLogTrimableId>,
455 limit: u64,
456 ) -> Vec<PersistedLogEntry> {
457 let pos = pos.unwrap_or_default();
458 self.find_by_range(pos..pos.saturating_add(limit))
459 .await
460 .map(|(k, v)| PersistedLogEntry {
461 event_id: k.0,
462 event_kind: v.kind,
463 module: v.module,
464 timestamp: v.ts_usecs,
465 value: serde_json::from_slice(&v.payload).unwrap_or_default(),
466 })
467 .collect()
468 .await
469 }
470}
471
472async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
474 let mut dbtx = db.begin_transaction().await;
475
476 let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
477 let min_id_threshold = current_trimable_id
478 .0
479 .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
480 let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
481
482 let entries_to_delete: Vec<_> = dbtx
483 .find_by_prefix(&EventLogTrimableIdPrefixAll)
484 .await
485 .take_while(|(id, entry)| {
486 let id_old_enough = id.0 <= min_id_threshold;
487 let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
488
489 async move { id_old_enough && ts_old_enough }
491 })
492 .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
493 .map(|(id, _entry)| id)
494 .collect()
495 .await;
496
497 for id in &entries_to_delete {
498 dbtx.remove_entry(id).await;
499 }
500
501 dbtx.commit_tx().await;
502}
503
504pub async fn run_event_log_ordering_task(
507 db: Database,
508 mut log_ordering_task_wakeup: watch::Receiver<()>,
509 log_event_added: watch::Sender<()>,
510 log_event_added_transient: broadcast::Sender<EventLogEntry>,
511) {
512 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
513
514 let current_time_usecs =
515 u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
516 trim_trimable_log(&db, current_time_usecs).await;
517
518 let mut next_entry_id = db
519 .begin_transaction_nc()
520 .await
521 .get_next_event_log_id()
522 .await;
523 let mut next_entry_id_trimable = db
524 .begin_transaction_nc()
525 .await
526 .get_next_event_log_trimable_id()
527 .await;
528
529 loop {
530 let mut dbtx = db.begin_transaction().await;
531
532 let unordered_events = dbtx
533 .find_by_prefix(&UnorderedEventLogIdPrefixAll)
534 .await
535 .collect::<Vec<_>>()
536 .await;
537 trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
538
539 for (unordered_id, entry) in &unordered_events {
540 assert!(
541 dbtx.remove_entry(unordered_id).await.is_some(),
542 "Must never fail to remove entry"
543 );
544 if entry.persist() {
545 if !entry.trimable() {
548 assert!(
549 dbtx.insert_entry(&next_entry_id, &entry.inner)
550 .await
551 .is_none(),
552 "Must never overwrite existing event"
553 );
554 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
555 next_entry_id = next_entry_id.next();
556 }
557
558 assert!(
560 dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
561 .await
562 .is_none(),
563 "Must never overwrite existing event"
564 );
565 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
566 next_entry_id_trimable = next_entry_id_trimable.next();
567 } else {
568 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
570 dbtx.on_commit({
571 let log_event_added_transient = log_event_added_transient.clone();
572 let entry = entry.inner.clone();
573
574 move || {
575 let _ = log_event_added_transient.send(entry);
577 }
578 });
579 }
580 }
581
582 dbtx.commit_tx().await;
586 if !unordered_events.is_empty() {
587 log_event_added.send_replace(());
588 }
589
590 trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
591 if log_ordering_task_wakeup.changed().await.is_err() {
592 break;
593 }
594 }
595
596 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
597}
598
599pub async fn handle_events<F, R, K>(
600 db: Database,
601 pos_key: &K,
602 mut log_event_added: watch::Receiver<()>,
603 call_fn: F,
604) -> anyhow::Result<()>
605where
606 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
607 K: DatabaseRecord<Value = EventLogId>,
608 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
609 R: Future<Output = anyhow::Result<()>>,
610{
611 let mut next_key: EventLogId = db
612 .begin_transaction_nc()
613 .await
614 .get_value(pos_key)
615 .await
616 .unwrap_or_default();
617
618 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
619
620 loop {
621 let mut dbtx = db.begin_transaction().await;
622
623 match dbtx.get_value(&next_key).await {
624 Some(event) => {
625 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
626
627 next_key = next_key.next();
628 dbtx.insert_entry(pos_key, &next_key).await;
629
630 dbtx.commit_tx().await;
631 }
632 _ => {
633 if log_event_added.changed().await.is_err() {
634 break Ok(());
635 }
636 }
637 }
638 }
639}
640
641pub async fn handle_trimable_events<F, R, K>(
642 db: Database,
643 pos_key: &K,
644 mut log_event_added: watch::Receiver<()>,
645 call_fn: F,
646) -> anyhow::Result<()>
647where
648 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
649 K: DatabaseRecord<Value = EventLogTrimableId>,
650 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
651 R: Future<Output = anyhow::Result<()>>,
652{
653 let mut next_key: EventLogTrimableId = db
654 .begin_transaction_nc()
655 .await
656 .get_value(pos_key)
657 .await
658 .unwrap_or_default();
659
660 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
661
662 loop {
663 let mut dbtx = db.begin_transaction().await;
664
665 match dbtx.get_value(&next_key).await {
666 Some(event) => {
667 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
668
669 next_key = next_key.next();
670 dbtx.insert_entry(pos_key, &next_key).await;
671
672 dbtx.commit_tx().await;
673 }
674 _ => {
675 if log_event_added.changed().await.is_err() {
676 break Ok(());
677 }
678 }
679 }
680 }
681}
682
683pub fn filter_events_by_kind<'a, I>(
686 all_events: I,
687 module_kind: ModuleKind,
688 event_kind: EventKind,
689) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
690where
691 I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
692{
693 all_events.into_iter().filter(move |e| {
694 if let Some((m, _)) = &e.module {
695 e.event_kind == event_kind && *m == module_kind
696 } else {
697 false
698 }
699 })
700}
701
702pub fn join_events<'a, L, R, Res>(
713 events_l: &'a [&PersistedLogEntry],
714 events_r: &'a [&PersistedLogEntry],
715 predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
716) -> impl Iterator<Item = Res> + 'a
717where
718 L: Event,
719 R: Event,
720{
721 events_l
722 .iter()
723 .cartesian_product(events_r)
724 .filter_map(move |(l, r)| {
725 if let Some(latency) = r.timestamp.checked_sub(l.timestamp) {
726 let event_l: L =
727 serde_json::from_value(l.value.clone()).expect("could not parse JSON");
728 let event_r: R =
729 serde_json::from_value(r.value.clone()).expect("could not parse JSON");
730 predicate(event_l, event_r, latency)
731 } else {
732 None
733 }
734 })
735}
736
737#[derive(Debug, Default)]
740pub struct StructuredPaymentEvents {
741 pub latencies: Vec<u64>,
742 pub fees: Vec<Amount>,
743 pub latencies_failure: Vec<u64>,
744}
745
746impl StructuredPaymentEvents {
747 pub fn new(
748 success_stats: &[(u64, Amount)],
749 failure_stats: Vec<u64>,
750 ) -> StructuredPaymentEvents {
751 let mut events = StructuredPaymentEvents {
752 latencies: success_stats.iter().map(|(l, _)| *l).collect(),
753 fees: success_stats.iter().map(|(_, f)| *f).collect(),
754 latencies_failure: failure_stats,
755 };
756 events.sort();
757 events
758 }
759
760 pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
763 self.latencies.append(&mut other.latencies);
764 self.fees.append(&mut other.fees);
765 self.latencies_failure.append(&mut other.latencies_failure);
766 self.sort();
767 }
768
769 fn sort(&mut self) {
772 self.latencies.sort_unstable();
773 self.fees.sort_unstable();
774 self.latencies_failure.sort_unstable();
775 }
776}
777
778#[cfg(test)]
779mod tests;