fedimint_eventlog/lib.rs

#![allow(clippy::needless_lifetimes)]

//! Client Event Log
//!
//! The goal here is to maintain a single, ordered, append-only
//! log of all important client-side events: low or high level,
//! and to move as much of the coordination between different parts of
//! the system to it as possible, in a natural and decomposed way.
//!
//! Any event log "follower" can just keep going through
//! all events and react to the ones it is interested in (and understands),
//! potentially emitting events of its own, and atomically updating its
//! persisted event log position ("cursor") marking the events that were
//! already processed.
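//!
//! A follower might look roughly like this (a sketch; `MyCursorKey` is a
//! hypothetical db record holding this follower's position):
//!
//! ```ignore
//! handle_events(db.clone(), &MyCursorKey, log_event_added_rx, |_dbtx, event| {
//!     Box::pin(async move {
//!         if event.kind == EventKind::from("my-event") {
//!             // react to the event of interest here
//!         }
//!         Ok(())
//!     })
//! })
//! .await?;
//! ```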
use std::borrow::Cow;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};

use fedimint_core::core::{ModuleInstanceId, ModuleKind};
use fedimint_core::db::{
    Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
    NonCommittable,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
use fedimint_logging::LOG_CLIENT_EVENT_LOG;
use futures::{Future, StreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace};

/// DB prefixes hardcoded for use by the event log
/// `fedimint-eventlog` was extracted from `fedimint-client` to make it easier
/// to include/re-use in other parts of the code. But fundamentally its role
/// is to implement the event log in the client.
/// There is currently no way to inject the prefixes to use for db records,
/// so we use these constants to keep them in sync. Any other app that wants
/// to store its own event log will need to use the exact same prefixes,
/// which in practice should not be a problem.
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;

/// Minimum age in ID count for trimable events to be deleted
const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
/// Minimum age in microseconds for trimable events to be deleted (14 days)
const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
/// Maximum number of entries to trim in one operation
const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;

pub enum EventPersistence {
    /// Not written anywhere, just broadcast as a notification at runtime
    Transient,
    /// Persisted only to the log that gets trimmed
    Trimable,
    /// Persisted in both trimmed and untrimmed logs, so potentially
    /// stored forever.
    Persistent,
}

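/// An event that can be logged to the event log
///
/// A minimal sketch of an implementation, for a hypothetical
/// "deposit-confirmed" event of a made-up `mymodule` module:
///
/// ```ignore
/// #[derive(serde::Serialize, serde::Deserialize)]
/// struct DepositConfirmed {
///     amount_msats: u64,
/// }
///
/// impl Event for DepositConfirmed {
///     const MODULE: Option<ModuleKind> = Some(ModuleKind::from_static_str("mymodule"));
///     const KIND: EventKind = EventKind::from_static("deposit-confirmed");
///     const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
/// }
/// ```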
pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
    const MODULE: Option<ModuleKind>;
    const KIND: EventKind;
    const PERSISTENCE: EventPersistence;
}

/// A counter that resets on every restart and guarantees that
/// [`UnordedEventLogId`]s don't conflict with each other.
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A self-allocated ID that is mostly ordered
///
/// The goal here is to avoid concurrent database transaction
/// conflicts due to ID allocation. Instead, IDs are picked based on
/// a time and a counter, so they are mostly but not strictly ordered and
/// monotonic, and even more importantly: not contiguous.
#[derive(Debug, Encodable, Decodable)]
pub struct UnordedEventLogId {
    ts_usecs: u64,
    counter: u64,
}

impl UnordedEventLogId {
    fn new() -> Self {
        Self {
            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
                // This will never happen
                .unwrap_or(u64::MAX),
            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
        }
    }
}

/// Ordered, contiguous ID space, which is easy for event log followers to
/// track.
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogId(u64);

impl EventLogId {
    pub const LOG_START: EventLogId = EventLogId(0);

    fn next(self) -> EventLogId {
        Self(self.0 + 1)
    }

    pub fn saturating_add(self, rhs: u64) -> EventLogId {
        Self(self.0.saturating_add(rhs))
    }

    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
        Self(self.0.saturating_sub(rhs))
    }
}

impl From<EventLogId> for u64 {
    fn from(value: EventLogId) -> Self {
        value.0
    }
}

impl FromStr for EventLogId {
    type Err = <u64 as FromStr>::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str(s).map(Self)
    }
}

#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventKind(Cow<'static, str>);

impl EventKind {
    pub const fn from_static(value: &'static str) -> Self {
        Self(Cow::Borrowed(value))
    }
}

impl<'s> From<&'s str> for EventKind {
    fn from(value: &'s str) -> Self {
        Self(Cow::Owned(value.to_owned()))
    }
}

impl From<String> for EventKind {
    fn from(value: String) -> Self {
        Self(Cow::Owned(value))
    }
}

#[derive(Debug, Encodable, Decodable, Clone)]
pub struct UnorderedEventLogEntry {
    pub flags: u8,
    pub inner: EventLogEntry,
}

impl UnorderedEventLogEntry {
    pub const FLAG_PERSIST: u8 = 1;
    pub const FLAG_TRIMABLE: u8 = 2;

    fn persist(&self) -> bool {
        self.flags & Self::FLAG_PERSIST != 0
    }

    fn trimable(&self) -> bool {
        self.flags & Self::FLAG_TRIMABLE != 0
    }
}

#[derive(Debug, Encodable, Decodable, Clone)]
pub struct EventLogEntry {
    /// Type/kind of the event
    ///
    /// Any part of the client is free to self-allocate identifiers denoting a
    /// certain kind of event. Notably, one event kind can have multiple
    /// instances. E.g. "successful wallet deposit" can be an event kind,
    /// and it can happen multiple times with different payloads.
    pub kind: EventKind,

    /// To prevent accidental conflicts between `kind`s, the module kind the
    /// given event kind belongs to is used as well.
    ///
    /// Note: the meaning of this field is mostly about which part of the code
    /// defines this event kind. Oftentimes a core (non-module)-defined event
    /// will refer in some way to a module. Such an event should use a separate
    /// `module_id` field in the `payload`, instead of this field.
    pub module: Option<(ModuleKind, ModuleInstanceId)>,

    /// Timestamp in microseconds after unix epoch
    ts_usecs: u64,

    /// Event-kind specific payload, typically encoded as a json string for
    /// flexibility.
    pub payload: Vec<u8>,
}

/// Struct used for processing log entries after they have been persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedLogEntry {
    pub event_id: EventLogId,
    pub event_kind: EventKind,
    pub module: Option<(ModuleKind, u16)>,
    pub timestamp: u64,
    pub value: serde_json::Value,
}

impl_db_record!(
    key = UnordedEventLogId,
    value = UnorderedEventLogEntry,
    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct UnorderedEventLogIdPrefixAll;

impl_db_lookup!(
    key = UnordedEventLogId,
    query_prefix = UnorderedEventLogIdPrefixAll
);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefixAll;

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefix(EventLogId);

impl_db_record!(
    key = EventLogId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);

#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogTrimableId(EventLogId);

impl EventLogTrimableId {
    fn next(&self) -> Self {
        Self(self.0.next())
    }

    pub fn saturating_add(self, rhs: u64) -> Self {
        Self(self.0.saturating_add(rhs))
    }
}

impl From<u64> for EventLogTrimableId {
    fn from(value: u64) -> Self {
        Self(EventLogId(value))
    }
}

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogTrimableIdPrefixAll;

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogTrimableIdPrefix(EventLogId);

impl_db_record!(
    key = EventLogTrimableId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
);

impl_db_lookup!(
    key = EventLogTrimableId,
    query_prefix = EventLogTrimableIdPrefixAll
);

impl_db_lookup!(
    key = EventLogTrimableId,
    query_prefix = EventLogTrimableIdPrefix
);

#[apply(async_trait_maybe_send!)]
pub trait DBTransactionEventLogExt {
    #[allow(clippy::too_many_arguments)]
    async fn log_event_raw(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        kind: EventKind,
        module_kind: Option<ModuleKind>,
        module_id: Option<ModuleInstanceId>,
        payload: Vec<u8>,
        persist: EventPersistence,
    );

    /// Log an event to the event log
    ///
    /// The event will start "unordered", but after it is committed an ordering
    /// task will be notified to "order" it into the final ordered log.
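    ///
    /// A rough usage sketch, assuming `dbtx` is a [`DatabaseTransaction`] and
    /// `DepositConfirmed` is a type implementing [`Event`]:
    ///
    /// ```ignore
    /// dbtx.log_event(
    ///     log_ordering_wakeup_tx.clone(),
    ///     Some(module_id),
    ///     DepositConfirmed { amount_msats: 1_000 },
    /// )
    /// .await;
    /// ```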
    async fn log_event<E>(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
    {
        self.log_event_raw(
            log_ordering_wakeup_tx,
            E::KIND,
            E::MODULE,
            module_id,
            serde_json::to_vec(&event).expect("Serialization can't fail"),
            <E as Event>::PERSISTENCE,
        )
        .await;
    }

    /// Next [`EventLogId`] to use for new ordered events.
    ///
    /// Used by the ordering task, though it might also be
    /// useful for getting the current count of events.
    async fn get_next_event_log_id(&mut self) -> EventLogId;

    /// Next [`EventLogTrimableId`] to use for new ordered trimable events
    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;

    /// Read a part of the event log.
    async fn get_event_log(
        &mut self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>;

    async fn get_event_log_trimable(
        &mut self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>;
}

#[apply(async_trait_maybe_send!)]
impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
where
    Cap: Send,
{
    async fn log_event_raw(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        kind: EventKind,
        module_kind: Option<ModuleKind>,
        module_id: Option<ModuleInstanceId>,
        payload: Vec<u8>,
        persist: EventPersistence,
    ) {
        assert_eq!(
            module_kind.is_some(),
            module_id.is_some(),
            "Events of modules must have module_id set"
        );

        let unordered_id = UnordedEventLogId::new();
        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");

        if self
            .insert_entry(
                &unordered_id,
                &UnorderedEventLogEntry {
                    flags: match persist {
                        EventPersistence::Transient => 0,
                        // Trimable events must also set `FLAG_PERSIST`, so that
                        // the ordering task persists them (to the trimable log
                        // only)
                        EventPersistence::Trimable => {
                            UnorderedEventLogEntry::FLAG_PERSIST
                                | UnorderedEventLogEntry::FLAG_TRIMABLE
                        }
                        EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
                    },
                    inner: EventLogEntry {
                        kind,
                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
                        ts_usecs: unordered_id.ts_usecs,
                        payload,
                    },
                },
            )
            .await
            .is_some()
        {
            panic!("Trying to overwrite event in the client event log");
        }
        self.on_commit(move || {
            log_ordering_wakeup_tx.send_replace(());
        });
    }

    async fn get_next_event_log_id(&mut self) -> EventLogId {
        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
            .await
            .next()
            .await
            .map(|(k, _v)| k.next())
            .unwrap_or_default()
    }

    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
        EventLogTrimableId(
            self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
                .await
                .next()
                .await
                .map(|(k, _v)| k.0.next())
                .unwrap_or_default(),
        )
    }

    async fn get_event_log(
        &mut self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        let pos = pos.unwrap_or_default();
        self.find_by_range(pos..pos.saturating_add(limit))
            .await
            .map(|(k, v)| PersistedLogEntry {
                event_id: k,
                event_kind: v.kind,
                module: v.module,
                timestamp: v.ts_usecs,
                value: serde_json::from_slice(&v.payload).unwrap_or_default(),
            })
            .collect()
            .await
    }

    async fn get_event_log_trimable(
        &mut self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        let pos = pos.unwrap_or_default();
        self.find_by_range(pos..pos.saturating_add(limit))
            .await
            .map(|(k, v)| PersistedLogEntry {
                event_id: k.0,
                event_kind: v.kind,
                module: v.module,
                timestamp: v.ts_usecs,
                value: serde_json::from_slice(&v.payload).unwrap_or_default(),
            })
            .collect()
            .await
    }
}

/// Trims old entries from the trimable event log
async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
    let mut dbtx = db.begin_transaction().await;

    let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
    let min_id_threshold = current_trimable_id
        .0
        .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
    let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);

    let entries_to_delete: Vec<_> = dbtx
        .find_by_prefix(&EventLogTrimableIdPrefixAll)
        .await
        .take_while(|(id, entry)| {
            let id_old_enough = id.0 <= min_id_threshold;
            let ts_old_enough = entry.ts_usecs <= min_ts_threshold;

            // Continue while both conditions are met
            async move { id_old_enough && ts_old_enough }
        })
        .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
        .map(|(id, _entry)| id)
        .collect()
        .await;

    for id in &entries_to_delete {
        dbtx.remove_entry(id).await;
    }

    dbtx.commit_tx().await;
}

/// The code that handles new unordered events and rewrites them, fully
/// ordered, into the final event log.
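///
/// Typically spawned once at startup; wired up roughly like this (a sketch,
/// with illustrative channel names and capacity):
///
/// ```ignore
/// let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
/// // `log_event_added_rx` is what followers like `handle_events` wait on
/// let (log_event_added_tx, log_event_added_rx) = watch::channel(());
/// let (log_event_added_transient_tx, _) = broadcast::channel(1024);
///
/// task_group.spawn("event log ordering", |_handle| {
///     run_event_log_ordering_task(
///         db.clone(),
///         log_ordering_wakeup_rx,
///         log_event_added_tx,
///         log_event_added_transient_tx,
///     )
/// });
/// ```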
pub async fn run_event_log_ordering_task(
    db: Database,
    mut log_ordering_task_wakeup: watch::Receiver<()>,
    log_event_added: watch::Sender<()>,
    log_event_added_transient: broadcast::Sender<EventLogEntry>,
) {
    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");

    let current_time_usecs =
        u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
    trim_trimable_log(&db, current_time_usecs).await;

    let mut next_entry_id = db
        .begin_transaction_nc()
        .await
        .get_next_event_log_id()
        .await;
    let mut next_entry_id_trimable = db
        .begin_transaction_nc()
        .await
        .get_next_event_log_trimable_id()
        .await;

    loop {
        let mut dbtx = db.begin_transaction().await;

        let unordered_events = dbtx
            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
            .await
            .collect::<Vec<_>>()
            .await;
        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");

        for (unordered_id, entry) in &unordered_events {
            assert!(
                dbtx.remove_entry(unordered_id).await.is_some(),
                "Must never fail to remove entry"
            );
            if entry.persist() {
                // Non-trimable events get persisted in both the default event log
                // and the trimable event log
                if !entry.trimable() {
                    assert!(
                        dbtx.insert_entry(&next_entry_id, &entry.inner)
                            .await
                            .is_none(),
                        "Must never overwrite existing event"
                    );
                    trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
                    next_entry_id = next_entry_id.next();
                }

                // Trimable events get persisted only in the trimable log
                assert!(
                    dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
                        .await
                        .is_none(),
                    "Must never overwrite existing event"
                );
                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id_trimable, "Ordered trimable event log event");
                next_entry_id_trimable = next_entry_id_trimable.next();
            } else {
                // Transient events don't get persisted at all
                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "Transient event log event");
                dbtx.on_commit({
                    let log_event_added_transient = log_event_added_transient.clone();
                    let entry = entry.inner.clone();

                    move || {
                        // ignore the error returned when there are no subscribers
                        let _ = log_event_added_transient.send(entry);
                    }
                });
            }
        }

        // This task is the only one deleting existing elements of the unordered
        // log and inserting new elements into the ordered log, so it should
        // never fail to commit.
        dbtx.commit_tx().await;
        if !unordered_events.is_empty() {
            log_event_added.send_replace(());
        }

        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
        if log_ordering_task_wakeup.changed().await.is_err() {
            break;
        }
    }

    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
}

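/// Run an event log follower: process ordered events one by one starting from
/// the position stored under `pos_key`, waiting for new ones when caught up.
///
/// The cursor update and `call_fn` share a single db transaction, so each
/// event is processed and the position advanced atomically.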
pub async fn handle_events<F, R, K>(
    db: Database,
    pos_key: &K,
    mut log_event_added: watch::Receiver<()>,
    call_fn: F,
) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    let mut next_key: EventLogId = db
        .begin_transaction_nc()
        .await
        .get_value(pos_key)
        .await
        .unwrap_or_default();

    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");

    loop {
        let mut dbtx = db.begin_transaction().await;

        match dbtx.get_value(&next_key).await {
            Some(event) => {
                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;

                next_key = next_key.next();
                dbtx.insert_entry(pos_key, &next_key).await;

                dbtx.commit_tx().await;
            }
            _ => {
                if log_event_added.changed().await.is_err() {
                    break Ok(());
                }
            }
        }
    }
}

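/// Like [`handle_events`], but following the trimable event log, with the
/// cursor stored as an [`EventLogTrimableId`].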
pub async fn handle_trimable_events<F, R, K>(
    db: Database,
    pos_key: &K,
    mut log_event_added: watch::Receiver<()>,
    call_fn: F,
) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogTrimableId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    let mut next_key: EventLogTrimableId = db
        .begin_transaction_nc()
        .await
        .get_value(pos_key)
        .await
        .unwrap_or_default();

    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");

    loop {
        let mut dbtx = db.begin_transaction().await;

        match dbtx.get_value(&next_key).await {
            Some(event) => {
                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;

                next_key = next_key.next();
                dbtx.insert_entry(pos_key, &next_key).await;

                dbtx.commit_tx().await;
            }
            _ => {
                if log_event_added.changed().await.is_err() {
                    break Ok(());
                }
            }
        }
    }
}

/// Filters the `PersistedLogEntries` by the `EventKind` and
/// `ModuleKind`.
pub fn filter_events_by_kind<'a, I>(
    all_events: I,
    module_kind: ModuleKind,
    event_kind: EventKind,
) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
where
    I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
{
    all_events.into_iter().filter(move |e| {
        if let Some((m, _)) = &e.module {
            e.event_kind == event_kind && *m == module_kind
        } else {
            false
        }
    })
}

/// Joins two sets of events on a predicate.
///
/// This function computes a "nested loop join" by first computing the cross
/// product of the start event vector and the success/failure event vectors.
/// The resulting Cartesian product is then filtered according to the join
/// predicate supplied in the parameters.
///
/// This function is intended for small data sets. If the data set relations
/// grow, this function should implement a different join algorithm or be moved
/// out of the gateway.
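///
/// For example (a sketch; `PayStart` / `PaySuccess` are hypothetical [`Event`]
/// types carrying a shared `operation_id` in their payloads):
///
/// ```ignore
/// let latencies: Vec<u64> = join_events::<PayStart, PaySuccess, u64>(
///     &start_events,
///     &success_events,
///     |start, success, latency| {
///         (start.operation_id == success.operation_id).then_some(latency)
///     },
/// )
/// .collect();
/// ```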
pub fn join_events<'a, L, R, Res>(
    events_l: &'a [&PersistedLogEntry],
    events_r: &'a [&PersistedLogEntry],
    predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
) -> impl Iterator<Item = Res> + 'a
where
    L: Event,
    R: Event,
{
    events_l
        .iter()
        .cartesian_product(events_r)
        .filter_map(move |(l, r)| {
            if let Some(latency) = r.timestamp.checked_sub(l.timestamp) {
                let event_l: L =
                    serde_json::from_value(l.value.clone()).expect("could not parse JSON");
                let event_r: R =
                    serde_json::from_value(r.value.clone()).expect("could not parse JSON");
                predicate(event_l, event_r, latency)
            } else {
                None
            }
        })
}

/// Helper struct for storing computed data about outgoing and incoming
/// payments.
#[derive(Debug, Default)]
pub struct StructuredPaymentEvents {
    pub latencies: Vec<u64>,
    pub fees: Vec<Amount>,
    pub latencies_failure: Vec<u64>,
}

impl StructuredPaymentEvents {
    pub fn new(
        success_stats: &[(u64, Amount)],
        failure_stats: Vec<u64>,
    ) -> StructuredPaymentEvents {
        let mut events = StructuredPaymentEvents {
            latencies: success_stats.iter().map(|(l, _)| *l).collect(),
            fees: success_stats.iter().map(|(_, f)| *f).collect(),
            latencies_failure: failure_stats,
        };
        events.sort();
        events
    }

    /// Combines this `StructuredPaymentEvents` with the `other`
    /// `StructuredPaymentEvents` by appending all of the internal vectors.
    pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
        self.latencies.append(&mut other.latencies);
        self.fees.append(&mut other.fees);
        self.latencies_failure.append(&mut other.latencies_failure);
        self.sort();
    }

    /// Sorts this `StructuredPaymentEvents` by sorting all of the internal
    /// vectors.
    fn sort(&mut self) {
        self.latencies.sort_unstable();
        self.fees.sort_unstable();
        self.latencies_failure.sort_unstable();
    }
}

#[cfg(test)]
mod tests;