1#![allow(clippy::needless_lifetimes)]
2
3use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fedimint_core::core::{ModuleInstanceId, ModuleKind};
19use fedimint_core::db::{
20 Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, NonCommittable,
21};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
24use fedimint_logging::LOG_CLIENT_EVENT_LOG;
25use futures::{Future, StreamExt};
26use itertools::Itertools;
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, watch};
29use tracing::{debug, trace};
30
/// Database key prefix of the unordered event log records
/// ([`UnordedEventLogId`] -> [`UnorderedEventLogEntry`]).
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
/// Database key prefix of the ordered event log records
/// ([`EventLogId`] -> [`EventLogEntry`]).
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
/// Database key prefix of the ordered, trimable event log records
/// ([`EventLogTrimableId`] -> [`EventLogEntry`]).
pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;

/// Number of ids subtracted (saturating) from the next trimable id to obtain
/// the highest id that trimming is allowed to delete.
const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
/// Number of microseconds (14 days) subtracted (saturating) from the current
/// time to obtain the newest timestamp that trimming is allowed to delete.
const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
/// Upper bound on the number of entries deleted by a single trimming pass.
const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
49
/// How an [`Event`] should be treated once it has been logged.
///
/// The value is translated into the `flags` byte of an
/// [`UnorderedEventLogEntry`] when the event is recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventPersistence {
    /// Recorded with no flags set. The ordering task forwards such events to
    /// the transient broadcast channel instead of writing them to a log.
    Transient,
    /// Recorded with [`UnorderedEventLogEntry::FLAG_TRIMABLE`]; meant for the
    /// trimable log, whose oldest entries are deleted over time.
    Trimable,
    /// Recorded with [`UnorderedEventLogEntry::FLAG_PERSIST`]. The ordering
    /// task appends such events to both the ordered log and the trimable log.
    Persistent,
}
83
/// A typed event that can be written to the event log.
///
/// [`DBTransactionEventLogExt::log_event`] serializes the value with
/// `serde_json` and stores the bytes as the entry payload; [`join_events`]
/// parses payloads back into the event type.
pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
    /// Kind of the module the event belongs to, if any. When this is `Some`,
    /// a module instance id must be supplied when logging the event
    /// (`log_event_raw` asserts that both are set or both are unset).
    const MODULE: Option<ModuleKind>;
    /// Kind stored in [`EventLogEntry::kind`] for this event.
    const KIND: EventKind;
    /// How the event is persisted, see [`EventPersistence`].
    const PERSISTENCE: EventPersistence;
}
89
/// Process-wide counter supplying [`UnordedEventLogId`]'s `counter` field.
/// It starts at zero and is incremented each time an id is created.
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
93
/// Key of an event that has been logged but not yet moved into an ordered
/// log by the ordering task.
#[derive(Debug, Encodable, Decodable)]
pub struct UnordedEventLogId {
    /// Creation time in microseconds since the epoch (clamped to `u64::MAX`).
    ts_usecs: u64,
    /// Value of the process-wide id counter at creation time.
    counter: u64,
}
105
106impl UnordedEventLogId {
107 fn new() -> Self {
108 Self {
109 ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
110 .unwrap_or(u64::MAX),
112 counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
113 }
114 }
115}
116
/// Position of an entry in the ordered event log.
///
/// Used as the database key of [`EventLogEntry`] records under
/// [`DB_KEY_PREFIX_EVENT_LOG`]. The default value is position `0`.
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogId(u64);
134
135impl EventLogId {
136 pub const LOG_START: EventLogId = EventLogId(0);
137
138 fn next(self) -> EventLogId {
139 Self(self.0 + 1)
140 }
141
142 pub fn saturating_add(self, rhs: u64) -> EventLogId {
143 Self(self.0.saturating_add(rhs))
144 }
145
146 pub fn saturating_sub(self, rhs: u64) -> EventLogId {
147 Self(self.0.saturating_sub(rhs))
148 }
149}
150
151impl From<EventLogId> for u64 {
152 fn from(value: EventLogId) -> Self {
153 value.0
154 }
155}
156
157impl FromStr for EventLogId {
158 type Err = <u64 as FromStr>::Err;
159
160 fn from_str(s: &str) -> Result<Self, Self::Err> {
161 u64::from_str(s).map(Self)
162 }
163}
164
/// Name identifying the kind of an event.
///
/// Holds either a borrowed `'static` string or an owned `String`.
#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventKind(Cow<'static, str>);
167
168impl EventKind {
169 pub const fn from_static(value: &'static str) -> Self {
170 Self(Cow::Borrowed(value))
171 }
172}
173
174impl<'s> From<&'s str> for EventKind {
175 fn from(value: &'s str) -> Self {
176 Self(Cow::Owned(value.to_owned()))
177 }
178}
179
180impl From<String> for EventKind {
181 fn from(value: String) -> Self {
182 Self(Cow::Owned(value))
183 }
184}
185
/// Value stored in the unordered event log: the event itself plus the flags
/// telling the ordering task what to do with it.
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct UnorderedEventLogEntry {
    /// Bit set built from [`UnorderedEventLogEntry::FLAG_PERSIST`] and
    /// [`UnorderedEventLogEntry::FLAG_TRIMABLE`]; `0` for transient events.
    pub flags: u8,
    /// The logged event.
    pub inner: EventLogEntry,
}
191
192impl UnorderedEventLogEntry {
193 pub const FLAG_PERSIST: u8 = 1;
194 pub const FLAG_TRIMABLE: u8 = 2;
195
196 fn persist(&self) -> bool {
197 self.flags & Self::FLAG_PERSIST != 0
198 }
199
200 fn trimable(&self) -> bool {
201 self.flags & Self::FLAG_TRIMABLE != 0
202 }
203}
204
/// A single logged event, as stored in the ordered logs and broadcast for
/// transient events.
#[derive(Debug, Encodable, Decodable, Clone)]
pub struct EventLogEntry {
    /// Kind of the event.
    pub kind: EventKind,

    /// Kind and instance id of the module the event belongs to; `None` when
    /// the event was logged without a module.
    pub module: Option<(ModuleKind, ModuleInstanceId)>,

    /// Time the event was logged, in microseconds since the epoch.
    pub ts_usecs: u64,

    /// Event body as raw bytes. `log_event` stores `serde_json`-encoded
    /// events here; `log_event_raw` stores whatever bytes the caller passes.
    pub payload: Vec<u8>,
}
231
/// An [`EventLogEntry`] read back from one of the ordered logs, together
/// with the position it was stored at.
#[derive(Debug, Clone)]
pub struct PersistedLogEntry {
    /// Position of the entry in the log it was read from.
    id: EventLogId,
    /// The stored event.
    inner: EventLogEntry,
}
238
239impl Serialize for PersistedLogEntry {
240 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
241 where
242 S: serde::Serializer,
243 {
244 use serde::ser::SerializeStruct;
245
246 let mut state = serializer.serialize_struct("PersistedLogEntry", 5)?;
247 state.serialize_field("id", &self.id)?;
248 state.serialize_field("kind", &self.inner.kind)?;
249 state.serialize_field("module", &self.inner.module)?;
250 state.serialize_field("ts_usecs", &self.inner.ts_usecs)?;
251
252 let payload_value: serde_json::Value = serde_json::from_slice(&self.inner.payload)
254 .unwrap_or_else(|_| serde_json::Value::String(hex::encode(&self.inner.payload)));
255 state.serialize_field("payload", &payload_value)?;
256
257 state.end()
258 }
259}
260
261impl<'de> Deserialize<'de> for PersistedLogEntry {
262 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
263 where
264 D: serde::Deserializer<'de>,
265 {
266 use serde::de::{self, MapAccess, Visitor};
267
268 #[derive(Deserialize)]
269 #[serde(field_identifier, rename_all = "snake_case")]
270 enum Field {
271 Id,
272 Kind,
273 Module,
274 TsUsecs,
275 Payload,
276 }
277
278 struct PersistedLogEntryVisitor;
279
280 impl<'de> Visitor<'de> for PersistedLogEntryVisitor {
281 type Value = PersistedLogEntry;
282
283 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
284 formatter.write_str("struct PersistedLogEntry")
285 }
286
287 fn visit_map<V>(self, mut map: V) -> Result<PersistedLogEntry, V::Error>
288 where
289 V: MapAccess<'de>,
290 {
291 let mut id = None;
292 let mut kind = None;
293 let mut module = None;
294 let mut ts_usecs = None;
295 let mut payload = None;
296
297 while let Some(key) = map.next_key()? {
298 match key {
299 Field::Id => {
300 if id.is_some() {
301 return Err(de::Error::duplicate_field("id"));
302 }
303 id = Some(map.next_value()?);
304 }
305 Field::Kind => {
306 if kind.is_some() {
307 return Err(de::Error::duplicate_field("kind"));
308 }
309 kind = Some(map.next_value()?);
310 }
311 Field::Module => {
312 if module.is_some() {
313 return Err(de::Error::duplicate_field("module"));
314 }
315 module = Some(map.next_value()?);
316 }
317 Field::TsUsecs => {
318 if ts_usecs.is_some() {
319 return Err(de::Error::duplicate_field("ts_usecs"));
320 }
321 ts_usecs = Some(map.next_value()?);
322 }
323 Field::Payload => {
324 if payload.is_some() {
325 return Err(de::Error::duplicate_field("payload"));
326 }
327 let value: serde_json::Value = map.next_value()?;
328 payload = Some(serde_json::to_vec(&value).map_err(de::Error::custom)?);
329 }
330 }
331 }
332
333 let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
334 let kind = kind.ok_or_else(|| de::Error::missing_field("kind"))?;
335 let module = module.ok_or_else(|| de::Error::missing_field("module"))?;
336 let ts_usecs = ts_usecs.ok_or_else(|| de::Error::missing_field("ts_usecs"))?;
337 let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
338
339 Ok(PersistedLogEntry {
340 id,
341 inner: EventLogEntry {
342 kind,
343 module,
344 ts_usecs,
345 payload,
346 },
347 })
348 }
349 }
350
351 const FIELDS: &[&str] = &["id", "kind", "module", "ts_usecs", "payload"];
352 deserializer.deserialize_struct("PersistedLogEntry", FIELDS, PersistedLogEntryVisitor)
353 }
354}
355
356impl PersistedLogEntry {
357 pub fn id(&self) -> EventLogId {
358 self.id
359 }
360
361 pub fn as_raw(&self) -> &EventLogEntry {
362 &self.inner
363 }
364}
// Unordered event log: `UnordedEventLogId` -> `UnorderedEventLogEntry`,
// stored under `DB_KEY_PREFIX_UNORDERED_EVENT_LOG`.
impl_db_record!(
    key = UnordedEventLogId,
    value = UnorderedEventLogEntry,
    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
);

/// Prefix query type used to list the records of the unordered event log.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct UnorderedEventLogIdPrefixAll;

impl_db_lookup!(
    key = UnordedEventLogId,
    query_prefix = UnorderedEventLogIdPrefixAll
);
378
/// Prefix query type used to list the records of the ordered event log.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefixAll;

/// Prefix query type for the ordered event log wrapping an [`EventLogId`].
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefix(EventLogId);

// Ordered event log: `EventLogId` -> `EventLogEntry`, stored under
// `DB_KEY_PREFIX_EVENT_LOG`.
impl_db_record!(
    key = EventLogId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
394
/// Position of an entry in the trimable event log.
///
/// Wraps an [`EventLogId`] in a distinct type and is used as the database
/// key of [`EventLogEntry`] records under
/// [`DB_KEY_PREFIX_EVENT_LOG_TRIMABLE`]. The default value is position `0`.
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogTrimableId(EventLogId);
410
411impl EventLogTrimableId {
412 fn next(&self) -> Self {
413 Self(self.0.next())
414 }
415
416 pub fn saturating_add(self, rhs: u64) -> Self {
417 Self(self.0.saturating_add(rhs))
418 }
419}
420
421impl From<u64> for EventLogTrimableId {
422 fn from(value: u64) -> Self {
423 Self(EventLogId(value))
424 }
425}
426
/// Prefix query type used to list the records of the trimable event log.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogTrimableIdPrefixAll;

/// Prefix query type for the trimable event log wrapping an [`EventLogId`].
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogTrimableIdPrefix(EventLogId);

// Trimable event log: `EventLogTrimableId` -> `EventLogEntry`, stored under
// `DB_KEY_PREFIX_EVENT_LOG_TRIMABLE`.
impl_db_record!(
    key = EventLogTrimableId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
);

impl_db_lookup!(
    key = EventLogTrimableId,
    query_prefix = EventLogTrimableIdPrefixAll
);

impl_db_lookup!(
    key = EventLogTrimableId,
    query_prefix = EventLogTrimableIdPrefix
);
448
449#[apply(async_trait_maybe_send!)]
450pub trait DBTransactionEventLogExt {
451 #[allow(clippy::too_many_arguments)]
452 async fn log_event_raw(
453 &mut self,
454 log_ordering_wakeup_tx: watch::Sender<()>,
455 kind: EventKind,
456 module_kind: Option<ModuleKind>,
457 module_id: Option<ModuleInstanceId>,
458 payload: Vec<u8>,
459 persist: EventPersistence,
460 );
461
462 async fn log_event<E>(
467 &mut self,
468 log_ordering_wakeup_tx: watch::Sender<()>,
469 module_id: Option<ModuleInstanceId>,
470 event: E,
471 ) where
472 E: Event + Send,
473 {
474 self.log_event_raw(
475 log_ordering_wakeup_tx,
476 E::KIND,
477 E::MODULE,
478 module_id,
479 serde_json::to_vec(&event).expect("Serialization can't fail"),
480 <E as Event>::PERSISTENCE,
481 )
482 .await;
483 }
484
485 async fn get_next_event_log_id(&mut self) -> EventLogId;
490
491 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
493
494 async fn get_event_log(
496 &mut self,
497 pos: Option<EventLogId>,
498 limit: u64,
499 ) -> Vec<PersistedLogEntry>;
500
501 async fn get_event_log_trimable(
502 &mut self,
503 pos: Option<EventLogTrimableId>,
504 limit: u64,
505 ) -> Vec<PersistedLogEntry>;
506}
507
508#[apply(async_trait_maybe_send!)]
509impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
510where
511 Cap: Send,
512{
513 async fn log_event_raw(
514 &mut self,
515 log_ordering_wakeup_tx: watch::Sender<()>,
516 kind: EventKind,
517 module_kind: Option<ModuleKind>,
518 module_id: Option<ModuleInstanceId>,
519 payload: Vec<u8>,
520 persist: EventPersistence,
521 ) {
522 assert_eq!(
523 module_kind.is_some(),
524 module_id.is_some(),
525 "Events of modules must have module_id set"
526 );
527
528 let unordered_id = UnordedEventLogId::new();
529 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
530
531 if self
532 .insert_entry(
533 &unordered_id,
534 &UnorderedEventLogEntry {
535 flags: match persist {
536 EventPersistence::Transient => 0,
537 EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
538 EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
539 },
540 inner: EventLogEntry {
541 kind,
542 module: module_kind.map(|kind| (kind, module_id.unwrap())),
543 ts_usecs: unordered_id.ts_usecs,
544 payload,
545 },
546 },
547 )
548 .await
549 .is_some()
550 {
551 panic!("Trying to overwrite event in the client event log");
552 }
553 self.on_commit(move || {
554 log_ordering_wakeup_tx.send_replace(());
555 });
556 }
557
558 async fn get_next_event_log_id(&mut self) -> EventLogId {
559 self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
560 .await
561 .next()
562 .await
563 .map(|(k, _v)| k.next())
564 .unwrap_or_default()
565 }
566
567 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
568 EventLogTrimableId(
569 self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
570 .await
571 .next()
572 .await
573 .map(|(k, _v)| k.0.next())
574 .unwrap_or_default(),
575 )
576 }
577
578 async fn get_event_log(
579 &mut self,
580 pos: Option<EventLogId>,
581 limit: u64,
582 ) -> Vec<PersistedLogEntry> {
583 let pos = pos.unwrap_or_default();
584 self.find_by_range(pos..pos.saturating_add(limit))
585 .await
586 .map(|(k, v)| PersistedLogEntry { id: k, inner: v })
587 .collect()
588 .await
589 }
590
591 async fn get_event_log_trimable(
592 &mut self,
593 pos: Option<EventLogTrimableId>,
594 limit: u64,
595 ) -> Vec<PersistedLogEntry> {
596 let pos = pos.unwrap_or_default();
597 self.find_by_range(pos..pos.saturating_add(limit))
598 .await
599 .map(|(k, v)| PersistedLogEntry { id: k.0, inner: v })
600 .collect()
601 .await
602 }
603}
604
605async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
607 let mut dbtx = db.begin_transaction().await;
608
609 let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
610 let min_id_threshold = current_trimable_id
611 .0
612 .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
613 let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
614
615 let entries_to_delete: Vec<_> = dbtx
616 .find_by_prefix(&EventLogTrimableIdPrefixAll)
617 .await
618 .take_while(|(id, entry)| {
619 let id_old_enough = id.0 <= min_id_threshold;
620 let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
621
622 async move { id_old_enough && ts_old_enough }
624 })
625 .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
626 .map(|(id, _entry)| id)
627 .collect()
628 .await;
629
630 for id in &entries_to_delete {
631 dbtx.remove_entry(id).await;
632 }
633
634 dbtx.commit_tx().await;
635}
636
637pub async fn run_event_log_ordering_task(
640 db: Database,
641 mut log_ordering_task_wakeup: watch::Receiver<()>,
642 log_event_added: watch::Sender<()>,
643 log_event_added_transient: broadcast::Sender<EventLogEntry>,
644) {
645 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
646
647 let current_time_usecs =
648 u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
649 trim_trimable_log(&db, current_time_usecs).await;
650
651 let mut next_entry_id = db
652 .begin_transaction_nc()
653 .await
654 .get_next_event_log_id()
655 .await;
656 let mut next_entry_id_trimable = db
657 .begin_transaction_nc()
658 .await
659 .get_next_event_log_trimable_id()
660 .await;
661
662 loop {
663 let mut dbtx = db.begin_transaction().await;
664
665 let unordered_events = dbtx
666 .find_by_prefix(&UnorderedEventLogIdPrefixAll)
667 .await
668 .collect::<Vec<_>>()
669 .await;
670 trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
671
672 for (unordered_id, entry) in &unordered_events {
673 assert!(
674 dbtx.remove_entry(unordered_id).await.is_some(),
675 "Must never fail to remove entry"
676 );
677 if entry.persist() {
678 if !entry.trimable() {
681 assert!(
682 dbtx.insert_entry(&next_entry_id, &entry.inner)
683 .await
684 .is_none(),
685 "Must never overwrite existing event"
686 );
687 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
688 next_entry_id = next_entry_id.next();
689 }
690
691 assert!(
693 dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
694 .await
695 .is_none(),
696 "Must never overwrite existing event"
697 );
698 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
699 next_entry_id_trimable = next_entry_id_trimable.next();
700 } else {
701 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
703 dbtx.on_commit({
704 let log_event_added_transient = log_event_added_transient.clone();
705 let entry = entry.inner.clone();
706
707 move || {
708 let _ = log_event_added_transient.send(entry);
710 }
711 });
712 }
713 }
714
715 dbtx.commit_tx().await;
719 if !unordered_events.is_empty() {
720 log_event_added.send_replace(());
721 }
722
723 trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
724 if log_ordering_task_wakeup.changed().await.is_err() {
725 break;
726 }
727 }
728
729 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
730}
731
/// Storage for the position a consumer of the ordered event log has
/// reached, used by [`handle_events`] to resume where it left off.
#[apply(async_trait_maybe_send!)]
pub trait EventLogNonTrimableTracker {
    /// Stores `pos`, the position of the next event to handle.
    ///
    /// [`handle_events`] calls this through the same transaction the event
    /// handler ran in.
    async fn store(
        &mut self,
        dbtx: &mut DatabaseTransaction<NonCommittable>,
        pos: EventLogId,
    ) -> anyhow::Result<()>;

    /// Loads the stored position. When this returns `None`,
    /// [`handle_events`] starts from the default position.
    async fn load(
        &mut self,
        dbtx: &mut DatabaseTransaction<NonCommittable>,
    ) -> anyhow::Result<Option<EventLogId>>;
}
/// Boxed, dynamically dispatched [`EventLogNonTrimableTracker`].
pub type DynEventLogTracker = Box<dyn EventLogNonTrimableTracker>;
761
/// Storage for the position a consumer of the trimable event log has
/// reached, used by [`handle_trimable_events`] to resume where it left off.
#[apply(async_trait_maybe_send!)]
pub trait EventLogTrimableTracker {
    /// Stores `pos`, the position of the next event to handle.
    ///
    /// [`handle_trimable_events`] calls this through the same transaction the
    /// event handler ran in.
    async fn store(
        &mut self,
        dbtx: &mut DatabaseTransaction<NonCommittable>,
        pos: EventLogTrimableId,
    ) -> anyhow::Result<()>;

    /// Loads the stored position. When this returns `None`,
    /// [`handle_trimable_events`] starts from the default position.
    async fn load(
        &mut self,
        dbtx: &mut DatabaseTransaction<NonCommittable>,
    ) -> anyhow::Result<Option<EventLogTrimableId>>;
}
/// Boxed, dynamically dispatched [`EventLogTrimableTracker`].
pub type DynEventLogTrimableTracker = Box<dyn EventLogTrimableTracker>;
779
780pub async fn handle_events<F, R>(
781 db: Database,
782 mut tracker: DynEventLogTracker,
783 mut log_event_added: watch::Receiver<()>,
784 call_fn: F,
785) -> anyhow::Result<()>
786where
787 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
788 R: Future<Output = anyhow::Result<()>>,
789{
790 let mut next_key: EventLogId = tracker
791 .load(&mut db.begin_transaction_nc().await)
792 .await?
793 .unwrap_or_default();
794
795 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
796
797 loop {
798 let mut dbtx = db.begin_transaction().await;
799
800 match dbtx.get_value(&next_key).await {
801 Some(event) => {
802 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
803
804 next_key = next_key.next();
805
806 tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
807
808 dbtx.commit_tx().await;
809 }
810 _ => {
811 if log_event_added.changed().await.is_err() {
812 break Ok(());
813 }
814 }
815 }
816 }
817}
818
819pub async fn handle_trimable_events<F, R>(
820 db: Database,
821 mut tracker: DynEventLogTrimableTracker,
822 mut log_event_added: watch::Receiver<()>,
823 call_fn: F,
824) -> anyhow::Result<()>
825where
826 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
827 R: Future<Output = anyhow::Result<()>>,
828{
829 let mut next_key: EventLogTrimableId = tracker
830 .load(&mut db.begin_transaction_nc().await)
831 .await?
832 .unwrap_or_default();
833 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
834
835 loop {
836 let mut dbtx = db.begin_transaction().await;
837
838 match dbtx.get_value(&next_key).await {
839 Some(event) => {
840 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
841
842 next_key = next_key.next();
843 tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
844
845 dbtx.commit_tx().await;
846 }
847 _ => {
848 if log_event_added.changed().await.is_err() {
849 break Ok(());
850 }
851 }
852 }
853 }
854}
855
856pub fn filter_events_by_kind<'a, I>(
859 all_events: I,
860 module_kind: ModuleKind,
861 event_kind: EventKind,
862) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
863where
864 I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
865{
866 all_events.into_iter().filter(move |e| {
867 if let Some((m, _)) = &e.inner.module {
868 e.inner.kind == event_kind && *m == module_kind
869 } else {
870 false
871 }
872 })
873}
874
875pub fn join_events<'a, L, R, Res>(
891 events_l: &'a [&PersistedLogEntry],
892 events_r: &'a [&PersistedLogEntry],
893 predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
894) -> impl Iterator<Item = Res> + 'a
895where
896 L: Event,
897 R: Event,
898{
899 events_l
900 .iter()
901 .cartesian_product(events_r)
902 .filter_map(move |(l, r)| {
903 if let Some(latency) = r.inner.ts_usecs.checked_sub(l.inner.ts_usecs) {
904 let event_l: L =
905 serde_json::from_slice(&l.inner.payload).expect("could not parse JSON");
906 let event_r: R =
907 serde_json::from_slice(&r.inner.payload).expect("could not parse JSON");
908 predicate(event_l, event_r, latency)
909 } else {
910 None
911 }
912 })
913}
914
/// Latency and fee samples of payments, split by outcome.
///
/// Each vector is kept sorted on its own, so equal indices of `latencies`
/// and `fees` do not necessarily refer to the same payment.
#[derive(Debug, Default)]
pub struct StructuredPaymentEvents {
    /// Latencies of successful payments, sorted ascending.
    pub latencies: Vec<u64>,
    /// Fees of successful payments, sorted ascending.
    pub fees: Vec<Amount>,
    /// Latencies of failed payments, sorted ascending.
    pub latencies_failure: Vec<u64>,
}
923
924impl StructuredPaymentEvents {
925 pub fn new(
926 success_stats: &[(u64, Amount)],
927 failure_stats: Vec<u64>,
928 ) -> StructuredPaymentEvents {
929 let mut events = StructuredPaymentEvents {
930 latencies: success_stats.iter().map(|(l, _)| *l).collect(),
931 fees: success_stats.iter().map(|(_, f)| *f).collect(),
932 latencies_failure: failure_stats,
933 };
934 events.sort();
935 events
936 }
937
938 pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
941 self.latencies.append(&mut other.latencies);
942 self.fees.append(&mut other.fees);
943 self.latencies_failure.append(&mut other.latencies_failure);
944 self.sort();
945 }
946
947 fn sort(&mut self) {
950 self.latencies.sort_unstable();
951 self.fees.sort_unstable();
952 self.latencies_failure.sort_unstable();
953 }
954}
955
956#[cfg(test)]
957mod tests;