1#![allow(clippy::needless_lifetimes)]
2
3use std::borrow::Cow;
15use std::ops;
16use std::str::FromStr;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::time::Duration;
19
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{
22 Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, NonCommittable,
23};
24use fedimint_core::encoding::{Decodable, Encodable};
25use fedimint_core::{Amount, apply, async_trait_maybe_send, impl_db_lookup, impl_db_record};
26use fedimint_logging::LOG_CLIENT_EVENT_LOG;
27use futures::{Future, StreamExt};
28use itertools::Itertools;
29use serde::{Deserialize, Serialize};
30use tokio::sync::{broadcast, watch};
31use tracing::{debug, trace};
32
33pub const DB_KEY_PREFIX_UNORDERED_EVENT_LOG: u8 = 0x3a;
42pub const DB_KEY_PREFIX_EVENT_LOG: u8 = 0x39;
43pub const DB_KEY_PREFIX_EVENT_LOG_TRIMABLE: u8 = 0x41;
44
45const TRIMABLE_EVENTLOG_MIN_ID_AGE: u64 = 10_000;
47const TRIMABLE_EVENTLOG_MIN_TS_AGE: u64 = 14 * 24 * 60 * 60 * 1_000_000;
49const TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS: usize = 100_000;
51
52pub enum EventPersistence {
77 Transient,
79 Trimable,
81 Persistent,
84}
85
86pub trait Event: serde::Serialize + serde::de::DeserializeOwned {
87 const MODULE: Option<ModuleKind>;
88 const KIND: EventKind;
89 const PERSISTENCE: EventPersistence;
90}
91
92static UNORDEREDED_EVENT_LOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
95
96#[derive(Debug, Encodable, Decodable)]
103pub struct UnordedEventLogId {
104 ts_usecs: u64,
105 counter: u64,
106}
107
108impl UnordedEventLogId {
109 fn new() -> Self {
110 Self {
111 ts_usecs: u64::try_from(fedimint_core::time::duration_since_epoch().as_micros())
112 .unwrap_or(u64::MAX),
114 counter: UNORDEREDED_EVENT_LOG_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
115 }
116 }
117}
118
119#[derive(
122 Copy,
123 Clone,
124 Debug,
125 Encodable,
126 Decodable,
127 Default,
128 PartialEq,
129 Eq,
130 PartialOrd,
131 Ord,
132 Serialize,
133 Deserialize,
134)]
135pub struct EventLogId(u64);
136
137impl EventLogId {
138 pub const LOG_START: EventLogId = EventLogId(0);
139
140 fn next(self) -> EventLogId {
141 Self(self.0 + 1)
142 }
143
144 pub fn saturating_add(self, rhs: u64) -> EventLogId {
145 Self(self.0.saturating_add(rhs))
146 }
147
148 pub fn saturating_sub(self, rhs: u64) -> EventLogId {
149 Self(self.0.saturating_sub(rhs))
150 }
151}
152
153impl From<EventLogId> for u64 {
154 fn from(value: EventLogId) -> Self {
155 value.0
156 }
157}
158
159impl FromStr for EventLogId {
160 type Err = <u64 as FromStr>::Err;
161
162 fn from_str(s: &str) -> Result<Self, Self::Err> {
163 u64::from_str(s).map(Self)
164 }
165}
166
167#[derive(Debug, Clone, Encodable, Decodable, PartialEq, Eq, Serialize, Deserialize)]
168pub struct EventKind(Cow<'static, str>);
169
170impl EventKind {
171 pub const fn from_static(value: &'static str) -> Self {
172 Self(Cow::Borrowed(value))
173 }
174}
175
176impl<'s> From<&'s str> for EventKind {
177 fn from(value: &'s str) -> Self {
178 Self(Cow::Owned(value.to_owned()))
179 }
180}
181
182impl From<String> for EventKind {
183 fn from(value: String) -> Self {
184 Self(Cow::Owned(value))
185 }
186}
187
188#[derive(Debug, Encodable, Decodable, Clone)]
189pub struct UnorderedEventLogEntry {
190 pub flags: u8,
191 pub inner: EventLogEntry,
192}
193
194impl UnorderedEventLogEntry {
195 pub const FLAG_PERSIST: u8 = 1;
196 pub const FLAG_TRIMABLE: u8 = 2;
197
198 fn persist(&self) -> bool {
199 self.flags & Self::FLAG_PERSIST != 0
200 }
201
202 fn trimable(&self) -> bool {
203 self.flags & Self::FLAG_TRIMABLE != 0
204 }
205}
206
207#[derive(Debug, Encodable, Decodable, Clone)]
208pub struct EventLogEntry {
209 pub kind: EventKind,
216
217 pub module: Option<(ModuleKind, ModuleInstanceId)>,
225
226 pub ts_usecs: u64,
228
229 pub payload: Vec<u8>,
232}
233
234impl EventLogEntry {
235 pub fn module_kind(&self) -> Option<&ModuleKind> {
236 self.module.as_ref().map(|m| &m.0)
237 }
238
239 pub fn module_id(&self) -> Option<ModuleInstanceId> {
240 self.module.as_ref().map(|m| m.1)
241 }
242
243 pub fn to_event<E>(&self) -> Option<E>
245 where
246 E: Event,
247 {
248 serde_json::from_slice(&self.payload).ok()
249 }
250}
251
252#[derive(Debug, Clone)]
254pub struct PersistedLogEntry {
255 id: EventLogId,
256 inner: EventLogEntry,
257}
258
259impl Serialize for PersistedLogEntry {
260 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
261 where
262 S: serde::Serializer,
263 {
264 use serde::ser::SerializeStruct;
265
266 let mut state = serializer.serialize_struct("PersistedLogEntry", 5)?;
267 state.serialize_field("id", &self.id)?;
268 state.serialize_field("kind", &self.inner.kind)?;
269 state.serialize_field("module", &self.inner.module)?;
270 state.serialize_field("ts_usecs", &self.inner.ts_usecs)?;
271
272 let payload_value: serde_json::Value = serde_json::from_slice(&self.inner.payload)
274 .unwrap_or_else(|_| serde_json::Value::String(hex::encode(&self.inner.payload)));
275 state.serialize_field("payload", &payload_value)?;
276
277 state.end()
278 }
279}
280
281impl<'de> Deserialize<'de> for PersistedLogEntry {
282 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
283 where
284 D: serde::Deserializer<'de>,
285 {
286 use serde::de::{self, MapAccess, Visitor};
287
288 #[derive(Deserialize)]
289 #[serde(field_identifier, rename_all = "snake_case")]
290 enum Field {
291 Id,
292 Kind,
293 Module,
294 TsUsecs,
295 Payload,
296 }
297
298 struct PersistedLogEntryVisitor;
299
300 impl<'de> Visitor<'de> for PersistedLogEntryVisitor {
301 type Value = PersistedLogEntry;
302
303 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
304 formatter.write_str("struct PersistedLogEntry")
305 }
306
307 fn visit_map<V>(self, mut map: V) -> Result<PersistedLogEntry, V::Error>
308 where
309 V: MapAccess<'de>,
310 {
311 let mut id = None;
312 let mut kind = None;
313 let mut module = None;
314 let mut ts_usecs = None;
315 let mut payload = None;
316
317 while let Some(key) = map.next_key()? {
318 match key {
319 Field::Id => {
320 if id.is_some() {
321 return Err(de::Error::duplicate_field("id"));
322 }
323 id = Some(map.next_value()?);
324 }
325 Field::Kind => {
326 if kind.is_some() {
327 return Err(de::Error::duplicate_field("kind"));
328 }
329 kind = Some(map.next_value()?);
330 }
331 Field::Module => {
332 if module.is_some() {
333 return Err(de::Error::duplicate_field("module"));
334 }
335 module = Some(map.next_value()?);
336 }
337 Field::TsUsecs => {
338 if ts_usecs.is_some() {
339 return Err(de::Error::duplicate_field("ts_usecs"));
340 }
341 ts_usecs = Some(map.next_value()?);
342 }
343 Field::Payload => {
344 if payload.is_some() {
345 return Err(de::Error::duplicate_field("payload"));
346 }
347 let value: serde_json::Value = map.next_value()?;
348 payload = Some(serde_json::to_vec(&value).map_err(de::Error::custom)?);
349 }
350 }
351 }
352
353 let id = id.ok_or_else(|| de::Error::missing_field("id"))?;
354 let kind = kind.ok_or_else(|| de::Error::missing_field("kind"))?;
355 let module = module.ok_or_else(|| de::Error::missing_field("module"))?;
356 let ts_usecs = ts_usecs.ok_or_else(|| de::Error::missing_field("ts_usecs"))?;
357 let payload = payload.ok_or_else(|| de::Error::missing_field("payload"))?;
358
359 Ok(PersistedLogEntry {
360 id,
361 inner: EventLogEntry {
362 kind,
363 module,
364 ts_usecs,
365 payload,
366 },
367 })
368 }
369 }
370
371 const FIELDS: &[&str] = &["id", "kind", "module", "ts_usecs", "payload"];
372 deserializer.deserialize_struct("PersistedLogEntry", FIELDS, PersistedLogEntryVisitor)
373 }
374}
375
376impl PersistedLogEntry {
377 pub fn id(&self) -> EventLogId {
378 self.id
379 }
380
381 pub fn as_raw(&self) -> &EventLogEntry {
382 &self.inner
383 }
384}
385
386impl ops::Deref for PersistedLogEntry {
387 type Target = EventLogEntry;
388
389 fn deref(&self) -> &Self::Target {
390 &self.inner
391 }
392}
393
394impl_db_record!(
395 key = UnordedEventLogId,
396 value = UnorderedEventLogEntry,
397 db_prefix = DB_KEY_PREFIX_UNORDERED_EVENT_LOG,
398);
399
400#[derive(Clone, Debug, Encodable, Decodable)]
401pub struct UnorderedEventLogIdPrefixAll;
402
403impl_db_lookup!(
404 key = UnordedEventLogId,
405 query_prefix = UnorderedEventLogIdPrefixAll
406);
407
408#[derive(Clone, Debug, Encodable, Decodable)]
409pub struct EventLogIdPrefixAll;
410
411#[derive(Clone, Debug, Encodable, Decodable)]
412pub struct EventLogIdPrefix(EventLogId);
413
414impl_db_record!(
415 key = EventLogId,
416 value = EventLogEntry,
417 db_prefix = DB_KEY_PREFIX_EVENT_LOG,
418);
419
420impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefixAll);
421
422impl_db_lookup!(key = EventLogId, query_prefix = EventLogIdPrefix);
423
424#[derive(
425 Copy,
426 Clone,
427 Debug,
428 Encodable,
429 Decodable,
430 Default,
431 PartialEq,
432 Eq,
433 PartialOrd,
434 Ord,
435 Serialize,
436 Deserialize,
437)]
438pub struct EventLogTrimableId(EventLogId);
439
440impl EventLogTrimableId {
441 fn next(&self) -> Self {
442 Self(self.0.next())
443 }
444
445 pub fn saturating_add(self, rhs: u64) -> Self {
446 Self(self.0.saturating_add(rhs))
447 }
448}
449
450impl From<u64> for EventLogTrimableId {
451 fn from(value: u64) -> Self {
452 Self(EventLogId(value))
453 }
454}
455
456#[derive(Clone, Debug, Encodable, Decodable)]
457pub struct EventLogTrimableIdPrefixAll;
458
459#[derive(Clone, Debug, Encodable, Decodable)]
460pub struct EventLogTrimableIdPrefix(EventLogId);
461
462impl_db_record!(
463 key = EventLogTrimableId,
464 value = EventLogEntry,
465 db_prefix = DB_KEY_PREFIX_EVENT_LOG_TRIMABLE,
466);
467
468impl_db_lookup!(
469 key = EventLogTrimableId,
470 query_prefix = EventLogTrimableIdPrefixAll
471);
472
473impl_db_lookup!(
474 key = EventLogTrimableId,
475 query_prefix = EventLogTrimableIdPrefix
476);
477
478#[apply(async_trait_maybe_send!)]
479pub trait DBTransactionEventLogExt {
480 #[allow(clippy::too_many_arguments)]
481 async fn log_event_raw(
482 &mut self,
483 log_ordering_wakeup_tx: watch::Sender<()>,
484 kind: EventKind,
485 module_kind: Option<ModuleKind>,
486 module_id: Option<ModuleInstanceId>,
487 payload: Vec<u8>,
488 persist: EventPersistence,
489 );
490
491 async fn log_event<E>(
496 &mut self,
497 log_ordering_wakeup_tx: watch::Sender<()>,
498 module_id: Option<ModuleInstanceId>,
499 event: E,
500 ) where
501 E: Event + Send,
502 {
503 self.log_event_raw(
504 log_ordering_wakeup_tx,
505 E::KIND,
506 E::MODULE,
507 module_id,
508 serde_json::to_vec(&event).expect("Serialization can't fail"),
509 <E as Event>::PERSISTENCE,
510 )
511 .await;
512 }
513
514 async fn get_next_event_log_id(&mut self) -> EventLogId;
519
520 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId;
522
523 async fn get_event_log(
525 &mut self,
526 pos: Option<EventLogId>,
527 limit: u64,
528 ) -> Vec<PersistedLogEntry>;
529
530 async fn get_event_log_trimable(
531 &mut self,
532 pos: Option<EventLogTrimableId>,
533 limit: u64,
534 ) -> Vec<PersistedLogEntry>;
535}
536
537#[apply(async_trait_maybe_send!)]
538impl<'tx, Cap> DBTransactionEventLogExt for DatabaseTransaction<'tx, Cap>
539where
540 Cap: Send,
541{
542 async fn log_event_raw(
543 &mut self,
544 log_ordering_wakeup_tx: watch::Sender<()>,
545 kind: EventKind,
546 module_kind: Option<ModuleKind>,
547 module_id: Option<ModuleInstanceId>,
548 payload: Vec<u8>,
549 persist: EventPersistence,
550 ) {
551 assert_eq!(
552 module_kind.is_some(),
553 module_id.is_some(),
554 "Events of modules must have module_id set"
555 );
556
557 let unordered_id = UnordedEventLogId::new();
558 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, "New unordered event log event");
559
560 if self
561 .insert_entry(
562 &unordered_id,
563 &UnorderedEventLogEntry {
564 flags: match persist {
565 EventPersistence::Transient => 0,
566 EventPersistence::Trimable => UnorderedEventLogEntry::FLAG_TRIMABLE,
567 EventPersistence::Persistent => UnorderedEventLogEntry::FLAG_PERSIST,
568 },
569 inner: EventLogEntry {
570 kind,
571 module: module_kind.map(|kind| (kind, module_id.unwrap())),
572 ts_usecs: unordered_id.ts_usecs,
573 payload,
574 },
575 },
576 )
577 .await
578 .is_some()
579 {
580 panic!("Trying to overwrite event in the client event log");
581 }
582 self.on_commit(move || {
583 log_ordering_wakeup_tx.send_replace(());
584 });
585 }
586
587 async fn get_next_event_log_id(&mut self) -> EventLogId {
588 self.find_by_prefix_sorted_descending(&EventLogIdPrefixAll)
589 .await
590 .next()
591 .await
592 .map(|(k, _v)| k.next())
593 .unwrap_or_default()
594 }
595
596 async fn get_next_event_log_trimable_id(&mut self) -> EventLogTrimableId {
597 EventLogTrimableId(
598 self.find_by_prefix_sorted_descending(&EventLogTrimableIdPrefixAll)
599 .await
600 .next()
601 .await
602 .map(|(k, _v)| k.0.next())
603 .unwrap_or_default(),
604 )
605 }
606
607 async fn get_event_log(
608 &mut self,
609 pos: Option<EventLogId>,
610 limit: u64,
611 ) -> Vec<PersistedLogEntry> {
612 let pos = pos.unwrap_or_default();
613 self.find_by_range(pos..pos.saturating_add(limit))
614 .await
615 .map(|(k, v)| PersistedLogEntry { id: k, inner: v })
616 .collect()
617 .await
618 }
619
620 async fn get_event_log_trimable(
621 &mut self,
622 pos: Option<EventLogTrimableId>,
623 limit: u64,
624 ) -> Vec<PersistedLogEntry> {
625 let pos = pos.unwrap_or_default();
626 self.find_by_range(pos..pos.saturating_add(limit))
627 .await
628 .map(|(k, v)| PersistedLogEntry { id: k.0, inner: v })
629 .collect()
630 .await
631 }
632}
633
634async fn trim_trimable_log(db: &Database, current_time_usecs: u64) {
636 let mut dbtx = db.begin_transaction().await;
637
638 let current_trimable_id = dbtx.get_next_event_log_trimable_id().await;
639 let min_id_threshold = current_trimable_id
640 .0
641 .saturating_sub(TRIMABLE_EVENTLOG_MIN_ID_AGE);
642 let min_ts_threshold = current_time_usecs.saturating_sub(TRIMABLE_EVENTLOG_MIN_TS_AGE);
643
644 let entries_to_delete: Vec<_> = dbtx
645 .find_by_prefix(&EventLogTrimableIdPrefixAll)
646 .await
647 .take_while(|(id, entry)| {
648 let id_old_enough = id.0 <= min_id_threshold;
649 let ts_old_enough = entry.ts_usecs <= min_ts_threshold;
650
651 async move { id_old_enough && ts_old_enough }
653 })
654 .take(TRIMABLE_EVENTLOG_MAX_TRIMMED_EVENTS)
655 .map(|(id, _entry)| id)
656 .collect()
657 .await;
658
659 for id in &entries_to_delete {
660 dbtx.remove_entry(id).await;
661 }
662
663 dbtx.commit_tx().await;
664}
665
666pub async fn run_event_log_ordering_task(
669 db: Database,
670 mut log_ordering_task_wakeup: watch::Receiver<()>,
671 log_event_added: watch::Sender<()>,
672 log_event_added_transient: broadcast::Sender<EventLogEntry>,
673) {
674 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task started");
675
676 let current_time_usecs =
677 u64::try_from(fedimint_core::time::duration_since_epoch().as_micros()).unwrap_or(u64::MAX);
678 trim_trimable_log(&db, current_time_usecs).await;
679
680 let mut next_entry_id = db
681 .begin_transaction_nc()
682 .await
683 .get_next_event_log_id()
684 .await;
685 let mut next_entry_id_trimable = db
686 .begin_transaction_nc()
687 .await
688 .get_next_event_log_trimable_id()
689 .await;
690
691 loop {
692 let mut dbtx = db.begin_transaction().await;
693
694 let unordered_events = dbtx
695 .find_by_prefix(&UnorderedEventLogIdPrefixAll)
696 .await
697 .collect::<Vec<_>>()
698 .await;
699 trace!(target: LOG_CLIENT_EVENT_LOG, num=unordered_events.len(), "Fetched unordered events");
700
701 for (unordered_id, entry) in &unordered_events {
702 assert!(
703 dbtx.remove_entry(unordered_id).await.is_some(),
704 "Must never fail to remove entry"
705 );
706 if entry.persist() {
707 if !entry.trimable() {
710 assert!(
711 dbtx.insert_entry(&next_entry_id, &entry.inner)
712 .await
713 .is_none(),
714 "Must never overwrite existing event"
715 );
716 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
717 next_entry_id = next_entry_id.next();
718 }
719
720 assert!(
722 dbtx.insert_entry(&next_entry_id_trimable, &entry.inner)
723 .await
724 .is_none(),
725 "Must never overwrite existing event"
726 );
727 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Ordered event log event");
728 next_entry_id_trimable = next_entry_id_trimable.next();
729 } else {
730 trace!(target: LOG_CLIENT_EVENT_LOG, ?unordered_id, id=?next_entry_id, "Transient event log event");
732 dbtx.on_commit({
733 let log_event_added_transient = log_event_added_transient.clone();
734 let entry = entry.inner.clone();
735
736 move || {
737 let _ = log_event_added_transient.send(entry);
739 }
740 });
741 }
742 }
743
744 dbtx.commit_tx().await;
748 if !unordered_events.is_empty() {
749 log_event_added.send_replace(());
750 }
751
752 trace!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task waits for more events");
753 if log_ordering_task_wakeup.changed().await.is_err() {
754 break;
755 }
756 }
757
758 debug!(target: LOG_CLIENT_EVENT_LOG, "Event log ordering task finished");
759}
760
761#[apply(async_trait_maybe_send!)]
775pub trait EventLogNonTrimableTracker {
776 async fn store(
778 &mut self,
779 dbtx: &mut DatabaseTransaction<NonCommittable>,
780 pos: EventLogId,
781 ) -> anyhow::Result<()>;
782
783 async fn load(
785 &mut self,
786 dbtx: &mut DatabaseTransaction<NonCommittable>,
787 ) -> anyhow::Result<Option<EventLogId>>;
788}
789pub type DynEventLogTracker = Box<dyn EventLogNonTrimableTracker>;
790
791#[apply(async_trait_maybe_send!)]
793pub trait EventLogTrimableTracker {
794 async fn store(
796 &mut self,
797 dbtx: &mut DatabaseTransaction<NonCommittable>,
798 pos: EventLogTrimableId,
799 ) -> anyhow::Result<()>;
800
801 async fn load(
803 &mut self,
804 dbtx: &mut DatabaseTransaction<NonCommittable>,
805 ) -> anyhow::Result<Option<EventLogTrimableId>>;
806}
807pub type DynEventLogTrimableTracker = Box<dyn EventLogTrimableTracker>;
808
809pub async fn handle_events<F, R>(
810 db: Database,
811 mut tracker: DynEventLogTracker,
812 mut log_event_added: watch::Receiver<()>,
813 call_fn: F,
814) -> anyhow::Result<()>
815where
816 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
817 R: Future<Output = anyhow::Result<()>>,
818{
819 let mut next_key: EventLogId = tracker
820 .load(&mut db.begin_transaction_nc().await)
821 .await?
822 .unwrap_or_default();
823
824 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling events");
825
826 loop {
827 let mut dbtx = db.begin_transaction().await;
828
829 match dbtx.get_value(&next_key).await {
830 Some(event) => {
831 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
832
833 next_key = next_key.next();
834
835 tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
836
837 dbtx.commit_tx().await;
838 }
839 _ => {
840 if log_event_added.changed().await.is_err() {
841 break Ok(());
842 }
843 }
844 }
845 }
846}
847
848pub async fn handle_trimable_events<F, R>(
849 db: Database,
850 mut tracker: DynEventLogTrimableTracker,
851 mut log_event_added: watch::Receiver<()>,
852 call_fn: F,
853) -> anyhow::Result<()>
854where
855 F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
856 R: Future<Output = anyhow::Result<()>>,
857{
858 let mut next_key: EventLogTrimableId = tracker
859 .load(&mut db.begin_transaction_nc().await)
860 .await?
861 .unwrap_or_default();
862 trace!(target: LOG_CLIENT_EVENT_LOG, ?next_key, "Handling trimable events");
863
864 loop {
865 let mut dbtx = db.begin_transaction().await;
866
867 match dbtx.get_value(&next_key).await {
868 Some(event) => {
869 (call_fn)(&mut dbtx.to_ref_nc(), event).await?;
870
871 next_key = next_key.next();
872 tracker.store(&mut dbtx.to_ref_nc(), next_key).await?;
873
874 dbtx.commit_tx().await;
875 }
876 _ => {
877 if log_event_added.changed().await.is_err() {
878 break Ok(());
879 }
880 }
881 }
882 }
883}
884
885pub fn filter_events_by_kind<'a, I>(
888 all_events: I,
889 module_kind: ModuleKind,
890 event_kind: EventKind,
891) -> impl Iterator<Item = &'a PersistedLogEntry> + 'a
892where
893 I: IntoIterator<Item = &'a PersistedLogEntry> + 'a,
894{
895 all_events.into_iter().filter(move |e| {
896 if let Some((m, _)) = &e.inner.module {
897 e.inner.kind == event_kind && *m == module_kind
898 } else {
899 false
900 }
901 })
902}
903
904pub fn join_events<'a, L, R, Res>(
915 events_l: &'a [&PersistedLogEntry],
916 events_r: &'a [&PersistedLogEntry],
917 max_time_distance: Option<Duration>,
918 predicate: impl Fn(L, R, Duration) -> Option<Res> + 'a,
919) -> impl Iterator<Item = Res> + 'a
920where
921 L: Event,
922 R: Event,
923{
924 events_l
925 .iter()
926 .cartesian_product(events_r)
927 .filter_map(move |(l, r)| {
928 if L::MODULE.as_ref() == l.as_raw().module_kind()
929 && L::KIND == l.as_raw().kind
930 && R::MODULE.as_ref() == r.as_raw().module_kind()
931 && R::KIND == r.as_raw().kind
932 && let Some(latency_usecs) = r.inner.ts_usecs.checked_sub(l.inner.ts_usecs)
933 && max_time_distance.is_none_or(|max| u128::from(latency_usecs) <= max.as_millis())
934 && let Some(l) = l.as_raw().to_event()
935 && let Some(r) = r.as_raw().to_event()
936 {
937 predicate(l, r, Duration::from_millis(latency_usecs))
938 } else {
939 None
940 }
941 })
942}
943
944#[derive(Debug, Default)]
947pub struct StructuredPaymentEvents {
948 pub latencies_usecs: Vec<u64>,
949 pub fees: Vec<Amount>,
950 pub latencies_failure: Vec<u64>,
951}
952
953impl StructuredPaymentEvents {
954 pub fn new(
955 success_stats: &[(u64, Amount)],
956 failure_stats: Vec<u64>,
957 ) -> StructuredPaymentEvents {
958 let mut events = StructuredPaymentEvents {
959 latencies_usecs: success_stats.iter().map(|(l, _)| *l).collect(),
960 fees: success_stats.iter().map(|(_, f)| *f).collect(),
961 latencies_failure: failure_stats,
962 };
963 events.sort();
964 events
965 }
966
967 pub fn combine(&mut self, other: &mut StructuredPaymentEvents) {
970 self.latencies_usecs.append(&mut other.latencies_usecs);
971 self.fees.append(&mut other.fees);
972 self.latencies_failure.append(&mut other.latencies_failure);
973 self.sort();
974 }
975
976 fn sort(&mut self) {
979 self.latencies_usecs.sort_unstable();
980 self.fees.sort_unstable();
981 self.latencies_failure.sort_unstable();
982 }
983}
984
985#[cfg(test)]
986mod tests;