fedimint_eventlog/
lib.rs

1#![allow(clippy::needless_lifetimes)]
2
3//! Client Event Log
4//!
5//! The goal here is to maintain a single, ordered, append-only
6//! log of all important client-side events, low or high level,
7//! and to move as much of the coordination between different parts of
8//! the system as possible onto it, in a natural and decomposed way.
9//!
10//! Any event log "follower" can just keep going through
11//! all events and react to ones it is interested in (and understands),
12//! potentially emitting events of its own, and atomically updating persisted
13//! event log position ("cursor") of events that were already processed.
14use std::borrow::Cow;
15use std::ops;
16use std::str::FromStr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::time::Duration;
19
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{
22    Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, NonCommittable,
23};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use itertools::Itertools;
29use serde::{Deserialize, Serialize};
30use tokio::sync::{broadcast, watch};
31use tracing::{debug, trace};
32
// Database key prefixes reserved for the event log.
//
// `fedimint-eventlog` was extracted from `fedimint-client` so it can be
// included and re-used in other parts of the code, but fundamentally its
// role is to implement the event log of the client.
//
// There is currently no way to inject the prefixes used for db records, so
// these constants keep them in sync. Any other application that wants to
// store its own event log will need to use the exact same prefixes, which in
// practice should not be a problem.

/// Prefix of the records holding freshly logged, not yet ordered events.
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
/// Prefix of the records of the ordered event log.
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
/// Prefix of the records of the ordered, trimable event log.
pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;
44
/// How many ids behind the next id to allocate a trimable event must be
/// before it may be deleted.
const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
/// How old, in microseconds, a trimable event must be before it may be
/// deleted (14 days).
const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
/// Upper bound on the number of entries removed by a single trim operation.
const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
51
/// Type of persistence the [`Event`] uses.
///
/// As a compromise between richness of events and amount of data to store
/// Fedimint maintains two event logs in parallel:
///
/// * untrimable
/// * trimable
///
/// Untrimable log will append only a subset of events that are infrequent,
/// but important enough to be forever useful, e.g. for processing or debugging
/// of historical events.
///
/// Trimable log will append all persistent events, but will over time remove
/// the oldest ones. It will always retain enough events, that no log follower
/// actively processing it should ever miss any event, but restarting processing
/// from the start (index 0) can't be used for processing historical data.
///
/// Notably the positions in both logs are not interchangeable, so they use
/// different types.
///
/// On top of it, some events are transient and are not persisted at all,
/// and emitted only at runtime.
///
/// Consult [`Event::PERSISTENCE`] to know which event uses which persistence.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventPersistence {
    /// Not written anywhere, just broadcasted as notification at runtime
    Transient,
    /// Persisted only to the log that gets trimmed
    Trimable,
    /// Persisted in both trimmed and untrimmed logs, so potentially
    /// stored forever.
    Persistent,
}
85
86pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
87    const MODULE: Option<ModuleKind>;
88    const KIND: EventKind;
89    const PERSISTENCE: EventPersistence;
90}
91
/// A counter, starting from zero on every restart, that guarantees that
/// [`UnordedEventLogId`]s allocated by this process don't conflict with each
/// other.
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
95
96/// A self-allocated ID that is mostly ordered
97///
98/// The goal here is to avoid concurrent database transaction
99/// conflicts due the ID allocation. Instead they are picked based on
100/// a time and a counter, so they are mostly but not strictly ordered and
101/// monotonic, and even more importantly: not contiguous.
102#[derive(Debug, Encodable, Decodable)]
103pub struct UnordedEventLogId {
104    ts_usecs: u64,
105    counter: u64,
106}
107
108impl UnordedEventLogId {
109    fn new() -> Self {
110        Self {
111            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
112                // This will never happen
113                .unwrap_or(u64::MAX),
114            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
115        }
116    }
117}
118
119/// Ordered, contiguous ID space, which is easy for event log followers to
120/// track.
121#[derive(
122    Copy,
123    Clone,
124    Debug,
125    Encodable,
126    Decodable,
127    Default,
128    PartialEq,
129    Eq,
130    PartialOrd,
131    Ord,
132    Serialize,
133    Deserialize,
134)]
135pub struct EventLogId(u64);
136
137impl EventLogId {
138    pub const LOG_START: EventLogId = EventLogId(0);
139
140    fn next(self) -> EventLogId {
141        Self(self.0 + 1)
142    }
143
144    pub fn saturating_add(self, rhs: u64) -> EventLogId {
145        Self(self.0.saturating_add(rhs))
146    }
147
148    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
149        Self(self.0.saturating_sub(rhs))
150    }
151}
152
153impl From<EventLogId> for u64 {
154    fn from(value: EventLogId) -> Self {
155        value.0
156    }
157}
158
159impl FromStr for EventLogId {
160    type Err = <u64 as FromStr>::Err;
161
162    fn from_str(s: &str) -> Result<Self, Self::Err> {
163        u64::from_str(s).map(Self)
164    }
165}
166
167#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
168pub struct EventKind(Cow<'static, str>);
169
170impl EventKind {
171    pub const fn from_static(value: &'static str) -> Self {
172        Self(Cow::Borrowed(value))
173    }
174}
175
176impl<'s> From<&'s str> for EventKind {
177    fn from(value: &'s str) -> Self {
178        Self(Cow::Owned(value.to_owned()))
179    }
180}
181
182impl From<String> for EventKind {
183    fn from(value: String) -> Self {
184        Self(Cow::Owned(value))
185    }
186}
187
188#[derive(Debug, Encodable, Decodable, Clone)]
189pub struct UnorderedEventLogEntry {
190    pub flags: u8,
191    pub inner: EventLogEntry,
192}
193
194impl UnorderedEventLogEntry {
195    pub const FLAG_PERSIST: u8 = 1;
196    pub const FLAG_TRIMABLE: u8 = 2;
197
198    fn persist(&self) -> bool {
199        self.flags & Self::FLAG_PERSIST != 0
200    }
201
202    fn trimable(&self) -> bool {
203        self.flags & Self::FLAG_TRIMABLE != 0
204    }
205}
206
207#[derive(Debug, Encodable, Decodable, Clone)]
208pub struct EventLogEntry {
209    /// Type/kind of the event
210    ///
211    /// Any part of the client is free to self-allocate identifier, denoting a
212    /// certain kind of an event. Notably one event kind have multiple
213    /// instances. E.g. "successful wallet deposit" can be an event kind,
214    /// and it can happen multiple times with different payloads.
215    pub kind: EventKind,
216
217    /// To prevent accidental conflicts between `kind`s, a module kind the
218    /// given event kind belong is used as well.
219    ///
220    /// Note: the meaning of this field is mostly about which part of the code
221    /// defines this event kind. Oftentime a core (non-module)-defined event
222    /// will refer in some way to a module. It should use a separate `module_id`
223    /// field in the `payload`, instead of this field.
224    pub module: Option<(ModuleKind, ModuleInstanceId)>,
225
226    /// Timestamp in microseconds after unix epoch
227    pub ts_usecs: u64,
228
229    /// Event-kind specific payload, typically encoded as a json string for
230    /// flexibility.
231    pub payload: Vec<u8>,
232}
233
234impl EventLogEntry {
235    pub fn module_kind(&self) -> Option<&ModuleKind> {
236        self.module.as_ref().map(|m| &m.0)
237    }
238
239    pub fn module_id(&self) -> Option<ModuleInstanceId> {
240        self.module.as_ref().map(|m| m.1)
241    }
242
243    /// Get the event payload as typed value
244    pub fn to_event<E>(&self) -> Option<E>
245    where
246        E: Event,
247    {
248        serde_json::from_slice(&self.payload).ok()
249    }
250}
251
252/// An `EventLogEntry` that was already persisted (so has an id)
253#[derive(Debug, Clone)]
254pub struct PersistedLogEntry {
255    id: EventLogId,
256    inner: EventLogEntry,
257}
258
259impl Serialize for PersistedLogEntry {
260    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
261    where
262        S: serde::Serializer,
263    {
264        use serde::ser::SerializeStruct;
265
266        let mut state = serializer.serialize_struct("PersistedLogEntry", 5)?;
267        state.serialize_field("id", &self.id)?;
268        state.serialize_field("kind", &self.inner.kind)?;
269        state.serialize_field("module", &self.inner.module)?;
270        state.serialize_field("ts_usecs", &self.inner.ts_usecs)?;
271
272        // Try to deserialize payload as JSON, fall back to hex encoding
273        let payload_value: serde_json::Value = serde_json::from_slice(&self.inner.payload)
274            .unwrap_or_else(|_| serde_json::Value::String(hex::encode(&self.inner.payload)));
275        state.serialize_field("payload", &payload_value)?;
276
277        state.end()
278    }
279}
280
281impl<'de> Deserialize<'de> for PersistedLogEntry {
282    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
283    where
284        D: serde::Deserializer<'de>,
285    {
286        use serde::de::{self, MapAccess, Visitor};
287
288        #[derive(Deserialize)]
289        #[serde(field_identifier, rename_all = "snake_case")]
290        enum Field {
291            Id,
292            Kind,
293            Module,
294            TsUsecs,
295            Payload,
296        }
297
298        struct PersistedLogEntryVisitor;
299
300        impl<'de> Visitor<'de> for PersistedLogEntryVisitor {
301            type Value = PersistedLogEntry;
302
303            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
304                formatter.write_str("struct PersistedLogEntry")
305            }
306
307            fn visit_map<V>(self, mut map: V) -> Result<PersistedLogEntry, V::Error>
308            where
309                V: MapAccess<'de>,
310            {
311                let mut id = None;
312                let mut kind = None;
313                let mut module = None;
314                let mut ts_usecs = None;
315                let mut payload = None;
316
317                while let Some(key) = map.next_key()? {
318                    match key {
319                        Field::Id => {
320                            if id.is_some() {
321                                return Err(de::Error::duplicate_field("id"));
322                            }
323                            id = Some(map.next_value()?);
324                        }
325                        Field::Kind => {
326                            if kind.is_some() {
327                                return Err(de::Error::duplicate_field("kind"));
328                            }
329                            kind = Some(map.next_value()?);
330                        }
331                        Field::Module => {
332                            if module.is_some() {
333                                return Err(de::Error::duplicate_field("module"));
334                            }
335                            module = Some(map.next_value()?);
336                        }
337                        Field::TsUsecs => {
338                            if ts_usecs.is_some() {
339                                return Err(de::Error::duplicate_field("ts_usecs"));
340                            }
341                            ts_usecs = Some(map.next_value()?);
342                        }
343                        Field::Payload => {
344                            if payload.is_some() {
345                                return Err(de::Error::duplicate_field("payload"));
346                            }
347                            let value: serde_json::Value = map.next_value()?;
348                            payload = Some(serde_json::to_vec(&value).map_err(de::Error::custom)?);
349                        }
350                    }
351                }
352
353                let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
354                let kind = kind.ok_or_else(|| de::Error::missing_field("kind"))?;
355                let module = module.ok_or_else(|| de::Error::missing_field("module"))?;
356                let ts_usecs = ts_usecs.ok_or_else(|| de::Error::missing_field("ts_usecs"))?;
357                let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
358
359                Ok(PersistedLogEntry {
360                    id,
361                    inner: EventLogEntry {
362                        kind,
363                        module,
364                        ts_usecs,
365                        payload,
366                    },
367                })
368            }
369        }
370
371        const FIELDS: &[&str] = &["id", "kind", "module", "ts_usecs", "payload"];
372        deserializer.deserialize_struct("PersistedLogEntry", FIELDS, PersistedLogEntryVisitor)
373    }
374}
375
376impl PersistedLogEntry {
377    pub fn id(&self) -> EventLogId {
378        self.id
379    }
380
381    pub fn as_raw(&self) -> &EventLogEntry {
382        &self.inner
383    }
384}
385
386impl ops::Deref for PersistedLogEntry {
387    type Target = EventLogEntry;
388
389    fn deref(&self) -> &Self::Target {
390        &self.inner
391    }
392}
393
394impl_db_record!(
395    key = UnordedEventLogId,
396    value = UnorderedEventLogEntry,
397    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
398);
399
400#[derive(Clone, Debug, Encodable, Decodable)]
401pub struct UnorderedEventLogIdPrefixAll;
402
403impl_db_lookup!(
404    key = UnordedEventLogId,
405    query_prefix = UnorderedEventLogIdPrefixAll
406);
407
408#[derive(Clone, Debug, Encodable, Decodable)]
409pub struct EventLogIdPrefixAll;
410
411#[derive(Clone, Debug, Encodable, Decodable)]
412pub struct EventLogIdPrefix(EventLogId);
413
414impl_db_record!(
415    key = EventLogId,
416    value = EventLogEntry,
417    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
418);
419
420impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
421
422impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
423
424#[derive(
425    Copy,
426    Clone,
427    Debug,
428    Encodable,
429    Decodable,
430    Default,
431    PartialEq,
432    Eq,
433    PartialOrd,
434    Ord,
435    Serialize,
436    Deserialize,
437)]
438pub struct EventLogTrimableId(EventLogId);
439
440impl EventLogTrimableId {
441    fn next(&self) -> Self {
442        Self(self.0.next())
443    }
444
445    pub fn saturating_add(self, rhs: u64) -> Self {
446        Self(self.0.saturating_add(rhs))
447    }
448}
449
450impl From<u64> for EventLogTrimableId {
451    fn from(value: u64) -> Self {
452        Self(EventLogId(value))
453    }
454}
455
456#[derive(Clone, Debug, Encodable, Decodable)]
457pub struct EventLogTrimableIdPrefixAll;
458
459#[derive(Clone, Debug, Encodable, Decodable)]
460pub struct EventLogTrimableIdPrefix(EventLogId);
461
462impl_db_record!(
463    key = EventLogTrimableId,
464    value = EventLogEntry,
465    db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
466);
467
468impl_db_lookup!(
469    key = EventLogTrimableId,
470    query_prefix = EventLogTrimableIdPrefixAll
471);
472
473impl_db_lookup!(
474    key = EventLogTrimableId,
475    query_prefix = EventLogTrimableIdPrefix
476);
477
478#[apply(async_trait_maybe_send!)]
479pub trait DBTransactionEventLogExt {
480    #[allow(clippy::too_many_arguments)]
481    async fn log_event_raw(
482        &mut self,
483        log_ordering_wakeup_tx: watch::Sender<()>,
484        kind: EventKind,
485        module_kind: Option<ModuleKind>,
486        module_id: Option<ModuleInstanceId>,
487        payload: Vec<u8>,
488        persist: EventPersistence,
489    );
490
491    /// Log an event log event
492    ///
493    /// The event will start "unordered", but after it is committed an ordering
494    /// task will be notified to "order" it into a final ordered log.
495    async fn log_event<E>(
496        &mut self,
497        log_ordering_wakeup_tx: watch::Sender<()>,
498        module_id: Option<ModuleInstanceId>,
499        event: E,
500    ) where
501        E: Event + Send,
502    {
503        self.log_event_raw(
504            log_ordering_wakeup_tx,
505            E::KIND,
506            E::MODULE,
507            module_id,
508            serde_json::to_vec(&event).expect("Serialization can't fail"),
509            <E as Event>::PERSISTENCE,
510        )
511        .await;
512    }
513
514    /// Next [`EventLogId`] to use for new ordered events.
515    ///
516    /// Used by ordering task, though might be
517    /// useful to get the current count of events.
518    async fn get_next_event_log_id(&mut self) -> EventLogId;
519
520    /// Next [`EventLogTrimableId`] to use for new ordered trimable events
521    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
522
523    /// Read a part of the event log.
524    async fn get_event_log(
525        &mut self,
526        pos: Option<EventLogId>,
527        limit: u64,
528    ) -> Vec<PersistedLogEntry>;
529
530    async fn get_event_log_trimable(
531        &mut self,
532        pos: Option<EventLogTrimableId>,
533        limit: u64,
534    ) -> Vec<PersistedLogEntry>;
535}
536
537#[apply(async_trait_maybe_send!)]
538impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
539where
540    Cap: Send,
541{
542    async fn log_event_raw(
543        &mut self,
544        log_ordering_wakeup_tx: watch::Sender<()>,
545        kind: EventKind,
546        module_kind: Option<ModuleKind>,
547        module_id: Option<ModuleInstanceId>,
548        payload: Vec<u8>,
549        persist: EventPersistence,
550    ) {
551        assert_eq!(
552            module_kind.is_some(),
553            module_id.is_some(),
554            "Events of modules must have module_id set"
555        );
556
557        let unordered_id = UnordedEventLogId::new();
558        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
559
560        if self
561            .insert_entry(
562                &unordered_id,
563                &UnorderedEventLogEntry {
564                    flags: match persist {
565                        EventPersistence::Transient => 0,
566                        EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
567                        EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
568                    },
569                    inner: EventLogEntry {
570                        kind,
571                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
572                        ts_usecs: unordered_id.ts_usecs,
573                        payload,
574                    },
575                },
576            )
577            .await
578            .is_some()
579        {
580            panic!("Trying to overwrite event in the client event log");
581        }
582        self.on_commit(move || {
583            log_ordering_wakeup_tx.send_replace(());
584        });
585    }
586
587    async fn get_next_event_log_id(&mut self) -> EventLogId {
588        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
589            .await
590            .next()
591            .await
592            .map(|(k, _v)| k.next())
593            .unwrap_or_default()
594    }
595
596    async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
597        EventLogTrimableId(
598            self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
599                .await
600                .next()
601                .await
602                .map(|(k, _v)| k.0.next())
603                .unwrap_or_default(),
604        )
605    }
606
607    async fn get_event_log(
608        &mut self,
609        pos: Option<EventLogId>,
610        limit: u64,
611    ) -> Vec<PersistedLogEntry> {
612        let pos = pos.unwrap_or_default();
613        self.find_by_range(pos..pos.saturating_add(limit))
614            .await
615            .map(|(k, v)| PersistedLogEntry { id: k, inner: v })
616            .collect()
617            .await
618    }
619
620    async fn get_event_log_trimable(
621        &mut self,
622        pos: Option<EventLogTrimableId>,
623        limit: u64,
624    ) -> Vec<PersistedLogEntry> {
625        let pos = pos.unwrap_or_default();
626        self.find_by_range(pos..pos.saturating_add(limit))
627            .await
628            .map(|(k, v)| PersistedLogEntry { id: k.0, inner: v })
629            .collect()
630            .await
631    }
632}
633
634/// Trims old entries from the trimable event log
635async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
636    let mut dbtx = db.begin_transaction().await;
637
638    let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
639    let min_id_threshold = current_trimable_id
640        .0
641        .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
642    let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
643
644    let entries_to_delete: Vec<_> = dbtx
645        .find_by_prefix(&EventLogTrimableIdPrefixAll)
646        .await
647        .take_while(|(id, entry)| {
648            let id_old_enough = id.0 <= min_id_threshold;
649            let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
650
651            // Continue while both conditions are met
652            async move { id_old_enough && ts_old_enough }
653        })
654        .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
655        .map(|(id, _entry)| id)
656        .collect()
657        .await;
658
659    for id in &entries_to_delete {
660        dbtx.remove_entry(id).await;
661    }
662
663    dbtx.commit_tx().await;
664}
665
666/// The code that handles new unordered events and rewriters them fully ordered
667/// into the final event log.
668pub async fn run_event_log_ordering_task(
669    db: Database,
670    mut log_ordering_task_wakeup: watch::Receiver<()>,
671    log_event_added: watch::Sender<()>,
672    log_event_added_transient: broadcast::Sender<EventLogEntry>,
673) {
674    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
675
676    let current_time_usecs =
677        u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
678    trim_trimable_log(&db, current_time_usecs).await;
679
680    let mut next_entry_id = db
681        .begin_transaction_nc()
682        .await
683        .get_next_event_log_id()
684        .await;
685    let mut next_entry_id_trimable = db
686        .begin_transaction_nc()
687        .await
688        .get_next_event_log_trimable_id()
689        .await;
690
691    loop {
692        let mut dbtx = db.begin_transaction().await;
693
694        let unordered_events = dbtx
695            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
696            .await
697            .collect::<Vec<_>>()
698            .await;
699        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
700
701        for (unordered_id, entry) in &unordered_events {
702            assert!(
703                dbtx.remove_entry(unordered_id).await.is_some(),
704                "Must never fail to remove entry"
705            );
706            if entry.persist() {
707                // Non-trimable events get persisted in both the default event log
708                // and trimable event log
709                if !entry.trimable() {
710                    assert!(
711                        dbtx.insert_entry(&next_entry_id, &entry.inner)
712                            .await
713                            .is_none(),
714                        "Must never overwrite existing event"
715                    );
716                    trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
717                    next_entry_id = next_entry_id.next();
718                }
719
720                // Trimable events get persisted only in trimable log
721                assert!(
722                    dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
723                        .await
724                        .is_none(),
725                    "Must never overwrite existing event"
726                );
727                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
728                next_entry_id_trimable = next_entry_id_trimable.next();
729            } else {
730                // Transient events don't get persisted at all
731                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
732                dbtx.on_commit({
733                    let log_event_added_transient = log_event_added_transient.clone();
734                    let entry = entry.inner.clone();
735
736                    move || {
737                        // we ignore the no-subscribers
738                        let _ = log_event_added_transient.send(entry);
739                    }
740                });
741            }
742        }
743
744        // This thread is the only thread deleting already existing element of unordered
745        // log and inserting new elements into ordered log, so it should never
746        // fail to commit.
747        dbtx.commit_tx().await;
748        if !unordered_events.is_empty() {
749            log_event_added.send_replace(());
750        }
751
752        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
753        if log_ordering_task_wakeup.changed().await.is_err() {
754            break;
755        }
756    }
757
758    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
759}
760
761/// Persistent tracker of a position in the event log
762///
763/// During processing of event log the downstream consumer needs to
764/// keep track of which event were processed already. It needs to do it
765/// atomically and persist it so event in the presence of crashes no
766/// event is ever missed or processed twice.
767///
768/// This trait allows abstracting away where and how is such position stored,
769/// e.g. which key exactly is used, in what prefixed namespace etc.
770///
771/// ## Trimmable vs Non-Trimable log
772///
773/// See [`EventPersistence`]
774#[apply(async_trait_maybe_send!)]
775pub trait EventLogNonTrimableTracker {
776    // Store position in the event log
777    async fn store(
778        &mut self,
779        dbtx: &mut DatabaseTransaction<NonCommittable>,
780        pos: EventLogId,
781    ) -> anyhow::Result<()>;
782
783    /// Load the last previous stored position (or None if never stored)
784    async fn load(
785        &mut self,
786        dbtx: &mut DatabaseTransaction<NonCommittable>,
787    ) -> anyhow::Result<Option<EventLogId>>;
788}
789pub type DynEventLogTracker = Box<dyn EventLogNonTrimableTracker>;
790
791/// Like [`EventLogNonTrimableTracker`] but for trimable event log
792#[apply(async_trait_maybe_send!)]
793pub trait EventLogTrimableTracker {
794    // Store position in the event log
795    async fn store(
796        &mut self,
797        dbtx: &mut DatabaseTransaction<NonCommittable>,
798        pos: EventLogTrimableId,
799    ) -> anyhow::Result<()>;
800
801    /// Load the last previous stored position (or None if never stored)
802    async fn load(
803        &mut self,
804        dbtx: &mut DatabaseTransaction<NonCommittable>,
805    ) -> anyhow::Result<Option<EventLogTrimableId>>;
806}
807pub type DynEventLogTrimableTracker = Box<dyn EventLogTrimableTracker>;
808
809pub async fn handle_events<F, R>(
810    db: Database,
811    mut tracker: DynEventLogTracker,
812    mut log_event_added: watch::Receiver<()>,
813    call_fn: F,
814) -> anyhow::Result<()>
815where
816    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
817    R: Future<Output = anyhow::Result<()>>,
818{
819    let mut next_key: EventLogId = tracker
820        .load(&mut db.begin_transaction_nc().await)
821        .await?
822        .unwrap_or_default();
823
824    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
825
826    loop {
827        let mut dbtx = db.begin_transaction().await;
828
829        match dbtx.get_value(&next_key).await {
830            Some(event) => {
831                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
832
833                next_key = next_key.next();
834
835                tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
836
837                dbtx.commit_tx().await;
838            }
839            _ => {
840                if log_event_added.changed().await.is_err() {
841                    break Ok(());
842                }
843            }
844        }
845    }
846}
847
848pub async fn handle_trimable_events<F, R>(
849    db: Database,
850    mut tracker: DynEventLogTrimableTracker,
851    mut log_event_added: watch::Receiver<()>,
852    call_fn: F,
853) -> anyhow::Result<()>
854where
855    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
856    R: Future<Output = anyhow::Result<()>>,
857{
858    let mut next_key: EventLogTrimableId = tracker
859        .load(&mut db.begin_transaction_nc().await)
860        .await?
861        .unwrap_or_default();
862    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
863
864    loop {
865        let mut dbtx = db.begin_transaction().await;
866
867        match dbtx.get_value(&next_key).await {
868            Some(event) => {
869                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
870
871                next_key = next_key.next();
872                tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
873
874                dbtx.commit_tx().await;
875            }
876            _ => {
877                if log_event_added.changed().await.is_err() {
878                    break Ok(());
879                }
880            }
881        }
882    }
883}
884
885/// Filters the `PersistedLogEntries` by the `EventKind` and
886/// `ModuleKind`.
887pub fn filter_events_by_kind<'a, I>(
888    all_events: I,
889    module_kind: ModuleKind,
890    event_kind: EventKind,
891) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
892where
893    I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
894{
895    all_events.into_iter().filter(move |e| {
896        if let Some((m, _)) = &e.inner.module {
897            e.inner.kind == event_kind && *m == module_kind
898        } else {
899            false
900        }
901    })
902}
903
904/// Joins two sets of events on a predicate.
905///
906/// This function computes a "nested loop join" by first computing the cross
907/// product of the start event vector and the success/failure event vectors. The
908/// resulting cartesian product is then filtered according to the join predicate
909/// supplied in the parameters.
910///
911/// This function is intended for small data sets. If the data set relations
912/// grow, this function should implement a different join algorithm or be moved
913/// out of the gateway.
914pub fn join_events<'a, L, R, Res>(
915    events_l: &'a [&PersistedLogEntry],
916    events_r: &'a [&PersistedLogEntry],
917    max_time_distance: Option<Duration>,
918    predicate: impl Fn(L, R, Duration) -> Option<Res> + 'a,
919) -> impl Iterator<Item = Res> + 'a
920where
921    L: Event,
922    R: Event,
923{
924    events_l
925        .iter()
926        .cartesian_product(events_r)
927        .filter_map(move |(l, r)| {
928            if L::MODULE.as_ref() == l.as_raw().module_kind()
929                && L::KIND == l.as_raw().kind
930                && R::MODULE.as_ref() == r.as_raw().module_kind()
931                && R::KIND == r.as_raw().kind
932                && let Some(latency_usecs) = r.inner.ts_usecs.checked_sub(l.inner.ts_usecs)
933                && max_time_distance.is_none_or(|max| u128::from(latency_usecs) <= max.as_millis())
934                && let Some(l) = l.as_raw().to_event()
935                && let Some(r) = r.as_raw().to_event()
936            {
937                predicate(l, r, Duration::from_millis(latency_usecs))
938            } else {
939                None
940            }
941        })
942}
943
944/// Helper struct for storing computed data about outgoing and incoming
945/// payments.
946#[derive(Debug, Default)]
947pub struct StructuredPaymentEvents {
948    pub latencies_usecs: Vec<u64>,
949    pub fees: Vec<Amount>,
950    pub latencies_failure: Vec<u64>,
951}
952
953impl StructuredPaymentEvents {
954    pub fn new(
955        success_stats: &[(u64, Amount)],
956        failure_stats: Vec<u64>,
957    ) -> StructuredPaymentEvents {
958        let mut events = StructuredPaymentEvents {
959            latencies_usecs: success_stats.iter().map(|(l, _)| *l).collect(),
960            fees: success_stats.iter().map(|(_, f)| *f).collect(),
961            latencies_failure: failure_stats,
962        };
963        events.sort();
964        events
965    }
966
967    /// Combines this `StructuredPaymentEvents` with the `other`
968    /// `StructuredPaymentEvents` by appending all of the internal vectors.
969    pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
970        self.latencies_usecs.append(&mut other.latencies_usecs);
971        self.fees.append(&mut other.fees);
972        self.latencies_failure.append(&mut other.latencies_failure);
973        self.sort();
974    }
975
976    /// Sorts this `StructuredPaymentEvents` by sorting all of the internal
977    /// vectors.
978    fn sort(&mut self) {
979        self.latencies_usecs.sort_unstable();
980        self.fees.sort_unstable();
981        self.latencies_failure.sort_unstable();
982    }
983}
984
985#[cfg(test)]
986mod tests;