fedimint_core/db/
notifications.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use bitvec::vec::BitVec;
use tokio::sync::futures::Notified;
use tokio::sync::Notify;

/// Number of buckets used for `Notifications`.
///
/// Key hashes are reduced modulo this value to pick a bucket, so keys whose
/// hashes are congruent modulo `NOTIFY_BUCKETS` end up sharing one bucket.
/// `NotifyQueue` uses the same value as the length of its bit set.
const NOTIFY_BUCKETS: usize = 32;

/// The state of Notification.
///
/// This stores `NOTIFY_BUCKETS` `Notify` instances, one per bucket.
/// Each key is assigned a bucket based on its hash value, so distinct keys
/// may share a bucket.
/// This will cause some false positives: a waiter registered for one key is
/// also woken when a different key in the same bucket is notified.
#[derive(Debug)]
pub struct Notifications {
    /// One `Notify` per bucket, indexed by the slot derived from a key's hash.
    buckets: Vec<Notify>,
}

impl Default for Notifications {
    fn default() -> Self {
        Self {
            buckets: (0..NOTIFY_BUCKETS).map(|_| Notify::new()).collect(),
        }
    }
}

/// Maps a raw 64-bit hash to a bucket index in `0..NOTIFY_BUCKETS`.
fn slot_index_for_hash(hash_value: u64) -> usize {
    let bucket_count = NOTIFY_BUCKETS as u64;
    // The remainder is strictly smaller than `bucket_count`, so converting it
    // back to `usize` cannot truncate.
    let slot = hash_value % bucket_count;
    slot as usize
}

/// Hashes `key` with a fresh `DefaultHasher` and returns the bucket index
/// that the resulting hash maps to.
fn slot_index_for_key<K>(key: K) -> usize
where
    K: Hash,
{
    let mut state = DefaultHasher::new();
    key.hash(&mut state);
    slot_index_for_hash(state.finish())
}

impl Notifications {
    /// Creates a `Notifications` with the default set of buckets.
    pub fn new() -> Self {
        Self::default()
    }

    /// This registers for notification when called.
    ///
    /// The returned future completes once the bucket that `key` hashes to is
    /// notified; await it to wait for that notification. The future borrows
    /// `self`, so it cannot outlive this `Notifications`.
    ///
    /// NOTE: This may produce false positives, because every key that hashes
    /// to the same bucket shares a single `Notify`.
    pub fn register<K>(&self, key: K) -> Notified<'_>
    where
        K: Hash,
    {
        self.buckets[slot_index_for_key(key)].notified()
    }

    /// Notify a key.
    ///
    /// All the waiters currently registered for this key will be notified.
    /// Waiters registered for other keys in the same bucket are woken as
    /// well.
    pub fn notify<K>(&self, key: K)
    where
        K: Hash,
    {
        self.buckets[slot_index_for_key(key)].notify_waiters();
    }

    /// Notifies the waiters about the notifications recorded in `NotifyQueue`.
    ///
    /// Every bucket whose bit is set in `queue` is notified once. The queue
    /// is only read, so its recorded buckets are left unchanged.
    pub fn submit_queue(&self, queue: &NotifyQueue) {
        for bucket in queue.buckets.iter_ones() {
            self.buckets[bucket].notify_waiters();
        }
    }
}

/// Save notifications to be sent after transaction is complete.
///
/// Keys are not stored individually: each added key only marks the bucket it
/// hashes to. `Notifications::submit_queue` then notifies every marked
/// bucket.
#[derive(Debug)]
pub struct NotifyQueue {
    /// Bit `i` is set when bucket `i` has a pending notification.
    buckets: BitVec,
}

impl Default for NotifyQueue {
    /// Builds a queue of `NOTIFY_BUCKETS` bits with no bucket marked.
    fn default() -> Self {
        let buckets = BitVec::repeat(false, NOTIFY_BUCKETS);
        Self { buckets }
    }
}

impl NotifyQueue {
    /// Creates a queue with no pending notifications.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that the bucket `key` hashes to must be notified later.
    ///
    /// Adding the same key, or another key from the same bucket, more than
    /// once has no further effect: the bucket's bit is simply set again.
    ///
    /// `K` may be unsized, so keys such as `str` or `[u8]` can be passed by
    /// reference directly.
    pub fn add<K>(&mut self, key: &K)
    where
        K: Hash + ?Sized,
    {
        // `&K` hashes exactly like `K`, so this marks the same bucket that
        // `Notifications::register` / `Notifications::notify` pick for the
        // key when it is passed by value.
        self.buckets.set(slot_index_for_key(key), true);
    }
}

#[cfg(test)]
mod tests {
    use fedimint_core::db::test_utils::future_returns_shortly;

    use super::*;

    /// A waiter registered before `notify` is woken by it.
    #[tokio::test]
    async fn test_notification_after_notify() {
        let notifications = Notifications::new();
        let key = 1;
        let waiter = notifications.register(key);
        notifications.notify(key);
        let outcome = future_returns_shortly(waiter).await;
        assert!(outcome.is_some(), "should notify");
    }

    /// A waiter stays pending when nothing is notified.
    #[tokio::test]
    async fn test_no_notification_without_notify() {
        let notifications = Notifications::new();
        let key = 1;
        let waiter = notifications.register(key);
        let outcome = future_returns_shortly(waiter).await;
        assert!(outcome.is_none(), "should not notify");
    }

    /// A single `notify` wakes every waiter registered for the same key.
    #[tokio::test]
    async fn test_multi_one() {
        let notifications = Notifications::new();
        let key1 = 1;
        // All four waiters are registered before the notification is sent.
        let waiters = [
            notifications.register(key1),
            notifications.register(key1),
            notifications.register(key1),
            notifications.register(key1),
        ];
        notifications.notify(key1);
        for waiter in waiters {
            let outcome = future_returns_shortly(waiter).await;
            assert!(outcome.is_some(), "should notify");
        }
    }

    /// Waiters for two different keys are each woken by their own `notify`.
    #[tokio::test]
    async fn test_multi() {
        let notifications = Notifications::new();
        let key1 = 1;
        let key2 = 2;
        let first = notifications.register(key1);
        let second = notifications.register(key2);
        notifications.notify(key1);
        notifications.notify(key2);
        let first_outcome = future_returns_shortly(first).await;
        assert!(first_outcome.is_some(), "should notify");
        let second_outcome = future_returns_shortly(second).await;
        assert!(second_outcome.is_some(), "should notify");
    }

    /// Submitting a queue wakes the waiters of every key added to it.
    #[tokio::test]
    async fn test_notify_queue() {
        let notifications = Notifications::new();
        let key1 = 1;
        let key2 = 2;
        let first = notifications.register(key1);
        let second = notifications.register(key2);

        let mut pending = NotifyQueue::new();
        pending.add(&key1);
        pending.add(&key2);
        notifications.submit_queue(&pending);

        let first_outcome = future_returns_shortly(first).await;
        assert!(first_outcome.is_some(), "should notify");
        let second_outcome = future_returns_shortly(second).await;
        assert!(second_outcome.is_some(), "should notify");
    }
}