fedimint_eventlog/
lib.rs

1#![allow(clippy::needless_lifetimes)]
2
3//! Client Event Log
4//!
5//! The goal here is to maintain a single, ordered, append only
6//! log of all important client-side events: low or high level,
7//! and move as much of coordination between different parts of
8//! the system in a natural and decomposed way.
9//!
10//! Any event log "follower" can just keep going through
11//! all events and react to ones it is interested in (and understands),
12//! potentially emitting events of its own, and atomically updating persisted
13//! event log position ("cursor") of events that were already processed.
14use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use std::{fmt, ops};
19
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{
22    Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, NonCommittable,
23};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use itertools::Itertools;
29use serde::{Deserialize, Serialize};
30use tokio::sync::{broadcast, watch};
31use tracing::{debug, trace};
32
33/// DB prefixes hardcoded for use of the event log
34/// `fedimint-eventlog` was extracted from `fedimint-client` to help
35/// include/re-use in other part of the code. But fundamentally its role
36/// is to implement event log in the client.
37/// There is currently no way to inject the prefixes to use for db records,
38/// so we use these constants to keep them in sync. Any other app that will
39/// want to store its own even log, will need to use the exact same prefixes,
40/// which in practice should not be a problem.
41pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
42pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
43pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;
44
45/// Minimum age in ID count for trimable events to be deleted
46const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
47/// Minimum age in microseconds for trimable events to be deleted (14 days)
48const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
49/// Maximum number of entries to trim in one operation
50const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
51
52/// Type of persistence the [`Event`] uses.
53///
54/// As a compromise between richness of events and amount of data to store
55/// Fedimint maintains two event logs in parallel:
56///
57/// * untrimable
58/// * trimable
59///
60/// Untrimable log will append only a subset of events that are infrequent,
61/// but important enough to be forever useful, e.g. for processing or debugging
62/// of historical events.
63///
64/// Trimable log will append all persistent events, but will over time remove
65/// the oldest ones. It will always retain enough events, that no log follower
66/// actively processing it should ever miss any event, but restarting processing
67/// from the start (index 0) can't be used for processing historical data.
68///
69/// Notably the positions in both logs are not interchangeable, so they use
70/// different types.
71///
72/// On top of it, some events are transient and are not persisted at all,
73/// and emitted only at runtime.
74///
75/// Consult [`Event::PERSISTENCE`] to know which event uses which persistence.
76pub enum EventPersistence {
77    /// Not written anywhere, just broadcasted as notification at runtime
78    Transient,
79    /// Persised only to log that gets trimmed
80    Trimable,
81    /// Persisted in both trimmed and untrimmed logs, so potentially
82    /// stored forever.
83    Persistent,
84}
85
86pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
87    const MODULE: Option<ModuleKind>;
88    const KIND: EventKind;
89    const PERSISTENCE: EventPersistence;
90}
91
92/// An counter that resets on every restart, that guarantees that
93/// [`UnordedEventLogId`]s don't conflict with each other.
94static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
95
96/// A self-allocated ID that is mostly ordered
97///
98/// The goal here is to avoid concurrent database transaction
99/// conflicts due the ID allocation. Instead they are picked based on
100/// a time and a counter, so they are mostly but not strictly ordered and
101/// monotonic, and even more importantly: not contiguous.
102#[derive(Debug, Encodable, Decodable)]
103pub struct UnordedEventLogId {
104    ts_usecs: u64,
105    counter: u64,
106}
107
108impl UnordedEventLogId {
109    fn new() -> Self {
110        Self {
111            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
112                // This will never happen
113                .unwrap_or(u64::MAX),
114            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
115        }
116    }
117}
118
119/// Ordered, contiguous ID space, which is easy for event log followers to
120/// track.
121#[derive(
122    Copy,
123    Clone,
124    Debug,
125    Encodable,
126    Decodable,
127    Default,
128    PartialEq,
129    Eq,
130    PartialOrd,
131    Ord,
132    Serialize,
133    Deserialize,
134)]
135pub struct EventLogId(u64);
136
137impl EventLogId {
138    pub const LOG_START: EventLogId = EventLogId(0);
139
140    fn next(self) -> EventLogId {
141        Self(self.0 + 1)
142    }
143
144    pub fn saturating_add(self, rhs: u64) -> EventLogId {
145        Self(self.0.saturating_add(rhs))
146    }
147
148    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
149        Self(self.0.saturating_sub(rhs))
150    }
151
152    pub fn checked_sub(self, rhs: u64) -> Option<EventLogId> {
153        self.0.checked_sub(rhs).map(EventLogId)
154    }
155}
156
157impl From<EventLogId> for u64 {
158    fn from(value: EventLogId) -> Self {
159        value.0
160    }
161}
162
163impl FromStr for EventLogId {
164    type Err = <u64 as FromStr>::Err;
165
166    fn from_str(s: &str) -> Result<Self, Self::Err> {
167        u64::from_str(s).map(Self)
168    }
169}
170
171impl fmt::Display for EventLogId {
172    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173        write!(f, "{}", self.0)
174    }
175}
176
177#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
178pub struct EventKind(Cow<'static, str>);
179
180impl EventKind {
181    pub const fn from_static(value: &'static str) -> Self {
182        Self(Cow::Borrowed(value))
183    }
184}
185
186impl<'s> From<&'s str> for EventKind {
187    fn from(value: &'s str) -> Self {
188        Self(Cow::Owned(value.to_owned()))
189    }
190}
191
192impl From<String> for EventKind {
193    fn from(value: String) -> Self {
194        Self(Cow::Owned(value))
195    }
196}
197
198impl fmt::Display for EventKind {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        f.write_str(&self.0)
201    }
202}
203
204#[derive(Debug, Encodable, Decodable, Clone)]
205pub struct UnorderedEventLogEntry {
206    pub flags: u8,
207    pub inner: EventLogEntry,
208}
209
210impl UnorderedEventLogEntry {
211    pub const FLAG_PERSIST: u8 = 1;
212    pub const FLAG_TRIMABLE: u8 = 2;
213
214    fn persist(&self) -> bool {
215        self.flags & Self::FLAG_PERSIST != 0
216    }
217
218    fn trimable(&self) -> bool {
219        self.flags & Self::FLAG_TRIMABLE != 0
220    }
221}
222
223#[derive(Debug, Encodable, Decodable, Clone)]
224pub struct EventLogEntry {
225    /// Type/kind of the event
226    ///
227    /// Any part of the client is free to self-allocate identifier, denoting a
228    /// certain kind of an event. Notably one event kind have multiple
229    /// instances. E.g. "successful wallet deposit" can be an event kind,
230    /// and it can happen multiple times with different payloads.
231    pub kind: EventKind,
232
233    /// To prevent accidental conflicts between `kind`s, a module kind the
234    /// given event kind belong is used as well.
235    ///
236    /// Note: the meaning of this field is mostly about which part of the code
237    /// defines this event kind. Oftentime a core (non-module)-defined event
238    /// will refer in some way to a module. It should use a separate `module_id`
239    /// field in the `payload`, instead of this field.
240    pub module: Option<(ModuleKind, ModuleInstanceId)>,
241
242    /// Timestamp in microseconds after unix epoch
243    pub ts_usecs: u64,
244
245    /// Event-kind specific payload, typically encoded as a json string for
246    /// flexibility.
247    pub payload: Vec<u8>,
248}
249
250impl EventLogEntry {
251    pub fn module_kind(&self) -> Option<&ModuleKind> {
252        self.module.as_ref().map(|m| &m.0)
253    }
254
255    pub fn module_id(&self) -> Option<ModuleInstanceId> {
256        self.module.as_ref().map(|m| m.1)
257    }
258
259    /// Get the event payload as typed value
260    pub fn to_event<E>(&self) -> Option<E>
261    where
262        E: Event,
263    {
264        serde_json::from_slice(&self.payload).ok()
265    }
266}
267
268/// An `EventLogEntry` that was already persisted (so has an id)
269#[derive(Debug, Clone)]
270pub struct PersistedLogEntry {
271    id: EventLogId,
272    inner: EventLogEntry,
273}
274
275impl Serialize for PersistedLogEntry {
276    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
277    where
278        S: serde::Serializer,
279    {
280        use serde::ser::SerializeStruct;
281
282        let mut state = serializer.serialize_struct("PersistedLogEntry", 5)?;
283        state.serialize_field("id", &self.id)?;
284        state.serialize_field("kind", &self.inner.kind)?;
285        state.serialize_field("module", &self.inner.module)?;
286        state.serialize_field("ts_usecs", &self.inner.ts_usecs)?;
287
288        // Try to deserialize payload as JSON, fall back to hex encoding
289        let payload_value: serde_json::Value = serde_json::from_slice(&self.inner.payload)
290            .unwrap_or_else(|_| serde_json::Value::String(hex::encode(&self.inner.payload)));
291        state.serialize_field("payload", &payload_value)?;
292
293        state.end()
294    }
295}
296
297impl<'de> Deserialize<'de> for PersistedLogEntry {
298    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
299    where
300        D: serde::Deserializer<'de>,
301    {
302        use serde::de::{self, MapAccess, Visitor};
303
304        #[derive(Deserialize)]
305        #[serde(field_identifier, rename_all = "snake_case")]
306        enum Field {
307            Id,
308            Kind,
309            Module,
310            TsUsecs,
311            Payload,
312        }
313
314        struct PersistedLogEntryVisitor;
315
316        impl<'de> Visitor<'de> for PersistedLogEntryVisitor {
317            type Value = PersistedLogEntry;
318
319            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
320                formatter.write_str("struct PersistedLogEntry")
321            }
322
323            fn visit_map<V>(self, mut map: V) -> Result<PersistedLogEntry, V::Error>
324            where
325                V: MapAccess<'de>,
326            {
327                let mut id = None;
328                let mut kind = None;
329                let mut module = None;
330                let mut ts_usecs = None;
331                let mut payload = None;
332
333                while let Some(key) = map.next_key()? {
334                    match key {
335                        Field::Id => {
336                            if id.is_some() {
337                                return Err(de::Error::duplicate_field("id"));
338                            }
339                            id = Some(map.next_value()?);
340                        }
341                        Field::Kind => {
342                            if kind.is_some() {
343                                return Err(de::Error::duplicate_field("kind"));
344                            }
345                            kind = Some(map.next_value()?);
346                        }
347                        Field::Module => {
348                            if module.is_some() {
349                                return Err(de::Error::duplicate_field("module"));
350                            }
351                            module = Some(map.next_value()?);
352                        }
353                        Field::TsUsecs => {
354                            if ts_usecs.is_some() {
355                                return Err(de::Error::duplicate_field("ts_usecs"));
356                            }
357                            ts_usecs = Some(map.next_value()?);
358                        }
359                        Field::Payload => {
360                            if payload.is_some() {
361                                return Err(de::Error::duplicate_field("payload"));
362                            }
363                            let value: serde_json::Value = map.next_value()?;
364                            payload = Some(serde_json::to_vec(&value).map_err(de::Error::custom)?);
365                        }
366                    }
367                }
368
369                let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
370                let kind = kind.ok_or_else(|| de::Error::missing_field("kind"))?;
371                let module = module.ok_or_else(|| de::Error::missing_field("module"))?;
372                let ts_usecs = ts_usecs.ok_or_else(|| de::Error::missing_field("ts_usecs"))?;
373                let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
374
375                Ok(PersistedLogEntry {
376                    id,
377                    inner: EventLogEntry {
378                        kind,
379                        module,
380                        ts_usecs,
381                        payload,
382                    },
383                })
384            }
385        }
386
387        const FIELDS: &[&str] = &["id", "kind", "module", "ts_usecs", "payload"];
388        deserializer.deserialize_struct("PersistedLogEntry", FIELDS, PersistedLogEntryVisitor)
389    }
390}
391
392impl PersistedLogEntry {
393    pub fn id(&self) -> EventLogId {
394        self.id
395    }
396
397    pub fn as_raw(&self) -> &EventLogEntry {
398        &self.inner
399    }
400}
401
402impl ops::Deref for PersistedLogEntry {
403    type Target = EventLogEntry;
404
405    fn deref(&self) -> &Self::Target {
406        &self.inner
407    }
408}
409
410impl_db_record!(
411    key = UnordedEventLogId,
412    value = UnorderedEventLogEntry,
413    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
414);
415
416#[derive(Clone, Debug, Encodable, Decodable)]
417pub struct UnorderedEventLogIdPrefixAll;
418
419impl_db_lookup!(
420    key = UnordedEventLogId,
421    query_prefix = UnorderedEventLogIdPrefixAll
422);
423
424#[derive(Clone, Debug, Encodable, Decodable)]
425pub struct EventLogIdPrefixAll;
426
427#[derive(Clone, Debug, Encodable, Decodable)]
428pub struct EventLogIdPrefix(EventLogId);
429
430impl_db_record!(
431    key = EventLogId,
432    value = EventLogEntry,
433    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
434);
435
436impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
437
438impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
439
440#[derive(
441    Copy,
442    Clone,
443    Debug,
444    Encodable,
445    Decodable,
446    Default,
447    PartialEq,
448    Eq,
449    PartialOrd,
450    Ord,
451    Serialize,
452    Deserialize,
453)]
454pub struct EventLogTrimableId(EventLogId);
455
456impl EventLogTrimableId {
457    fn next(&self) -> Self {
458        Self(self.0.next())
459    }
460
461    pub fn saturating_add(self, rhs: u64) -> Self {
462        Self(self.0.saturating_add(rhs))
463    }
464}
465
466impl From<u64> for EventLogTrimableId {
467    fn from(value: u64) -> Self {
468        Self(EventLogId(value))
469    }
470}
471
472#[derive(Clone, Debug, Encodable, Decodable)]
473pub struct EventLogTrimableIdPrefixAll;
474
475#[derive(Clone, Debug, Encodable, Decodable)]
476pub struct EventLogTrimableIdPrefix(EventLogId);
477
478impl_db_record!(
479    key = EventLogTrimableId,
480    value = EventLogEntry,
481    db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
482);
483
484impl_db_lookup!(
485    key = EventLogTrimableId,
486    query_prefix = EventLogTrimableIdPrefixAll
487);
488
489impl_db_lookup!(
490    key = EventLogTrimableId,
491    query_prefix = EventLogTrimableIdPrefix
492);
493
494#[apply(async_trait_maybe_send!)]
495pub trait DBTransactionEventLogExt {
496    #[allow(clippy::too_many_arguments)]
497    async fn log_event_raw(
498        &mut self,
499        log_ordering_wakeup_tx: watch::Sender<()>,
500        kind: EventKind,
501        module_kind: Option<ModuleKind>,
502        module_id: Option<ModuleInstanceId>,
503        payload: Vec<u8>,
504        persist: EventPersistence,
505    );
506
507    /// Log an event log event
508    ///
509    /// The event will start "unordered", but after it is committed an ordering
510    /// task will be notified to "order" it into a final ordered log.
511    async fn log_event<E>(
512        &mut self,
513        log_ordering_wakeup_tx: watch::Sender<()>,
514        module_id: Option<ModuleInstanceId>,
515        event: E,
516    ) where
517        E: Event + Send,
518    {
519        self.log_event_raw(
520            log_ordering_wakeup_tx,
521            E::KIND,
522            E::MODULE,
523            module_id,
524            serde_json::to_vec(&event).expect("Serialization can't fail"),
525            <E as Event>::PERSISTENCE,
526        )
527        .await;
528    }
529
530    /// Next [`EventLogId`] to use for new ordered events.
531    ///
532    /// Used by ordering task, though might be
533    /// useful to get the current count of events.
534    async fn get_next_event_log_id(&mut self) -> EventLogId;
535
536    /// Next [`EventLogTrimableId`] to use for new ordered trimable events
537    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
538
539    /// Read a part of the event log.
540    async fn get_event_log(
541        &mut self,
542        pos: Option<EventLogId>,
543        limit: u64,
544    ) -> Vec<PersistedLogEntry>;
545
546    async fn get_event_log_trimable(
547        &mut self,
548        pos: Option<EventLogTrimableId>,
549        limit: u64,
550    ) -> Vec<PersistedLogEntry>;
551}
552
553#[apply(async_trait_maybe_send!)]
554impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
555where
556    Cap: Send,
557{
558    async fn log_event_raw(
559        &mut self,
560        log_ordering_wakeup_tx: watch::Sender<()>,
561        kind: EventKind,
562        module_kind: Option<ModuleKind>,
563        module_id: Option<ModuleInstanceId>,
564        payload: Vec<u8>,
565        persist: EventPersistence,
566    ) {
567        assert_eq!(
568            module_kind.is_some(),
569            module_id.is_some(),
570            "Events of modules must have module_id set"
571        );
572
573        let unordered_id = UnordedEventLogId::new();
574        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
575
576        if self
577            .insert_entry(
578                &unordered_id,
579                &UnorderedEventLogEntry {
580                    flags: match persist {
581                        EventPersistence::Transient => 0,
582                        EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
583                        EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
584                    },
585                    inner: EventLogEntry {
586                        kind,
587                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
588                        ts_usecs: unordered_id.ts_usecs,
589                        payload,
590                    },
591                },
592            )
593            .await
594            .is_some()
595        {
596            panic!("Trying to overwrite event in the client event log");
597        }
598        self.on_commit(move || {
599            log_ordering_wakeup_tx.send_replace(());
600        });
601    }
602
603    async fn get_next_event_log_id(&mut self) -> EventLogId {
604        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
605            .await
606            .next()
607            .await
608            .map(|(k, _v)| k.next())
609            .unwrap_or_default()
610    }
611
612    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
613        EventLogTrimableId(
614            self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
615                .await
616                .next()
617                .await
618                .map(|(k, _v)| k.0.next())
619                .unwrap_or_default(),
620        )
621    }
622
623    async fn get_event_log(
624        &mut self,
625        pos: Option<EventLogId>,
626        limit: u64,
627    ) -> Vec<PersistedLogEntry> {
628        let pos = pos.unwrap_or_default();
629        self.find_by_range(pos..pos.saturating_add(limit))
630            .await
631            .map(|(k, v)| PersistedLogEntry { id: k, inner: v })
632            .collect()
633            .await
634    }
635
636    async fn get_event_log_trimable(
637        &mut self,
638        pos: Option<EventLogTrimableId>,
639        limit: u64,
640    ) -> Vec<PersistedLogEntry> {
641        let pos = pos.unwrap_or_default();
642        self.find_by_range(pos..pos.saturating_add(limit))
643            .await
644            .map(|(k, v)| PersistedLogEntry { id: k.0, inner: v })
645            .collect()
646            .await
647    }
648}
649
650/// Trims old entries from the trimable event log
651async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
652    let mut dbtx = db.begin_transaction().await;
653
654    let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
655    let min_id_threshold = current_trimable_id
656        .0
657        .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
658    let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
659
660    let entries_to_delete: Vec<_> = dbtx
661        .find_by_prefix(&EventLogTrimableIdPrefixAll)
662        .await
663        .take_while(|(id, entry)| {
664            let id_old_enough = id.0 <= min_id_threshold;
665            let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
666
667            // Continue while both conditions are met
668            async move { id_old_enough && ts_old_enough }
669        })
670        .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
671        .map(|(id, _entry)| id)
672        .collect()
673        .await;
674
675    for id in &entries_to_delete {
676        dbtx.remove_entry(id).await;
677    }
678
679    dbtx.commit_tx().await;
680}
681
682/// The code that handles new unordered events and rewriters them fully ordered
683/// into the final event log.
684pub async fn run_event_log_ordering_task(
685    db: Database,
686    mut log_ordering_task_wakeup: watch::Receiver<()>,
687    log_event_added: watch::Sender<()>,
688    log_event_added_transient: broadcast::Sender<EventLogEntry>,
689) {
690    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
691
692    let current_time_usecs =
693        u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
694    trim_trimable_log(&db, current_time_usecs).await;
695
696    let mut next_entry_id = db
697        .begin_transaction_nc()
698        .await
699        .get_next_event_log_id()
700        .await;
701    let mut next_entry_id_trimable = db
702        .begin_transaction_nc()
703        .await
704        .get_next_event_log_trimable_id()
705        .await;
706
707    loop {
708        let mut dbtx = db.begin_transaction().await;
709
710        let unordered_events = dbtx
711            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
712            .await
713            .collect::<Vec<_>>()
714            .await;
715        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
716
717        for (unordered_id, entry) in &unordered_events {
718            assert!(
719                dbtx.remove_entry(unordered_id).await.is_some(),
720                "Must never fail to remove entry"
721            );
722            if entry.persist() {
723                // Non-trimable events get persisted in both the default event log
724                // and trimable event log
725                if !entry.trimable() {
726                    assert!(
727                        dbtx.insert_entry(&next_entry_id, &entry.inner)
728                            .await
729                            .is_none(),
730                        "Must never overwrite existing event"
731                    );
732                    trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
733                    next_entry_id = next_entry_id.next();
734                }
735
736                // Trimable events get persisted only in trimable log
737                assert!(
738                    dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
739                        .await
740                        .is_none(),
741                    "Must never overwrite existing event"
742                );
743                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
744                next_entry_id_trimable = next_entry_id_trimable.next();
745            } else {
746                // Transient events don't get persisted at all
747                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
748                dbtx.on_commit({
749                    let log_event_added_transient = log_event_added_transient.clone();
750                    let entry = entry.inner.clone();
751
752                    move || {
753                        // we ignore the no-subscribers
754                        let _ = log_event_added_transient.send(entry);
755                    }
756                });
757            }
758        }
759
760        // This thread is the only thread deleting already existing element of unordered
761        // log and inserting new elements into ordered log, so it should never
762        // fail to commit.
763        dbtx.commit_tx().await;
764        if !unordered_events.is_empty() {
765            log_event_added.send_replace(());
766        }
767
768        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
769        if log_ordering_task_wakeup.changed().await.is_err() {
770            break;
771        }
772    }
773
774    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
775}
776
777/// Persistent tracker of a position in the event log
778///
779/// During processing of event log the downstream consumer needs to
780/// keep track of which event were processed already. It needs to do it
781/// atomically and persist it so event in the presence of crashes no
782/// event is ever missed or processed twice.
783///
784/// This trait allows abstracting away where and how is such position stored,
785/// e.g. which key exactly is used, in what prefixed namespace etc.
786///
787/// ## Trimmable vs Non-Trimable log
788///
789/// See [`EventPersistence`]
790#[apply(async_trait_maybe_send!)]
791pub trait EventLogNonTrimableTracker {
792    // Store position in the event log
793    async fn store(
794        &mut self,
795        dbtx: &mut DatabaseTransaction<NonCommittable>,
796        pos: EventLogId,
797    ) -> anyhow::Result<()>;
798
799    /// Load the last previous stored position (or None if never stored)
800    async fn load(
801        &mut self,
802        dbtx: &mut DatabaseTransaction<NonCommittable>,
803    ) -> anyhow::Result<Option<EventLogId>>;
804}
805pub type DynEventLogTracker = Box<dyn EventLogNonTrimableTracker>;
806
807/// Like [`EventLogNonTrimableTracker`] but for trimable event log
808#[apply(async_trait_maybe_send!)]
809pub trait EventLogTrimableTracker {
810    // Store position in the event log
811    async fn store(
812        &mut self,
813        dbtx: &mut DatabaseTransaction<NonCommittable>,
814        pos: EventLogTrimableId,
815    ) -> anyhow::Result<()>;
816
817    /// Load the last previous stored position (or None if never stored)
818    async fn load(
819        &mut self,
820        dbtx: &mut DatabaseTransaction<NonCommittable>,
821    ) -> anyhow::Result<Option<EventLogTrimableId>>;
822}
823pub type DynEventLogTrimableTracker = Box<dyn EventLogTrimableTracker>;
824
825pub async fn handle_events<F, R>(
826    db: Database,
827    mut tracker: DynEventLogTracker,
828    mut log_event_added: watch::Receiver<()>,
829    call_fn: F,
830) -> anyhow::Result<()>
831where
832    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
833    R: Future<Output = anyhow::Result<()>>,
834{
835    let mut next_key: EventLogId = tracker
836        .load(&mut db.begin_transaction_nc().await)
837        .await?
838        .unwrap_or_default();
839
840    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
841
842    loop {
843        let mut dbtx = db.begin_transaction().await;
844
845        match dbtx.get_value(&next_key).await {
846            Some(event) => {
847                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
848
849                next_key = next_key.next();
850
851                tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
852
853                dbtx.commit_tx().await;
854            }
855            _ => {
856                if log_event_added.changed().await.is_err() {
857                    break Ok(());
858                }
859            }
860        }
861    }
862}
863
864pub async fn handle_trimable_events<F, R>(
865    db: Database,
866    mut tracker: DynEventLogTrimableTracker,
867    mut log_event_added: watch::Receiver<()>,
868    call_fn: F,
869) -> anyhow::Result<()>
870where
871    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
872    R: Future<Output = anyhow::Result<()>>,
873{
874    let mut next_key: EventLogTrimableId = tracker
875        .load(&mut db.begin_transaction_nc().await)
876        .await?
877        .unwrap_or_default();
878    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
879
880    loop {
881        let mut dbtx = db.begin_transaction().await;
882
883        match dbtx.get_value(&next_key).await {
884            Some(event) => {
885                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
886
887                next_key = next_key.next();
888                tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
889
890                dbtx.commit_tx().await;
891            }
892            _ => {
893                if log_event_added.changed().await.is_err() {
894                    break Ok(());
895                }
896            }
897        }
898    }
899}
900
901/// Filters the `PersistedLogEntries` by the `EventKind` and
902/// `ModuleKind`.
903pub fn filter_events_by_kind<'a, I>(
904    all_events: I,
905    module_kind: ModuleKind,
906    event_kind: EventKind,
907) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
908where
909    I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
910{
911    all_events.into_iter().filter(move |e| {
912        if let Some((m, _)) = &e.inner.module {
913            e.inner.kind == event_kind && *m == module_kind
914        } else {
915            false
916        }
917    })
918}
919
920/// Joins two sets of events on a predicate.
921///
922/// This function computes a "nested loop join" by first computing the cross
923/// product of the start event vector and the success/failure event vectors. The
924/// resulting cartesian product is then filtered according to the join predicate
925/// supplied in the parameters.
926///
927/// This function is intended for small data sets. If the data set relations
928/// grow, this function should implement a different join algorithm or be moved
929/// out of the gateway.
930pub fn join_events<'a, L, R, Res>(
931    events_l: &'a [&PersistedLogEntry],
932    events_r: &'a [&PersistedLogEntry],
933    max_time_distance: Option<Duration>,
934    predicate: impl Fn(L, R, Duration) -> Option<Res> + 'a,
935) -> impl Iterator<Item = Res> + 'a
936where
937    L: Event,
938    R: Event,
939{
940    events_l
941        .iter()
942        .cartesian_product(events_r)
943        .filter_map(move |(l, r)| {
944            if L::MODULE.as_ref() == l.as_raw().module_kind()
945                && L::KIND == l.as_raw().kind
946                && R::MODULE.as_ref() == r.as_raw().module_kind()
947                && R::KIND == r.as_raw().kind
948                && let Some(latency_usecs) = r.inner.ts_usecs.checked_sub(l.inner.ts_usecs)
949                && max_time_distance.is_none_or(|max| u128::from(latency_usecs) <= max.as_millis())
950                && let Some(l) = l.as_raw().to_event()
951                && let Some(r) = r.as_raw().to_event()
952            {
953                predicate(l, r, Duration::from_millis(latency_usecs))
954            } else {
955                None
956            }
957        })
958}
959
960/// Helper struct for storing computed data about outgoing and incoming
961/// payments.
962#[derive(Debug, Default)]
963pub struct StructuredPaymentEvents {
964    pub latencies_usecs: Vec<u64>,
965    pub fees: Vec<Amount>,
966    pub latencies_failure: Vec<u64>,
967}
968
969impl StructuredPaymentEvents {
970    pub fn new(
971        success_stats: &[(u64, Amount)],
972        failure_stats: Vec<u64>,
973    ) -> StructuredPaymentEvents {
974        let mut events = StructuredPaymentEvents {
975            latencies_usecs: success_stats.iter().map(|(l, _)| *l).collect(),
976            fees: success_stats.iter().map(|(_, f)| *f).collect(),
977            latencies_failure: failure_stats,
978        };
979        events.sort();
980        events
981    }
982
983    /// Combines this `StructuredPaymentEvents` with the `other`
984    /// `StructuredPaymentEvents` by appending all of the internal vectors.
985    pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
986        self.latencies_usecs.append(&mut other.latencies_usecs);
987        self.fees.append(&mut other.fees);
988        self.latencies_failure.append(&mut other.latencies_failure);
989        self.sort();
990    }
991
992    /// Sorts this `StructuredPaymentEvents` by sorting all of the internal
993    /// vectors.
994    fn sort(&mut self) {
995        self.latencies_usecs.sort_unstable();
996        self.fees.sort_unstable();
997        self.latencies_failure.sort_unstable();
998    }
999}
1000
1001#[cfg(test)]
1002mod tests;