fedimint_eventlog/
lib.rs

1#![allow(clippy::needless_lifetimes)]
2
3//! Client Event Log
4//!
//! The goal here is to maintain a single, ordered, append-only
//! log of all important client-side events, low- or high-level,
//! and to route as much of the coordination between different parts of
//! the system as possible through it, in a natural and decomposed way.
9//!
10//! Any event log "follower" can just keep going through
11//! all events and react to ones it is interested in (and understands),
12//! potentially emitting events of its own, and atomically updating persisted
13//! event log position ("cursor") of events that were already processed.
14use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fedimint_core::core::{ModuleInstanceId, ModuleKind};
19use fedimint_core::db::{
20    Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, NonCommittable,
21};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
24use fedimint_logging::LOG_CLIENT_EVENT_LOG;
25use futures::{Future, StreamExt};
26use itertools::Itertools;
27use serde::{Deserialize, Serialize};
28use tokio::sync::{broadcast, watch};
29use tracing::{debug, trace};
30
31/// DB prefixes hardcoded for use of the event log
32/// `fedimint-eventlog` was extracted from `fedimint-client` to help
33/// include/re-use in other part of the code. But fundamentally its role
34/// is to implement event log in the client.
35/// There is currently no way to inject the prefixes to use for db records,
36/// so we use these constants to keep them in sync. Any other app that will
37/// want to store its own even log, will need to use the exact same prefixes,
38/// which in practice should not be a problem.
39pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
40pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
41pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;
42
43/// Minimum age in ID count for trimable events to be deleted
44const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
45/// Minimum age in microseconds for trimable events to be deleted (14 days)
46const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
47/// Maximum number of entries to trim in one operation
48const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
49
50/// Type of persistence the [`Event`] uses.
51///
52/// As a compromise between richness of events and amount of data to store
53/// Fedimint maintains two event logs in parallel:
54///
55/// * untrimable
56/// * trimable
57///
58/// Untrimable log will append only a subset of events that are infrequent,
59/// but important enough to be forever useful, e.g. for processing or debugging
60/// of historical events.
61///
62/// Trimable log will append all persistent events, but will over time remove
63/// the oldest ones. It will always retain enough events, that no log follower
64/// actively processing it should ever miss any event, but restarting processing
65/// from the start (index 0) can't be used for processing historical data.
66///
67/// Notably the positions in both logs are not interchangeable, so they use
68/// different types.
69///
70/// On top of it, some events are transient and are not persisted at all,
71/// and emitted only at runtime.
72///
73/// Consult [`Event::PERSISTENCE`] to know which event uses which persistence.
74pub enum EventPersistence {
75    /// Not written anywhere, just broadcasted as notification at runtime
76    Transient,
77    /// Persised only to log that gets trimmed
78    Trimable,
79    /// Persisted in both trimmed and untrimmed logs, so potentially
80    /// stored forever.
81    Persistent,
82}
83
84pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
85    const MODULE: Option<ModuleKind>;
86    const KIND: EventKind;
87    const PERSISTENCE: EventPersistence;
88}
89
90/// An counter that resets on every restart, that guarantees that
91/// [`UnordedEventLogId`]s don't conflict with each other.
92static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
93
94/// A self-allocated ID that is mostly ordered
95///
96/// The goal here is to avoid concurrent database transaction
97/// conflicts due the ID allocation. Instead they are picked based on
98/// a time and a counter, so they are mostly but not strictly ordered and
99/// monotonic, and even more importantly: not contiguous.
100#[derive(Debug, Encodable, Decodable)]
101pub struct UnordedEventLogId {
102    ts_usecs: u64,
103    counter: u64,
104}
105
106impl UnordedEventLogId {
107    fn new() -> Self {
108        Self {
109            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
110                // This will never happen
111                .unwrap_or(u64::MAX),
112            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
113        }
114    }
115}
116
117/// Ordered, contiguous ID space, which is easy for event log followers to
118/// track.
119#[derive(
120    Copy,
121    Clone,
122    Debug,
123    Encodable,
124    Decodable,
125    Default,
126    PartialEq,
127    Eq,
128    PartialOrd,
129    Ord,
130    Serialize,
131    Deserialize,
132)]
133pub struct EventLogId(u64);
134
135impl EventLogId {
136    pub const LOG_START: EventLogId = EventLogId(0);
137
138    fn next(self) -> EventLogId {
139        Self(self.0 + 1)
140    }
141
142    pub fn saturating_add(self, rhs: u64) -> EventLogId {
143        Self(self.0.saturating_add(rhs))
144    }
145
146    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
147        Self(self.0.saturating_sub(rhs))
148    }
149}
150
151impl From<EventLogId> for u64 {
152    fn from(value: EventLogId) -> Self {
153        value.0
154    }
155}
156
157impl FromStr for EventLogId {
158    type Err = <u64 as FromStr>::Err;
159
160    fn from_str(s: &str) -> Result<Self, Self::Err> {
161        u64::from_str(s).map(Self)
162    }
163}
164
165#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
166pub struct EventKind(Cow<'static, str>);
167
168impl EventKind {
169    pub const fn from_static(value: &'static str) -> Self {
170        Self(Cow::Borrowed(value))
171    }
172}
173
174impl<'s> From<&'s str> for EventKind {
175    fn from(value: &'s str) -> Self {
176        Self(Cow::Owned(value.to_owned()))
177    }
178}
179
180impl From<String> for EventKind {
181    fn from(value: String) -> Self {
182        Self(Cow::Owned(value))
183    }
184}
185
186#[derive(Debug, Encodable, Decodable, Clone)]
187pub struct UnorderedEventLogEntry {
188    pub flags: u8,
189    pub inner: EventLogEntry,
190}
191
192impl UnorderedEventLogEntry {
193    pub const FLAG_PERSIST: u8 = 1;
194    pub const FLAG_TRIMABLE: u8 = 2;
195
196    fn persist(&self) -> bool {
197        self.flags & Self::FLAG_PERSIST != 0
198    }
199
200    fn trimable(&self) -> bool {
201        self.flags & Self::FLAG_TRIMABLE != 0
202    }
203}
204
205#[derive(Debug, Encodable, Decodable, Clone)]
206pub struct EventLogEntry {
207    /// Type/kind of the event
208    ///
209    /// Any part of the client is free to self-allocate identifier, denoting a
210    /// certain kind of an event. Notably one event kind have multiple
211    /// instances. E.g. "successful wallet deposit" can be an event kind,
212    /// and it can happen multiple times with different payloads.
213    pub kind: EventKind,
214
215    /// To prevent accidental conflicts between `kind`s, a module kind the
216    /// given event kind belong is used as well.
217    ///
218    /// Note: the meaning of this field is mostly about which part of the code
219    /// defines this event kind. Oftentime a core (non-module)-defined event
220    /// will refer in some way to a module. It should use a separate `module_id`
221    /// field in the `payload`, instead of this field.
222    pub module: Option<(ModuleKind, ModuleInstanceId)>,
223
224    /// Timestamp in microseconds after unix epoch
225    pub ts_usecs: u64,
226
227    /// Event-kind specific payload, typically encoded as a json string for
228    /// flexibility.
229    pub payload: Vec<u8>,
230}
231
232/// Struct used for processing log entries after they have been persisted.
233#[derive(Debug, Clone)]
234pub struct PersistedLogEntry {
235    id: EventLogId,
236    inner: EventLogEntry,
237}
238
239impl Serialize for PersistedLogEntry {
240    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
241    where
242        S: serde::Serializer,
243    {
244        use serde::ser::SerializeStruct;
245
246        let mut state = serializer.serialize_struct("PersistedLogEntry", 5)?;
247        state.serialize_field("id", &self.id)?;
248        state.serialize_field("kind", &self.inner.kind)?;
249        state.serialize_field("module", &self.inner.module)?;
250        state.serialize_field("ts_usecs", &self.inner.ts_usecs)?;
251
252        // Try to deserialize payload as JSON, fall back to hex encoding
253        let payload_value: serde_json::Value = serde_json::from_slice(&self.inner.payload)
254            .unwrap_or_else(|_| serde_json::Value::String(hex::encode(&self.inner.payload)));
255        state.serialize_field("payload", &payload_value)?;
256
257        state.end()
258    }
259}
260
261impl<'de> Deserialize<'de> for PersistedLogEntry {
262    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
263    where
264        D: serde::Deserializer<'de>,
265    {
266        use serde::de::{self, MapAccess, Visitor};
267
268        #[derive(Deserialize)]
269        #[serde(field_identifier, rename_all = "snake_case")]
270        enum Field {
271            Id,
272            Kind,
273            Module,
274            TsUsecs,
275            Payload,
276        }
277
278        struct PersistedLogEntryVisitor;
279
280        impl<'de> Visitor<'de> for PersistedLogEntryVisitor {
281            type Value = PersistedLogEntry;
282
283            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
284                formatter.write_str("struct PersistedLogEntry")
285            }
286
287            fn visit_map<V>(self, mut map: V) -> Result<PersistedLogEntry, V::Error>
288            where
289                V: MapAccess<'de>,
290            {
291                let mut id = None;
292                let mut kind = None;
293                let mut module = None;
294                let mut ts_usecs = None;
295                let mut payload = None;
296
297                while let Some(key) = map.next_key()? {
298                    match key {
299                        Field::Id => {
300                            if id.is_some() {
301                                return Err(de::Error::duplicate_field("id"));
302                            }
303                            id = Some(map.next_value()?);
304                        }
305                        Field::Kind => {
306                            if kind.is_some() {
307                                return Err(de::Error::duplicate_field("kind"));
308                            }
309                            kind = Some(map.next_value()?);
310                        }
311                        Field::Module => {
312                            if module.is_some() {
313                                return Err(de::Error::duplicate_field("module"));
314                            }
315                            module = Some(map.next_value()?);
316                        }
317                        Field::TsUsecs => {
318                            if ts_usecs.is_some() {
319                                return Err(de::Error::duplicate_field("ts_usecs"));
320                            }
321                            ts_usecs = Some(map.next_value()?);
322                        }
323                        Field::Payload => {
324                            if payload.is_some() {
325                                return Err(de::Error::duplicate_field("payload"));
326                            }
327                            let value: serde_json::Value = map.next_value()?;
328                            payload = Some(serde_json::to_vec(&value).map_err(de::Error::custom)?);
329                        }
330                    }
331                }
332
333                let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
334                let kind = kind.ok_or_else(|| de::Error::missing_field("kind"))?;
335                let module = module.ok_or_else(|| de::Error::missing_field("module"))?;
336                let ts_usecs = ts_usecs.ok_or_else(|| de::Error::missing_field("ts_usecs"))?;
337                let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
338
339                Ok(PersistedLogEntry {
340                    id,
341                    inner: EventLogEntry {
342                        kind,
343                        module,
344                        ts_usecs,
345                        payload,
346                    },
347                })
348            }
349        }
350
351        const FIELDS: &[&str] = &["id", "kind", "module", "ts_usecs", "payload"];
352        deserializer.deserialize_struct("PersistedLogEntry", FIELDS, PersistedLogEntryVisitor)
353    }
354}
355
356impl PersistedLogEntry {
357    pub fn id(&self) -> EventLogId {
358        self.id
359    }
360
361    pub fn as_raw(&self) -> &EventLogEntry {
362        &self.inner
363    }
364}
365impl_db_record!(
366    key = UnordedEventLogId,
367    value = UnorderedEventLogEntry,
368    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
369);
370
371#[derive(Clone, Debug, Encodable, Decodable)]
372pub struct UnorderedEventLogIdPrefixAll;
373
374impl_db_lookup!(
375    key = UnordedEventLogId,
376    query_prefix = UnorderedEventLogIdPrefixAll
377);
378
379#[derive(Clone, Debug, Encodable, Decodable)]
380pub struct EventLogIdPrefixAll;
381
382#[derive(Clone, Debug, Encodable, Decodable)]
383pub struct EventLogIdPrefix(EventLogId);
384
385impl_db_record!(
386    key = EventLogId,
387    value = EventLogEntry,
388    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
389);
390
391impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
392
393impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
394
395#[derive(
396    Copy,
397    Clone,
398    Debug,
399    Encodable,
400    Decodable,
401    Default,
402    PartialEq,
403    Eq,
404    PartialOrd,
405    Ord,
406    Serialize,
407    Deserialize,
408)]
409pub struct EventLogTrimableId(EventLogId);
410
411impl EventLogTrimableId {
412    fn next(&self) -> Self {
413        Self(self.0.next())
414    }
415
416    pub fn saturating_add(self, rhs: u64) -> Self {
417        Self(self.0.saturating_add(rhs))
418    }
419}
420
421impl From<u64> for EventLogTrimableId {
422    fn from(value: u64) -> Self {
423        Self(EventLogId(value))
424    }
425}
426
427#[derive(Clone, Debug, Encodable, Decodable)]
428pub struct EventLogTrimableIdPrefixAll;
429
430#[derive(Clone, Debug, Encodable, Decodable)]
431pub struct EventLogTrimableIdPrefix(EventLogId);
432
433impl_db_record!(
434    key = EventLogTrimableId,
435    value = EventLogEntry,
436    db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
437);
438
439impl_db_lookup!(
440    key = EventLogTrimableId,
441    query_prefix = EventLogTrimableIdPrefixAll
442);
443
444impl_db_lookup!(
445    key = EventLogTrimableId,
446    query_prefix = EventLogTrimableIdPrefix
447);
448
449#[apply(async_trait_maybe_send!)]
450pub trait DBTransactionEventLogExt {
451    #[allow(clippy::too_many_arguments)]
452    async fn log_event_raw(
453        &mut self,
454        log_ordering_wakeup_tx: watch::Sender<()>,
455        kind: EventKind,
456        module_kind: Option<ModuleKind>,
457        module_id: Option<ModuleInstanceId>,
458        payload: Vec<u8>,
459        persist: EventPersistence,
460    );
461
462    /// Log an event log event
463    ///
464    /// The event will start "unordered", but after it is committed an ordering
465    /// task will be notified to "order" it into a final ordered log.
466    async fn log_event<E>(
467        &mut self,
468        log_ordering_wakeup_tx: watch::Sender<()>,
469        module_id: Option<ModuleInstanceId>,
470        event: E,
471    ) where
472        E: Event + Send,
473    {
474        self.log_event_raw(
475            log_ordering_wakeup_tx,
476            E::KIND,
477            E::MODULE,
478            module_id,
479            serde_json::to_vec(&event).expect("Serialization can't fail"),
480            <E as Event>::PERSISTENCE,
481        )
482        .await;
483    }
484
485    /// Next [`EventLogId`] to use for new ordered events.
486    ///
487    /// Used by ordering task, though might be
488    /// useful to get the current count of events.
489    async fn get_next_event_log_id(&mut self) -> EventLogId;
490
491    /// Next [`EventLogTrimableId`] to use for new ordered trimable events
492    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
493
494    /// Read a part of the event log.
495    async fn get_event_log(
496        &mut self,
497        pos: Option<EventLogId>,
498        limit: u64,
499    ) -> Vec<PersistedLogEntry>;
500
501    async fn get_event_log_trimable(
502        &mut self,
503        pos: Option<EventLogTrimableId>,
504        limit: u64,
505    ) -> Vec<PersistedLogEntry>;
506}
507
508#[apply(async_trait_maybe_send!)]
509impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
510where
511    Cap: Send,
512{
513    async fn log_event_raw(
514        &mut self,
515        log_ordering_wakeup_tx: watch::Sender<()>,
516        kind: EventKind,
517        module_kind: Option<ModuleKind>,
518        module_id: Option<ModuleInstanceId>,
519        payload: Vec<u8>,
520        persist: EventPersistence,
521    ) {
522        assert_eq!(
523            module_kind.is_some(),
524            module_id.is_some(),
525            "Events of modules must have module_id set"
526        );
527
528        let unordered_id = UnordedEventLogId::new();
529        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
530
531        if self
532            .insert_entry(
533                &unordered_id,
534                &UnorderedEventLogEntry {
535                    flags: match persist {
536                        EventPersistence::Transient => 0,
537                        EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
538                        EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
539                    },
540                    inner: EventLogEntry {
541                        kind,
542                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
543                        ts_usecs: unordered_id.ts_usecs,
544                        payload,
545                    },
546                },
547            )
548            .await
549            .is_some()
550        {
551            panic!("Trying to overwrite event in the client event log");
552        }
553        self.on_commit(move || {
554            log_ordering_wakeup_tx.send_replace(());
555        });
556    }
557
558    async fn get_next_event_log_id(&mut self) -> EventLogId {
559        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
560            .await
561            .next()
562            .await
563            .map(|(k, _v)| k.next())
564            .unwrap_or_default()
565    }
566
567    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
568        EventLogTrimableId(
569            self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
570                .await
571                .next()
572                .await
573                .map(|(k, _v)| k.0.next())
574                .unwrap_or_default(),
575        )
576    }
577
578    async fn get_event_log(
579        &mut self,
580        pos: Option<EventLogId>,
581        limit: u64,
582    ) -> Vec<PersistedLogEntry> {
583        let pos = pos.unwrap_or_default();
584        self.find_by_range(pos..pos.saturating_add(limit))
585            .await
586            .map(|(k, v)| PersistedLogEntry { id: k, inner: v })
587            .collect()
588            .await
589    }
590
591    async fn get_event_log_trimable(
592        &mut self,
593        pos: Option<EventLogTrimableId>,
594        limit: u64,
595    ) -> Vec<PersistedLogEntry> {
596        let pos = pos.unwrap_or_default();
597        self.find_by_range(pos..pos.saturating_add(limit))
598            .await
599            .map(|(k, v)| PersistedLogEntry { id: k.0, inner: v })
600            .collect()
601            .await
602    }
603}
604
605/// Trims old entries from the trimable event log
606async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
607    let mut dbtx = db.begin_transaction().await;
608
609    let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
610    let min_id_threshold = current_trimable_id
611        .0
612        .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
613    let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
614
615    let entries_to_delete: Vec<_> = dbtx
616        .find_by_prefix(&EventLogTrimableIdPrefixAll)
617        .await
618        .take_while(|(id, entry)| {
619            let id_old_enough = id.0 <= min_id_threshold;
620            let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
621
622            // Continue while both conditions are met
623            async move { id_old_enough && ts_old_enough }
624        })
625        .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
626        .map(|(id, _entry)| id)
627        .collect()
628        .await;
629
630    for id in &entries_to_delete {
631        dbtx.remove_entry(id).await;
632    }
633
634    dbtx.commit_tx().await;
635}
636
637/// The code that handles new unordered events and rewriters them fully ordered
638/// into the final event log.
639pub async fn run_event_log_ordering_task(
640    db: Database,
641    mut log_ordering_task_wakeup: watch::Receiver<()>,
642    log_event_added: watch::Sender<()>,
643    log_event_added_transient: broadcast::Sender<EventLogEntry>,
644) {
645    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
646
647    let current_time_usecs =
648        u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
649    trim_trimable_log(&db, current_time_usecs).await;
650
651    let mut next_entry_id = db
652        .begin_transaction_nc()
653        .await
654        .get_next_event_log_id()
655        .await;
656    let mut next_entry_id_trimable = db
657        .begin_transaction_nc()
658        .await
659        .get_next_event_log_trimable_id()
660        .await;
661
662    loop {
663        let mut dbtx = db.begin_transaction().await;
664
665        let unordered_events = dbtx
666            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
667            .await
668            .collect::<Vec<_>>()
669            .await;
670        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
671
672        for (unordered_id, entry) in &unordered_events {
673            assert!(
674                dbtx.remove_entry(unordered_id).await.is_some(),
675                "Must never fail to remove entry"
676            );
677            if entry.persist() {
678                // Non-trimable events get persisted in both the default event log
679                // and trimable event log
680                if !entry.trimable() {
681                    assert!(
682                        dbtx.insert_entry(&next_entry_id, &entry.inner)
683                            .await
684                            .is_none(),
685                        "Must never overwrite existing event"
686                    );
687                    trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
688                    next_entry_id = next_entry_id.next();
689                }
690
691                // Trimable events get persisted only in trimable log
692                assert!(
693                    dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
694                        .await
695                        .is_none(),
696                    "Must never overwrite existing event"
697                );
698                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
699                next_entry_id_trimable = next_entry_id_trimable.next();
700            } else {
701                // Transient events don't get persisted at all
702                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
703                dbtx.on_commit({
704                    let log_event_added_transient = log_event_added_transient.clone();
705                    let entry = entry.inner.clone();
706
707                    move || {
708                        // we ignore the no-subscribers
709                        let _ = log_event_added_transient.send(entry);
710                    }
711                });
712            }
713        }
714
715        // This thread is the only thread deleting already existing element of unordered
716        // log and inserting new elements into ordered log, so it should never
717        // fail to commit.
718        dbtx.commit_tx().await;
719        if !unordered_events.is_empty() {
720            log_event_added.send_replace(());
721        }
722
723        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
724        if log_ordering_task_wakeup.changed().await.is_err() {
725            break;
726        }
727    }
728
729    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
730}
731
732/// Persistent tracker of a position in the event log
733///
734/// During processing of event log the downstream consumer needs to
735/// keep track of which event were processed already. It needs to do it
736/// atomically and persist it so event in the presence of crashes no
737/// event is ever missed or processed twice.
738///
739/// This trait allows abstracting away where and how is such position stored,
740/// e.g. which key exactly is used, in what prefixed namespace etc.
741///
742/// ## Trimmable vs Non-Trimable log
743///
744/// See [`EventPersistence`]
745#[apply(async_trait_maybe_send!)]
746pub trait EventLogNonTrimableTracker {
747    // Store position in the event log
748    async fn store(
749        &mut self,
750        dbtx: &mut DatabaseTransaction<NonCommittable>,
751        pos: EventLogId,
752    ) -> anyhow::Result<()>;
753
754    /// Load the last previous stored position (or None if never stored)
755    async fn load(
756        &mut self,
757        dbtx: &mut DatabaseTransaction<NonCommittable>,
758    ) -> anyhow::Result<Option<EventLogId>>;
759}
760pub type DynEventLogTracker = Box<dyn EventLogNonTrimableTracker>;
761
762/// Like [`EventLogNonTrimableTracker`] but for trimable event log
763#[apply(async_trait_maybe_send!)]
764pub trait EventLogTrimableTracker {
765    // Store position in the event log
766    async fn store(
767        &mut self,
768        dbtx: &mut DatabaseTransaction<NonCommittable>,
769        pos: EventLogTrimableId,
770    ) -> anyhow::Result<()>;
771
772    /// Load the last previous stored position (or None if never stored)
773    async fn load(
774        &mut self,
775        dbtx: &mut DatabaseTransaction<NonCommittable>,
776    ) -> anyhow::Result<Option<EventLogTrimableId>>;
777}
778pub type DynEventLogTrimableTracker = Box<dyn EventLogTrimableTracker>;
779
780pub async fn handle_events<F, R>(
781    db: Database,
782    mut tracker: DynEventLogTracker,
783    mut log_event_added: watch::Receiver<()>,
784    call_fn: F,
785) -> anyhow::Result<()>
786where
787    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
788    R: Future<Output = anyhow::Result<()>>,
789{
790    let mut next_key: EventLogId = tracker
791        .load(&mut db.begin_transaction_nc().await)
792        .await?
793        .unwrap_or_default();
794
795    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
796
797    loop {
798        let mut dbtx = db.begin_transaction().await;
799
800        match dbtx.get_value(&next_key).await {
801            Some(event) => {
802                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
803
804                next_key = next_key.next();
805
806                tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
807
808                dbtx.commit_tx().await;
809            }
810            _ => {
811                if log_event_added.changed().await.is_err() {
812                    break Ok(());
813                }
814            }
815        }
816    }
817}
818
819pub async fn handle_trimable_events<F, R>(
820    db: Database,
821    mut tracker: DynEventLogTrimableTracker,
822    mut log_event_added: watch::Receiver<()>,
823    call_fn: F,
824) -> anyhow::Result<()>
825where
826    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
827    R: Future<Output = anyhow::Result<()>>,
828{
829    let mut next_key: EventLogTrimableId = tracker
830        .load(&mut db.begin_transaction_nc().await)
831        .await?
832        .unwrap_or_default();
833    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
834
835    loop {
836        let mut dbtx = db.begin_transaction().await;
837
838        match dbtx.get_value(&next_key).await {
839            Some(event) => {
840                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
841
842                next_key = next_key.next();
843                tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
844
845                dbtx.commit_tx().await;
846            }
847            _ => {
848                if log_event_added.changed().await.is_err() {
849                    break Ok(());
850                }
851            }
852        }
853    }
854}
855
856/// Filters the `PersistedLogEntries` by the `EventKind` and
857/// `ModuleKind`.
858pub fn filter_events_by_kind<'a, I>(
859    all_events: I,
860    module_kind: ModuleKind,
861    event_kind: EventKind,
862) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
863where
864    I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
865{
866    all_events.into_iter().filter(move |e| {
867        if let Some((m, _)) = &e.inner.module {
868            e.inner.kind == event_kind && *m == module_kind
869        } else {
870            false
871        }
872    })
873}
874
875/// Joins two sets of events on a predicate.
876///
877/// This function computes a "nested loop join" by first computing the cross
878/// product of the start event vector and the success/failure event vectors. The
879/// resulting cartesian product is then filtered according to the join predicate
880/// supplied in the parameters.
881///
882/// This function is intended for small data sets. If the data set relations
883/// grow, this function should implement a different join algorithm or be moved
884/// out of the gateway.
885///
886/// FIXME: This function eagerly stuff eagerly on a quadratic cartesian product
887/// of events is potentially a problem. We should delay deserialization as far
888/// as possible, so the filtering and matching on event types can remove as much
889/// work as possible.
890pub fn join_events<'a, L, R, Res>(
891    events_l: &'a [&PersistedLogEntry],
892    events_r: &'a [&PersistedLogEntry],
893    predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
894) -> impl Iterator<Item = Res> + 'a
895where
896    L: Event,
897    R: Event,
898{
899    events_l
900        .iter()
901        .cartesian_product(events_r)
902        .filter_map(move |(l, r)| {
903            if let Some(latency) = r.inner.ts_usecs.checked_sub(l.inner.ts_usecs) {
904                let event_l: L =
905                    serde_json::from_slice(&l.inner.payload).expect("could not parse JSON");
906                let event_r: R =
907                    serde_json::from_slice(&r.inner.payload).expect("could not parse JSON");
908                predicate(event_l, event_r, latency)
909            } else {
910                None
911            }
912        })
913}
914
915/// Helper struct for storing computed data about outgoing and incoming
916/// payments.
917#[derive(Debug, Default)]
918pub struct StructuredPaymentEvents {
919    pub latencies: Vec<u64>,
920    pub fees: Vec<Amount>,
921    pub latencies_failure: Vec<u64>,
922}
923
924impl StructuredPaymentEvents {
925    pub fn new(
926        success_stats: &[(u64, Amount)],
927        failure_stats: Vec<u64>,
928    ) -> StructuredPaymentEvents {
929        let mut events = StructuredPaymentEvents {
930            latencies: success_stats.iter().map(|(l, _)| *l).collect(),
931            fees: success_stats.iter().map(|(_, f)| *f).collect(),
932            latencies_failure: failure_stats,
933        };
934        events.sort();
935        events
936    }
937
938    /// Combines this `StructuredPaymentEvents` with the `other`
939    /// `StructuredPaymentEvents` by appending all of the internal vectors.
940    pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
941        self.latencies.append(&mut other.latencies);
942        self.fees.append(&mut other.fees);
943        self.latencies_failure.append(&mut other.latencies_failure);
944        self.sort();
945    }
946
947    /// Sorts this `StructuredPaymentEvents` by sorting all of the internal
948    /// vectors.
949    fn sort(&mut self) {
950        self.latencies.sort_unstable();
951        self.fees.sort_unstable();
952        self.latencies_failure.sort_unstable();
953    }
954}
955
956#[cfg(test)]
957mod tests;