fedimint_fountain/
lib.rs

1//! The fountain encoder splits a byte payload into multiple segments
2//! and emits an unbounded stream of parts which can be recombined at
3//! the receiving decoder side. The emitted parts are either original
4//! payload segments, or constructed by xor-ing a certain set of payload
5//! segments.
6
7mod fountain;
8
9use std::marker::PhantomData;
10
11use fedimint_core::encoding::{Decodable, Encodable};
12pub use fountain::Fragment;
13
14pub struct FountainEncoder {
15    encoder: fountain::Encoder,
16}
17
18impl FountainEncoder {
19    pub fn new(encodable: impl Encodable, max_fragment_length: usize) -> Self {
20        Self {
21            encoder: fountain::Encoder::new(
22                encodable.consensus_encode_to_vec().as_slice(),
23                max_fragment_length,
24            ),
25        }
26    }
27
28    /// Fragments never repeat, so this can be called indefinitely
29    pub fn next_fragment(&mut self) -> Fragment {
30        self.encoder.next_fragment()
31    }
32}
33
34/// Decoder for fountain-encoded encodable types
35pub struct FountainDecoder<E: Decodable> {
36    decoder: fountain::Decoder,
37    _pd: PhantomData<E>,
38}
39
40impl<E: Decodable> Default for FountainDecoder<E> {
41    fn default() -> Self {
42        Self {
43            decoder: fountain::Decoder::default(),
44            _pd: PhantomData,
45        }
46    }
47}
48
49impl<E: Decodable> FountainDecoder<E> {
50    /// Add a scanned fragment. Returns Some(E) when decoding is complete. If we
51    /// receive an invalid fragment, possibly belonging to a different fountain
52    /// encoding, the decoder is reset.
53    pub fn add_fragment(&mut self, fragment: &Fragment) -> Option<E> {
54        if let Some(Some(d)) = self
55            .decoder
56            .receive(fragment.clone())
57            .transpose()? // The fragment is valid but the decoding is not yet complete
58            .ok()
59            .map(|b| Decodable::consensus_decode_whole(&b, &Default::default()).ok())
60        {
61            return Some(d);
62        }
63
64        // The received fragment was either invalid or we failed to decode the raw bytes
65        self.decoder = fountain::Decoder::default();
66
67        None
68    }
69}
70
#[cfg(test)]
mod tests {
    use super::*;

    /// Maximum number of distinct fragments handed to the decoder per payload
    /// before the round trip is considered a failure.
    const MAX_RECEIVED_FRAGMENTS: usize = 30;

    /// Round-trips byte payloads of every length in `0..10000`.
    #[test]
    fn test_fountain_encode_decode() {
        for n in 0..10000 {
            test_fountain_encode_decode_for_n(n);
        }
    }

    /// Encodes an `n`-byte payload, feeds the decoder every other fragment
    /// emitted by the encoder and checks that the original payload is
    /// recovered within `MAX_RECEIVED_FRAGMENTS` received fragments.
    fn test_fountain_encode_decode_for_n(n: usize) {
        // The truncating cast is intentional: it yields a repeating 0..=255 pattern.
        let original = (0..n).map(|i| i as u8).collect::<Vec<u8>>();

        let mut encoder = FountainEncoder::new(&original, 1000);

        let mut decoder: FountainDecoder<Vec<u8>> = FountainDecoder::default();

        for k in 0..MAX_RECEIVED_FRAGMENTS {
            let fragment = encoder.next_fragment();

            if let Some(data) = decoder.add_fragment(&fragment) {
                assert_eq!(data, original);
                if n % 100 == 0 {
                    println!("Decoded {} bytes within {} fragments", n, k + 1);
                }
                return;
            }

            // Feeding the very same fragment a second time must not complete
            // the decoding.
            assert!(
                decoder.add_fragment(&fragment).is_none(),
                "Should not decode yet"
            );

            // Discard one fragment so the decoder only sees every other one
            // (presumably to simulate a fragment missed by the receiver).
            let _ = encoder.next_fragment();
        }

        panic!(
            "Decoder did not decode the original data within {} fragments",
            MAX_RECEIVED_FRAGMENTS
        );
    }
}