fedimint_eventlog/
lib.rs

#![allow(clippy::needless_lifetimes)]

//! Client Event Log
//!
//! The goal here is to maintain a single, ordered, append-only
//! log of all important client-side events, low or high level,
//! and to move as much of the coordination between different parts of
//! the system into it, in a natural and decomposed way.
//!
//! Any event log "follower" can just keep going through
//! all events and react to the ones it is interested in (and understands),
//! potentially emitting events of its own, and atomically updating its persisted
//! event log position ("cursor") past the events that were already processed.
use std::borrow::Cow;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};

use fedimint_core::core::{ModuleInstanceId, ModuleKind};
use fedimint_core::db::{
    Database, DatabaseKey, DatabaseRecord, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
    NonCommittable,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
use fedimint_logging::LOG_CLIENT_EVENT_LOG;
use futures::{Future, StreamExt};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace};

/// DB prefixes hardcoded for use of the event log
///
/// `fedimint-eventlog` was extracted from `fedimint-client` so it can be
/// included/re-used in other parts of the code, but fundamentally its role
/// is to implement the event log in the client.
/// There is currently no way to inject the prefixes to use for db records,
/// so we use these constants to keep them in sync. Any other app that wants
/// to store its own event log will need to use the exact same prefixes,
/// which in practice should not be a problem.
pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;

pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
    const MODULE: Option<ModuleKind>;
    const KIND: EventKind;
    const PERSIST: bool = true;
}
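
// Illustrative sketch (not part of the original API): a hypothetical event type
// implementing `Event`. The type name and kind string below are made up for
// demonstration; real events are defined by the client and its modules.
#[allow(dead_code)]
#[derive(Debug, Serialize, Deserialize)]
struct ExampleWalletDepositEvent {
    amount: Amount,
}

impl Event for ExampleWalletDepositEvent {
    // A core (non-module) event; module-defined events set this to their module kind.
    const MODULE: Option<ModuleKind> = None;
    const KIND: EventKind = EventKind::from_static("example-wallet-deposit");
    // `PERSIST` keeps its default of `true`, so the event ends up in the ordered log.
}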

/// A counter that resets on every restart and guarantees that
/// [`UnordedEventLogId`]s don't conflict with each other.
static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A self-allocated ID that is mostly ordered
///
/// The goal here is to avoid concurrent database transaction
/// conflicts due to ID allocation. Instead, IDs are picked based on
/// a timestamp and a counter, so they are mostly but not strictly ordered and
/// monotonic, and even more importantly: not contiguous.
#[derive(Debug, Encodable, Decodable)]
pub struct UnordedEventLogId {
    ts_usecs: u64,
    counter: u64,
}

impl UnordedEventLogId {
    fn new() -> Self {
        Self {
            ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
                // This will never happen
                .unwrap_or(u64::MAX),
            counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
        }
    }
}

/// Ordered, contiguous ID space, which is easy for event log followers to
/// track.
#[derive(
    Copy,
    Clone,
    Debug,
    Encodable,
    Decodable,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
)]
pub struct EventLogId(u64);

impl EventLogId {
    pub const LOG_START: EventLogId = EventLogId(0);

    fn next(self) -> EventLogId {
        Self(self.0 + 1)
    }

    pub fn saturating_add(self, rhs: u64) -> EventLogId {
        Self(self.0.saturating_add(rhs))
    }

    pub fn saturating_sub(self, rhs: u64) -> EventLogId {
        Self(self.0.saturating_sub(rhs))
    }
}

impl FromStr for EventLogId {
    type Err = <u64 as FromStr>::Err;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u64::from_str(s).map(Self)
    }
}

#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventKind(Cow<'static, str>);

impl EventKind {
    pub const fn from_static(value: &'static str) -> Self {
        Self(Cow::Borrowed(value))
    }
}

impl<'s> From<&'s str> for EventKind {
    fn from(value: &'s str) -> Self {
        Self(Cow::Owned(value.to_owned()))
    }
}

impl From<String> for EventKind {
    fn from(value: String) -> Self {
        Self(Cow::Owned(value))
    }
}

#[derive(Debug, Encodable, Decodable, Clone)]
pub struct UnorderedEventLogEntry {
    pub persist: bool,
    pub inner: EventLogEntry,
}

#[derive(Debug, Encodable, Decodable, Clone)]
pub struct EventLogEntry {
    /// Type/kind of the event
    ///
    /// Any part of the client is free to self-allocate an identifier denoting a
    /// certain kind of an event. Notably, one event kind can have multiple
    /// instances. E.g. "successful wallet deposit" can be an event kind,
    /// and it can happen multiple times with different payloads.
    pub kind: EventKind,

    /// To prevent accidental conflicts between `kind`s, the module kind the
    /// given event kind belongs to is recorded as well.
    ///
    /// Note: the meaning of this field is mostly about which part of the code
    /// defines this event kind. Oftentimes a core (non-module)-defined event
    /// will refer in some way to a module; such an event should use a separate
    /// `module_id` field in the `payload`, instead of this field.
    pub module: Option<(ModuleKind, ModuleInstanceId)>,

    /// Timestamp in microseconds after unix epoch
    ts_usecs: u64,

    /// Event-kind specific payload, typically encoded as a json string for
    /// flexibility.
    pub payload: Vec<u8>,
}

/// Struct used for processing log entries after they have been persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedLogEntry {
    pub event_id: EventLogId,
    pub event_kind: EventKind,
    pub module: Option<(ModuleKind, u16)>,
    pub timestamp: u64,
    pub value: serde_json::Value,
}

impl_db_record!(
    key = UnordedEventLogId,
    value = UnorderedEventLogEntry,
    db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct UnorderedEventLogIdPrefixAll;

impl_db_lookup!(
    key = UnordedEventLogId,
    query_prefix = UnorderedEventLogIdPrefixAll
);

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefixAll;

#[derive(Clone, Debug, Encodable, Decodable)]
pub struct EventLogIdPrefix(EventLogId);

impl_db_record!(
    key = EventLogId,
    value = EventLogEntry,
    db_prefix = DB_KEY_PREFIX_EVENT_LOG,
);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);

impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);

#[apply(async_trait_maybe_send!)]
pub trait DBTransactionEventLogExt {
    async fn log_event_raw(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        kind: EventKind,
        module_kind: Option<ModuleKind>,
        module_id: Option<ModuleInstanceId>,
        payload: Vec<u8>,
        persist: bool,
    );

    /// Log an event to the event log
    ///
    /// The event will start "unordered", but after it is committed an ordering
    /// task will be notified to "order" it into the final ordered log.
    async fn log_event<E>(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
    {
        self.log_event_raw(
            log_ordering_wakeup_tx,
            E::KIND,
            E::MODULE,
            module_id,
            serde_json::to_vec(&event).expect("Serialization can't fail"),
            <E as Event>::PERSIST,
        )
        .await;
    }

    /// Next [`EventLogId`] to use for new ordered events.
    ///
    /// Used by the ordering task, though it might also be
    /// useful to get the current count of ordered events.
    async fn get_next_event_log_id(&mut self) -> EventLogId;

    /// Read a part of the event log.
    async fn get_event_log(
        &mut self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>;
}
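
// Illustrative sketch (not part of the original API): logging a typed event from
// within a database transaction. `ExampleWalletDepositEvent` is the hypothetical
// event type sketched above; the ordering wakeup sender comes from the channel
// pair that feeds `run_event_log_ordering_task`.
#[allow(dead_code)]
async fn example_log_event(
    dbtx: &mut DatabaseTransaction<'_>,
    log_ordering_wakeup_tx: watch::Sender<()>,
) {
    dbtx.log_event(
        log_ordering_wakeup_tx,
        // A core (non-module) event, so there is no module instance id.
        None,
        ExampleWalletDepositEvent {
            amount: Amount::from_sats(1),
        },
    )
    .await;
}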

#[apply(async_trait_maybe_send!)]
impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
where
    Cap: Send,
{
    async fn log_event_raw(
        &mut self,
        log_ordering_wakeup_tx: watch::Sender<()>,
        kind: EventKind,
        module_kind: Option<ModuleKind>,
        module_id: Option<ModuleInstanceId>,
        payload: Vec<u8>,
        persist: bool,
    ) {
        assert_eq!(
            module_kind.is_some(),
            module_id.is_some(),
            "Events of modules must have module_id set"
        );

        let unordered_id = UnordedEventLogId::new();
        trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");

        if self
            .insert_entry(
                &unordered_id,
                &UnorderedEventLogEntry {
                    persist,
                    inner: EventLogEntry {
                        kind,
                        module: module_kind.map(|kind| (kind, module_id.unwrap())),
                        ts_usecs: unordered_id.ts_usecs,
                        payload,
                    },
                },
            )
            .await
            .is_some()
        {
            panic!("Trying to overwrite event in the client event log");
        }
        self.on_commit(move || {
            let _ = log_ordering_wakeup_tx.send(());
        });
    }

    async fn get_next_event_log_id(&mut self) -> EventLogId {
        self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
            .await
            .next()
            .await
            .map(|(k, _v)| k.next())
            .unwrap_or_default()
    }

    async fn get_event_log(
        &mut self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        let pos = pos.unwrap_or_default();
        self.find_by_range(pos..pos.saturating_add(limit))
            .await
            .map(|(k, v)| PersistedLogEntry {
                event_id: k,
                event_kind: v.kind,
                module: v.module,
                timestamp: v.ts_usecs,
                value: serde_json::from_slice(&v.payload).unwrap_or_default(),
            })
            .collect()
            .await
    }
}

/// The code that handles new unordered events and rewrites them, fully ordered,
/// into the final event log.
pub async fn run_event_log_ordering_task(
    db: Database,
    mut log_ordering_task_wakeup: watch::Receiver<()>,
    log_event_added: watch::Sender<()>,
    log_event_added_transient: broadcast::Sender<EventLogEntry>,
) {
    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
    let mut next_entry_id = db
        .begin_transaction_nc()
        .await
        .get_next_event_log_id()
        .await;

    loop {
        let mut dbtx = db.begin_transaction().await;

        let unordered_events = dbtx
            .find_by_prefix(&UnorderedEventLogIdPrefixAll)
            .await
            .collect::<Vec<_>>()
            .await;
        trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");

        for (unordered_id, entry) in &unordered_events {
            assert!(
                dbtx.remove_entry(unordered_id).await.is_some(),
                "Must never fail to remove entry"
            );
            if entry.persist {
                assert!(
                    dbtx.insert_entry(&next_entry_id, &entry.inner)
                        .await
                        .is_none(),
                    "Must never overwrite existing event"
                );
                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
                next_entry_id = next_entry_id.next();
            } else {
                trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
                dbtx.on_commit({
                    let log_event_added_transient = log_event_added_transient.clone();
                    let entry = entry.inner.clone();

                    move || {
                        // we ignore the no-subscribers error
                        let _ = log_event_added_transient.send(entry);
                    }
                });
            }
        }

        // This task is the only one deleting existing elements of the unordered
        // log and inserting new elements into the ordered log, so it should never
        // fail to commit.
        dbtx.commit_tx().await;
        if !unordered_events.is_empty() {
            let _ = log_event_added.send(());
        }

        trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
        if log_ordering_task_wakeup.changed().await.is_err() {
            break;
        }
    }

    debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
}
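
// Illustrative sketch (not part of the original API): wiring the ordering task to its
// channels. In practice this is done by the embedding application (e.g. the client);
// the helper below is hypothetical and only shows which end of each channel goes where.
#[allow(dead_code)]
fn example_spawn_ordering_task(
    db: Database,
    task_group: &fedimint_core::task::TaskGroup,
) -> (watch::Sender<()>, watch::Receiver<()>) {
    let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
    let (log_event_added_tx, log_event_added_rx) = watch::channel(());
    let (log_event_added_transient_tx, _log_event_added_transient_rx) = broadcast::channel(1024);

    task_group.spawn_cancellable(
        "event log ordering task",
        run_event_log_ordering_task(
            db,
            log_ordering_wakeup_rx,
            log_event_added_tx,
            log_event_added_transient_tx,
        ),
    );

    // Writers pass the wakeup sender to `log_event`/`log_event_raw`; followers wait on
    // the `log_event_added` receiver (see `handle_events`).
    (log_ordering_wakeup_tx, log_event_added_rx)
}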

/// Run a follower loop over the ordered event log
///
/// Starting from the position persisted under `pos_key`, calls `call_fn` for every
/// ordered event and advances the persisted cursor in the same database transaction.
/// Waits on `log_event_added` once it catches up with the log; returns when the
/// sender is dropped or `call_fn` returns an error.
pub async fn handle_events<F, R, K>(
    db: Database,
    pos_key: &K,
    mut log_event_added: watch::Receiver<()>,
    call_fn: F,
) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    K: DatabaseRecord<Value = EventLogId>,
    F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
    R: Future<Output = anyhow::Result<()>>,
{
    let mut next_key: EventLogId = db
        .begin_transaction_nc()
        .await
        .get_value(pos_key)
        .await
        .unwrap_or_default();

    trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");

    loop {
        let mut dbtx = db.begin_transaction().await;

        match dbtx.get_value(&next_key).await {
            Some(event) => {
                (call_fn)(&mut dbtx.to_ref_nc(), event).await?;

                next_key = next_key.next();
                dbtx.insert_entry(pos_key, &next_key).await;

                dbtx.commit_tx().await;
            }
            _ => {
                if log_event_added.changed().await.is_err() {
                    break Ok(());
                }
            }
        }
    }
}
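
// Illustrative sketch (not part of the original API): a follower built on
// `handle_events`. The cursor key type `K` is provided by the caller (see
// `TestLogIdKey` in the tests below for a minimal definition).
#[allow(dead_code)]
async fn example_follow_events<K>(
    db: Database,
    pos_key: &K,
    log_event_added: watch::Receiver<()>,
) -> anyhow::Result<()>
where
    K: DatabaseKey + DatabaseRecord<Value = EventLogId> + MaybeSend + MaybeSync,
{
    handle_events(db, pos_key, log_event_added, |_dbtx, event| {
        Box::pin(async move {
            // A real follower would only react to event kinds it understands,
            // possibly emitting events of its own via `_dbtx`.
            trace!(target: LOG_CLIENT_EVENT_LOG, kind = ?event.kind, "Example follower saw event");
            anyhow::Ok(())
        })
    })
    .await
}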

/// Filters the given `PersistedLogEntry`s by `EventKind` and
/// `ModuleKind`.
pub fn filter_events_by_kind<'a, I>(
    all_events: I,
    module_kind: ModuleKind,
    event_kind: EventKind,
) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
where
    I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
{
    all_events.into_iter().filter(move |e| {
        if let Some((m, _)) = &e.module {
            e.event_kind == event_kind && *m == module_kind
        } else {
            false
        }
    })
}

/// Joins two sets of events on a predicate.
///
/// This function computes a "nested loop join" by first computing the cross
/// product of the start event vector and the success/failure event vectors. The
/// resulting cartesian product is then filtered according to the join predicate
/// supplied in the parameters.
///
/// This function is intended for small data sets. If the data set relations
/// grow, this function should implement a different join algorithm or be moved
/// out of the gateway.
pub fn join_events<'a, L, R, Res>(
    events_l: &'a [&PersistedLogEntry],
    events_r: &'a [&PersistedLogEntry],
    predicate: impl Fn(L, R, u64) -> Option<Res> + 'a,
) -> impl Iterator<Item = Res> + 'a
where
    L: Event,
    R: Event,
{
    events_l
        .iter()
        .cartesian_product(events_r)
        .filter_map(move |(l, r)| {
            if let Some(latency) = r.timestamp.checked_sub(l.timestamp) {
                let event_l: L =
                    serde_json::from_value(l.value.clone()).expect("could not parse JSON");
                let event_r: R =
                    serde_json::from_value(r.value.clone()).expect("could not parse JSON");
                predicate(event_l, event_r, latency)
            } else {
                None
            }
        })
}
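
// Illustrative sketch (not part of the original API): combining `filter_events_by_kind`
// and `join_events` to compute per-payment latencies. The `ExamplePayStarted` /
// `ExamplePaySucceeded` event types and the "example" module kind are hypothetical.
#[allow(dead_code)]
fn example_payment_latencies(all_events: &[PersistedLogEntry]) -> Vec<u64> {
    #[derive(Serialize, Deserialize)]
    struct ExamplePayStarted;
    impl Event for ExamplePayStarted {
        const MODULE: Option<ModuleKind> = Some(ModuleKind::from_static_str("example"));
        const KIND: EventKind = EventKind::from_static("example-pay-started");
    }

    #[derive(Serialize, Deserialize)]
    struct ExamplePaySucceeded;
    impl Event for ExamplePaySucceeded {
        const MODULE: Option<ModuleKind> = Some(ModuleKind::from_static_str("example"));
        const KIND: EventKind = EventKind::from_static("example-pay-succeeded");
    }

    let started: Vec<_> = filter_events_by_kind(
        all_events,
        ModuleKind::from_static_str("example"),
        ExamplePayStarted::KIND,
    )
    .collect();
    let succeeded: Vec<_> = filter_events_by_kind(
        all_events,
        ModuleKind::from_static_str("example"),
        ExamplePaySucceeded::KIND,
    )
    .collect();

    // Keep the latency of every (start, success) pair where the success came later.
    join_events(
        &started,
        &succeeded,
        |_l: ExamplePayStarted, _r: ExamplePaySucceeded, latency| Some(latency),
    )
    .collect()
}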

/// Helper struct for storing computed data about outgoing and incoming
/// payments.
#[derive(Debug, Default)]
pub struct StructuredPaymentEvents {
    pub latencies: Vec<u64>,
    pub fees: Vec<Amount>,
    pub latencies_failure: Vec<u64>,
}

impl StructuredPaymentEvents {
    pub fn new(
        success_stats: &[(u64, Amount)],
        failure_stats: Vec<u64>,
    ) -> StructuredPaymentEvents {
        let mut events = StructuredPaymentEvents {
            latencies: success_stats.iter().map(|(l, _)| *l).collect(),
            fees: success_stats.iter().map(|(_, f)| *f).collect(),
            latencies_failure: failure_stats,
        };
        events.sort();
        events
    }

    /// Combines this `StructuredPaymentEvents` with the `other`
    /// `StructuredPaymentEvents` by appending all of the internal vectors.
    pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
        self.latencies.append(&mut other.latencies);
        self.fees.append(&mut other.fees);
        self.latencies_failure.append(&mut other.latencies_failure);
        self.sort();
    }

    /// Sorts this `StructuredPaymentEvents` by sorting all of the internal
    /// vectors.
    fn sort(&mut self) {
        self.latencies.sort_unstable();
        self.fees.sort_unstable();
        self.latencies_failure.sort_unstable();
    }
}
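
// Illustrative sketch (not part of the original API): building a
// `StructuredPaymentEvents` from (latency, fee) pairs such as those produced by a
// `join_events` predicate. All numbers below are made up.
#[allow(dead_code)]
fn example_structured_payment_events() -> StructuredPaymentEvents {
    let success_stats: Vec<(u64, Amount)> = vec![
        (12_000, Amount::from_sats(1)),
        (8_000, Amount::from_sats(2)),
    ];
    // Failed payments only contribute a latency, not a fee.
    let failure_stats = vec![30_000];
    StructuredPaymentEvents::new(&success_stats, failure_stats)
}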

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU8;

    use anyhow::bail;
    use fedimint_core::db::IRawDatabaseExt as _;
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::impl_db_record;
    use fedimint_core::task::TaskGroup;
    use tokio::sync::{broadcast, watch};
    use tokio::try_join;
    use tracing::info;

    use super::{
        DBTransactionEventLogExt as _, EventLogId, handle_events, run_event_log_ordering_task,
    };
    use crate::EventKind;

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub struct TestLogIdKey;

    impl_db_record!(key = TestLogIdKey, value = EventLogId, db_prefix = 0x00,);

    #[test_log::test(tokio::test)]
    async fn sanity_handle_events() {
        let db = MemDatabase::new().into_database();
        let tg = TaskGroup::new();

        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());
        let (log_event_added_transient_tx, _log_event_added_transient_rx) =
            broadcast::channel(1024);

        tg.spawn_cancellable(
            "event log ordering task",
            run_event_log_ordering_task(
                db.clone(),
                log_ordering_wakeup_rx,
                log_event_added_tx,
                log_event_added_transient_tx,
            ),
        );

        let counter = Arc::new(AtomicU8::new(0));

        let _ = try_join!(
            handle_events(
                db.clone(),
                &TestLogIdKey,
                log_event_added_rx,
                move |_dbtx, event| {
                    let counter = counter.clone();
                    Box::pin(async move {
                        info!("{event:?}");

                        assert_eq!(
                            event.kind,
                            EventKind::from(format!(
                                "{}",
                                counter.load(std::sync::atomic::Ordering::Relaxed)
                            ))
                        );

                        if counter.load(std::sync::atomic::Ordering::Relaxed) == 4 {
                            bail!("Time to wrap up");
                        }
                        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        Ok(())
                    })
                },
            ),
            async {
                for i in 0..=4 {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.log_event_raw(
                        log_ordering_wakeup_tx.clone(),
                        EventKind::from(format!("{i}")),
                        None,
                        None,
                        vec![],
                        true,
                    )
                    .await;

                    dbtx.commit_tx().await;
                }

                Ok(())
            }
        );
    }
}