1#![allow(clippy::needless_lifetimes)]
2
3use std::borrow::Cow;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use std::{fmt, ops};
19
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{
22 Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, NonCommittable,
23};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use itertools::Itertools;
29use serde::{Deserialize, Serialize};
30use tokio::sync::{broadcast, watch};
31use tracing::{debug, trace};
32
33pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
42pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
43pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;
44
45const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
47const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
49const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
51
52pub enum EventPersistence {
77 Transient,
79 Trimable,
81 Persistent,
84}
85
86pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
87 const MODULE: Option<ModuleKind>;
88 const KIND: EventKind;
89 const PERSISTENCE: EventPersistence;
90}
91
92static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
95
96#[derive(Debug, Encodable, Decodable)]
103pub struct UnordedEventLogId {
104 ts_usecs: u64,
105 counter: u64,
106}
107
108impl UnordedEventLogId {
109 fn new() -> Self {
110 Self {
111 ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
112 .unwrap_or(u64::MAX),
114 counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
115 }
116 }
117}
118
119#[derive(
122 Copy,
123 Clone,
124 Debug,
125 Encodable,
126 Decodable,
127 Default,
128 PartialEq,
129 Eq,
130 PartialOrd,
131 Ord,
132 Serialize,
133 Deserialize,
134)]
135pub struct EventLogId(u64);
136
137impl EventLogId {
138 pub const LOG_START: EventLogId = EventLogId(0);
139
140 fn next(self) -> EventLogId {
141 Self(self.0 + 1)
142 }
143
144 pub fn saturating_add(self, rhs: u64) -> EventLogId {
145 Self(self.0.saturating_add(rhs))
146 }
147
148 pub fn saturating_sub(self, rhs: u64) -> EventLogId {
149 Self(self.0.saturating_sub(rhs))
150 }
151
152 pub fn checked_sub(self, rhs: u64) -> Option<EventLogId> {
153 self.0.checked_sub(rhs).map(EventLogId)
154 }
155}
156
157impl From<EventLogId> for u64 {
158 fn from(value: EventLogId) -> Self {
159 value.0
160 }
161}
162
163impl FromStr for EventLogId {
164 type Err = <u64 as FromStr>::Err;
165
166 fn from_str(s: &str) -> Result<Self, Self::Err> {
167 u64::from_str(s).map(Self)
168 }
169}
170
171impl fmt::Display for EventLogId {
172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
173 write!(f, "{}", self.0)
174 }
175}
176
177#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
178pub struct EventKind(Cow<'static, str>);
179
180impl EventKind {
181 pub const fn from_static(value: &'static str) -> Self {
182 Self(Cow::Borrowed(value))
183 }
184}
185
186impl<'s> From<&'s str> for EventKind {
187 fn from(value: &'s str) -> Self {
188 Self(Cow::Owned(value.to_owned()))
189 }
190}
191
192impl From<String> for EventKind {
193 fn from(value: String) -> Self {
194 Self(Cow::Owned(value))
195 }
196}
197
198impl fmt::Display for EventKind {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 f.write_str(&self.0)
201 }
202}
203
204#[derive(Debug, Encodable, Decodable, Clone)]
205pub struct UnorderedEventLogEntry {
206 pub flags: u8,
207 pub inner: EventLogEntry,
208}
209
210impl UnorderedEventLogEntry {
211 pub const FLAG_PERSIST: u8 = 1;
212 pub const FLAG_TRIMABLE: u8 = 2;
213
214 fn persist(&self) -> bool {
215 self.flags & Self::FLAG_PERSIST != 0
216 }
217
218 fn trimable(&self) -> bool {
219 self.flags & Self::FLAG_TRIMABLE != 0
220 }
221}
222
223#[derive(Debug, Encodable, Decodable, Clone)]
224pub struct EventLogEntry {
225 pub kind: EventKind,
232
233 pub module: Option<(ModuleKind, ModuleInstanceId)>,
241
242 pub ts_usecs: u64,
244
245 pub payload: Vec<u8>,
248}
249
250impl EventLogEntry {
251 pub fn module_kind(&self) -> Option<&ModuleKind> {
252 self.module.as_ref().map(|m| &m.0)
253 }
254
255 pub fn module_id(&self) -> Option<ModuleInstanceId> {
256 self.module.as_ref().map(|m| m.1)
257 }
258
259 pub fn to_event<E>(&self) -> Option<E>
261 where
262 E: Event,
263 {
264 serde_json::from_slice(&self.payload).ok()
265 }
266}
267
268#[derive(Debug, Clone)]
270pub struct PersistedLogEntry {
271 id: EventLogId,
272 inner: EventLogEntry,
273}
274
275impl Serialize for PersistedLogEntry {
276 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
277 where
278 S: serde::Serializer,
279 {
280 use serde::ser::SerializeStruct;
281
282 let mut state = serializer.serialize_struct("PersistedLogEntry", 5)?;
283 state.serialize_field("id", &self.id)?;
284 state.serialize_field("kind", &self.inner.kind)?;
285 state.serialize_field("module", &self.inner.module)?;
286 state.serialize_field("ts_usecs", &self.inner.ts_usecs)?;
287
288 let payload_value: serde_json::Value = serde_json::from_slice(&self.inner.payload)
290 .unwrap_or_else(|_| serde_json::Value::String(hex::encode(&self.inner.payload)));
291 state.serialize_field("payload", &payload_value)?;
292
293 state.end()
294 }
295}
296
297impl<'de> Deserialize<'de> for PersistedLogEntry {
298 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
299 where
300 D: serde::Deserializer<'de>,
301 {
302 use serde::de::{self, MapAccess, Visitor};
303
304 #[derive(Deserialize)]
305 #[serde(field_identifier, rename_all = "snake_case")]
306 enum Field {
307 Id,
308 Kind,
309 Module,
310 TsUsecs,
311 Payload,
312 }
313
314 struct PersistedLogEntryVisitor;
315
316 impl<'de> Visitor<'de> for PersistedLogEntryVisitor {
317 type Value = PersistedLogEntry;
318
319 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
320 formatter.write_str("struct PersistedLogEntry")
321 }
322
323 fn visit_map<V>(self, mut map: V) -> Result<PersistedLogEntry, V::Error>
324 where
325 V: MapAccess<'de>,
326 {
327 let mut id = None;
328 let mut kind = None;
329 let mut module = None;
330 let mut ts_usecs = None;
331 let mut payload = None;
332
333 while let Some(key) = map.next_key()? {
334 match key {
335 Field::Id => {
336 if id.is_some() {
337 return Err(de::Error::duplicate_field("id"));
338 }
339 id = Some(map.next_value()?);
340 }
341 Field::Kind => {
342 if kind.is_some() {
343 return Err(de::Error::duplicate_field("kind"));
344 }
345 kind = Some(map.next_value()?);
346 }
347 Field::Module => {
348 if module.is_some() {
349 return Err(de::Error::duplicate_field("module"));
350 }
351 module = Some(map.next_value()?);
352 }
353 Field::TsUsecs => {
354 if ts_usecs.is_some() {
355 return Err(de::Error::duplicate_field("ts_usecs"));
356 }
357 ts_usecs = Some(map.next_value()?);
358 }
359 Field::Payload => {
360 if payload.is_some() {
361 return Err(de::Error::duplicate_field("payload"));
362 }
363 let value: serde_json::Value = map.next_value()?;
364 payload = Some(serde_json::to_vec(&value).map_err(de::Error::custom)?);
365 }
366 }
367 }
368
369 let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
370 let kind = kind.ok_or_else(|| de::Error::missing_field("kind"))?;
371 let module = module.ok_or_else(|| de::Error::missing_field("module"))?;
372 let ts_usecs = ts_usecs.ok_or_else(|| de::Error::missing_field("ts_usecs"))?;
373 let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
374
375 Ok(PersistedLogEntry {
376 id,
377 inner: EventLogEntry {
378 kind,
379 module,
380 ts_usecs,
381 payload,
382 },
383 })
384 }
385 }
386
387 const FIELDS: &[&str] = &["id", "kind", "module", "ts_usecs", "payload"];
388 deserializer.deserialize_struct("PersistedLogEntry", FIELDS, PersistedLogEntryVisitor)
389 }
390}
391
392impl PersistedLogEntry {
393 pub fn id(&self) -> EventLogId {
394 self.id
395 }
396
397 pub fn as_raw(&self) -> &EventLogEntry {
398 &self.inner
399 }
400}
401
402impl ops::Deref for PersistedLogEntry {
403 type Target = EventLogEntry;
404
405 fn deref(&self) -> &Self::Target {
406 &self.inner
407 }
408}
409
410impl_db_record!(
411 key = UnordedEventLogId,
412 value = UnorderedEventLogEntry,
413 db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
414);
415
416#[derive(Clone, Debug, Encodable, Decodable)]
417pub struct UnorderedEventLogIdPrefixAll;
418
419impl_db_lookup!(
420 key = UnordedEventLogId,
421 query_prefix = UnorderedEventLogIdPrefixAll
422);
423
424#[derive(Clone, Debug, Encodable, Decodable)]
425pub struct EventLogIdPrefixAll;
426
427#[derive(Clone, Debug, Encodable, Decodable)]
428pub struct EventLogIdPrefix(EventLogId);
429
430impl_db_record!(
431 key = EventLogId,
432 value = EventLogEntry,
433 db_prefix = DB_KEY_PREFIX_EVENT_LOG,
434);
435
436impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
437
438impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
439
440#[derive(
441 Copy,
442 Clone,
443 Debug,
444 Encodable,
445 Decodable,
446 Default,
447 PartialEq,
448 Eq,
449 PartialOrd,
450 Ord,
451 Serialize,
452 Deserialize,
453)]
454pub struct EventLogTrimableId(EventLogId);
455
456impl EventLogTrimableId {
457 fn next(&self) -> Self {
458 Self(self.0.next())
459 }
460
461 pub fn saturating_add(self, rhs: u64) -> Self {
462 Self(self.0.saturating_add(rhs))
463 }
464}
465
466impl From<u64> for EventLogTrimableId {
467 fn from(value: u64) -> Self {
468 Self(EventLogId(value))
469 }
470}
471
472#[derive(Clone, Debug, Encodable, Decodable)]
473pub struct EventLogTrimableIdPrefixAll;
474
475#[derive(Clone, Debug, Encodable, Decodable)]
476pub struct EventLogTrimableIdPrefix(EventLogId);
477
478impl_db_record!(
479 key = EventLogTrimableId,
480 value = EventLogEntry,
481 db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
482);
483
484impl_db_lookup!(
485 key = EventLogTrimableId,
486 query_prefix = EventLogTrimableIdPrefixAll
487);
488
489impl_db_lookup!(
490 key = EventLogTrimableId,
491 query_prefix = EventLogTrimableIdPrefix
492);
493
494#[apply(async_trait_maybe_send!)]
495pub trait DBTransactionEventLogExt {
496 #[allow(clippy::too_many_arguments)]
497 async fn log_event_raw(
498 &mut self,
499 log_ordering_wakeup_tx: watch::Sender<()>,
500 kind: EventKind,
501 module_kind: Option<ModuleKind>,
502 module_id: Option<ModuleInstanceId>,
503 payload: Vec<u8>,
504 persist: EventPersistence,
505 );
506
507 async fn log_event<E>(
512 &mut self,
513 log_ordering_wakeup_tx: watch::Sender<()>,
514 module_id: Option<ModuleInstanceId>,
515 event: E,
516 ) where
517 E: Event + Send,
518 {
519 self.log_event_raw(
520 log_ordering_wakeup_tx,
521 E::KIND,
522 E::MODULE,
523 module_id,
524 serde_json::to_vec(&event).expect("Serialization can't fail"),
525 <E as Event>::PERSISTENCE,
526 )
527 .await;
528 }
529
530 async fn get_next_event_log_id(&mut self) -> EventLogId;
535
536 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
538
539 async fn get_event_log(
541 &mut self,
542 pos: Option<EventLogId>,
543 limit: u64,
544 ) -> Vec<PersistedLogEntry>;
545
546 async fn get_event_log_trimable(
547 &mut self,
548 pos: Option<EventLogTrimableId>,
549 limit: u64,
550 ) -> Vec<PersistedLogEntry>;
551}
552
553#[apply(async_trait_maybe_send!)]
554impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
555where
556 Cap: Send,
557{
558 async fn log_event_raw(
559 &mut self,
560 log_ordering_wakeup_tx: watch::Sender<()>,
561 kind: EventKind,
562 module_kind: Option<ModuleKind>,
563 module_id: Option<ModuleInstanceId>,
564 payload: Vec<u8>,
565 persist: EventPersistence,
566 ) {
567 assert_eq!(
568 module_kind.is_some(),
569 module_id.is_some(),
570 "Events of modules must have module_id set"
571 );
572
573 let unordered_id = UnordedEventLogId::new();
574 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
575
576 if self
577 .insert_entry(
578 &unordered_id,
579 &UnorderedEventLogEntry {
580 flags: match persist {
581 EventPersistence::Transient => 0,
582 EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
583 EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
584 },
585 inner: EventLogEntry {
586 kind,
587 module: module_kind.map(|kind| (kind, module_id.unwrap())),
588 ts_usecs: unordered_id.ts_usecs,
589 payload,
590 },
591 },
592 )
593 .await
594 .is_some()
595 {
596 panic!("Trying to overwrite event in the client event log");
597 }
598 self.on_commit(move || {
599 log_ordering_wakeup_tx.send_replace(());
600 });
601 }
602
603 async fn get_next_event_log_id(&mut self) -> EventLogId {
604 self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
605 .await
606 .next()
607 .await
608 .map(|(k, _v)| k.next())
609 .unwrap_or_default()
610 }
611
612 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
613 EventLogTrimableId(
614 self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
615 .await
616 .next()
617 .await
618 .map(|(k, _v)| k.0.next())
619 .unwrap_or_default(),
620 )
621 }
622
623 async fn get_event_log(
624 &mut self,
625 pos: Option<EventLogId>,
626 limit: u64,
627 ) -> Vec<PersistedLogEntry> {
628 let pos = pos.unwrap_or_default();
629 self.find_by_range(pos..pos.saturating_add(limit))
630 .await
631 .map(|(k, v)| PersistedLogEntry { id: k, inner: v })
632 .collect()
633 .await
634 }
635
636 async fn get_event_log_trimable(
637 &mut self,
638 pos: Option<EventLogTrimableId>,
639 limit: u64,
640 ) -> Vec<PersistedLogEntry> {
641 let pos = pos.unwrap_or_default();
642 self.find_by_range(pos..pos.saturating_add(limit))
643 .await
644 .map(|(k, v)| PersistedLogEntry { id: k.0, inner: v })
645 .collect()
646 .await
647 }
648}
649
650async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
652 let mut dbtx = db.begin_transaction().await;
653
654 let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
655 let min_id_threshold = current_trimable_id
656 .0
657 .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
658 let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
659
660 let entries_to_delete: Vec<_> = dbtx
661 .find_by_prefix(&EventLogTrimableIdPrefixAll)
662 .await
663 .take_while(|(id, entry)| {
664 let id_old_enough = id.0 <= min_id_threshold;
665 let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
666
667 async move { id_old_enough && ts_old_enough }
669 })
670 .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
671 .map(|(id, _entry)| id)
672 .collect()
673 .await;
674
675 for id in &entries_to_delete {
676 dbtx.remove_entry(id).await;
677 }
678
679 dbtx.commit_tx().await;
680}
681
682pub async fn run_event_log_ordering_task(
685 db: Database,
686 mut log_ordering_task_wakeup: watch::Receiver<()>,
687 log_event_added: watch::Sender<()>,
688 log_event_added_transient: broadcast::Sender<EventLogEntry>,
689) {
690 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
691
692 let current_time_usecs =
693 u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
694 trim_trimable_log(&db, current_time_usecs).await;
695
696 let mut next_entry_id = db
697 .begin_transaction_nc()
698 .await
699 .get_next_event_log_id()
700 .await;
701 let mut next_entry_id_trimable = db
702 .begin_transaction_nc()
703 .await
704 .get_next_event_log_trimable_id()
705 .await;
706
707 loop {
708 let mut dbtx = db.begin_transaction().await;
709
710 let unordered_events = dbtx
711 .find_by_prefix(&UnorderedEventLogIdPrefixAll)
712 .await
713 .collect::<Vec<_>>()
714 .await;
715 trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
716
717 for (unordered_id, entry) in &unordered_events {
718 assert!(
719 dbtx.remove_entry(unordered_id).await.is_some(),
720 "Must never fail to remove entry"
721 );
722 if entry.persist() {
723 if !entry.trimable() {
726 assert!(
727 dbtx.insert_entry(&next_entry_id, &entry.inner)
728 .await
729 .is_none(),
730 "Must never overwrite existing event"
731 );
732 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
733 next_entry_id = next_entry_id.next();
734 }
735
736 assert!(
738 dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
739 .await
740 .is_none(),
741 "Must never overwrite existing event"
742 );
743 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
744 next_entry_id_trimable = next_entry_id_trimable.next();
745 } else {
746 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
748 dbtx.on_commit({
749 let log_event_added_transient = log_event_added_transient.clone();
750 let entry = entry.inner.clone();
751
752 move || {
753 let _ = log_event_added_transient.send(entry);
755 }
756 });
757 }
758 }
759
760 dbtx.commit_tx().await;
764 if !unordered_events.is_empty() {
765 log_event_added.send_replace(());
766 }
767
768 trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
769 if log_ordering_task_wakeup.changed().await.is_err() {
770 break;
771 }
772 }
773
774 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
775}
776
777#[apply(async_trait_maybe_send!)]
791pub trait EventLogNonTrimableTracker {
792 async fn store(
794 &mut self,
795 dbtx: &mut DatabaseTransaction<NonCommittable>,
796 pos: EventLogId,
797 ) -> anyhow::Result<()>;
798
799 async fn load(
801 &mut self,
802 dbtx: &mut DatabaseTransaction<NonCommittable>,
803 ) -> anyhow::Result<Option<EventLogId>>;
804}
805pub type DynEventLogTracker = Box<dyn EventLogNonTrimableTracker>;
806
807#[apply(async_trait_maybe_send!)]
809pub trait EventLogTrimableTracker {
810 async fn store(
812 &mut self,
813 dbtx: &mut DatabaseTransaction<NonCommittable>,
814 pos: EventLogTrimableId,
815 ) -> anyhow::Result<()>;
816
817 async fn load(
819 &mut self,
820 dbtx: &mut DatabaseTransaction<NonCommittable>,
821 ) -> anyhow::Result<Option<EventLogTrimableId>>;
822}
823pub type DynEventLogTrimableTracker = Box<dyn EventLogTrimableTracker>;
824
825pub async fn handle_events<F, R>(
826 db: Database,
827 mut tracker: DynEventLogTracker,
828 mut log_event_added: watch::Receiver<()>,
829 call_fn: F,
830) -> anyhow::Result<()>
831where
832 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
833 R: Future<Output = anyhow::Result<()>>,
834{
835 let mut next_key: EventLogId = tracker
836 .load(&mut db.begin_transaction_nc().await)
837 .await?
838 .unwrap_or_default();
839
840 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
841
842 loop {
843 let mut dbtx = db.begin_transaction().await;
844
845 match dbtx.get_value(&next_key).await {
846 Some(event) => {
847 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
848
849 next_key = next_key.next();
850
851 tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
852
853 dbtx.commit_tx().await;
854 }
855 _ => {
856 if log_event_added.changed().await.is_err() {
857 break Ok(());
858 }
859 }
860 }
861 }
862}
863
864pub async fn handle_trimable_events<F, R>(
865 db: Database,
866 mut tracker: DynEventLogTrimableTracker,
867 mut log_event_added: watch::Receiver<()>,
868 call_fn: F,
869) -> anyhow::Result<()>
870where
871 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
872 R: Future<Output = anyhow::Result<()>>,
873{
874 let mut next_key: EventLogTrimableId = tracker
875 .load(&mut db.begin_transaction_nc().await)
876 .await?
877 .unwrap_or_default();
878 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
879
880 loop {
881 let mut dbtx = db.begin_transaction().await;
882
883 match dbtx.get_value(&next_key).await {
884 Some(event) => {
885 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
886
887 next_key = next_key.next();
888 tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
889
890 dbtx.commit_tx().await;
891 }
892 _ => {
893 if log_event_added.changed().await.is_err() {
894 break Ok(());
895 }
896 }
897 }
898 }
899}
900
901pub fn filter_events_by_kind<'a, I>(
904 all_events: I,
905 module_kind: ModuleKind,
906 event_kind: EventKind,
907) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
908where
909 I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
910{
911 all_events.into_iter().filter(move |e| {
912 if let Some((m, _)) = &e.inner.module {
913 e.inner.kind == event_kind && *m == module_kind
914 } else {
915 false
916 }
917 })
918}
919
920pub fn join_events<'a, L, R, Res>(
931 events_l: &'a [&PersistedLogEntry],
932 events_r: &'a [&PersistedLogEntry],
933 max_time_distance: Option<Duration>,
934 predicate: impl Fn(L, R, Duration) -> Option<Res> + 'a,
935) -> impl Iterator<Item = Res> + 'a
936where
937 L: Event,
938 R: Event,
939{
940 events_l
941 .iter()
942 .cartesian_product(events_r)
943 .filter_map(move |(l, r)| {
944 if L::MODULE.as_ref() == l.as_raw().module_kind()
945 && L::KIND == l.as_raw().kind
946 && R::MODULE.as_ref() == r.as_raw().module_kind()
947 && R::KIND == r.as_raw().kind
948 && let Some(latency_usecs) = r.inner.ts_usecs.checked_sub(l.inner.ts_usecs)
949 && max_time_distance.is_none_or(|max| u128::from(latency_usecs) <= max.as_millis())
950 && let Some(l) = l.as_raw().to_event()
951 && let Some(r) = r.as_raw().to_event()
952 {
953 predicate(l, r, Duration::from_millis(latency_usecs))
954 } else {
955 None
956 }
957 })
958}
959
960#[derive(Debug, Default)]
963pub struct StructuredPaymentEvents {
964 pub latencies_usecs: Vec<u64>,
965 pub fees: Vec<Amount>,
966 pub latencies_failure: Vec<u64>,
967}
968
969impl StructuredPaymentEvents {
970 pub fn new(
971 success_stats: &[(u64, Amount)],
972 failure_stats: Vec<u64>,
973 ) -> StructuredPaymentEvents {
974 let mut events = StructuredPaymentEvents {
975 latencies_usecs: success_stats.iter().map(|(l, _)| *l).collect(),
976 fees: success_stats.iter().map(|(_, f)| *f).collect(),
977 latencies_failure: failure_stats,
978 };
979 events.sort();
980 events
981 }
982
983 pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
986 self.latencies_usecs.append(&mut other.latencies_usecs);
987 self.fees.append(&mut other.fees);
988 self.latencies_failure.append(&mut other.latencies_failure);
989 self.sort();
990 }
991
992 fn sort(&mut self) {
995 self.latencies_usecs.sort_unstable();
996 self.fees.sort_unstable();
997 self.latencies_failure.sort_unstable();
998 }
999}
1000
1001#[cfg(test)]
1002mod tests;