fedimint_fountain/
lib.rs

//! Fountain encoder/decoder for splitting and recombining byte payloads.
//!
//! This is an inline fork of the fountain module from ur-rs,
//! providing rateless fountain codes for efficient data transmission.
//!
//! The fountain encoder splits a byte payload into multiple segments
//! and emits an unbounded stream of parts which can be recombined at
//! the receiving decoder side. The emitted parts are either original
//! payload segments, or constructed by xor-ing a certain set of payload
//! segments.
11
12mod fountain;
13
14use std::marker::PhantomData;
15
16use fedimint_core::encoding::{Decodable, Encodable};
17use fedimint_core::module::registry::ModuleDecoderRegistry;
18
19pub struct FountainEncoder {
20    encoder: fountain::Encoder,
21}
22
23impl FountainEncoder {
24    pub fn new(encodable: impl Encodable, max_fragment_length: usize) -> Self {
25        Self {
26            encoder: fountain::Encoder::new(
27                encodable.consensus_encode_to_vec().as_slice(),
28                max_fragment_length,
29            ),
30        }
31    }
32
33    /// Fragments never repeat, so this can be called indefinitely
34    pub fn next_fragment(&mut self) -> fountain::Fragment {
35        self.encoder.next_fragment()
36    }
37}
38
39/// Decoder for fountain-encoded encodable types
40pub struct FountainDecoder<E: Decodable> {
41    decoder: fountain::Decoder,
42    _pd: PhantomData<E>,
43}
44
45impl<E: Decodable> Default for FountainDecoder<E> {
46    fn default() -> Self {
47        Self {
48            decoder: fountain::Decoder::default(),
49            _pd: PhantomData,
50        }
51    }
52}
53
54impl<E: Decodable> FountainDecoder<E> {
55    /// Add a scanned fragment. Returns Some(E) when decoding is complete
56    pub fn add_fragment(&mut self, fragment: &fountain::Fragment) -> Option<E> {
57        // Try to receive the fragment
58        match self.decoder.receive(fragment.clone()) {
59            Ok(Some(bytes)) => {
60                // Decoding complete!
61                Decodable::consensus_decode_whole(&bytes, &ModuleDecoderRegistry::default()).ok()
62            }
63            Ok(None) => {
64                // Not done yet
65                None
66            }
67            Err(_) => {
68                // Invalid fragment, reset decoder
69                self.decoder = fountain::Decoder::default();
70                None
71            }
72        }
73    }
74}
75
#[cfg(test)]
mod tests {
    use super::*;

    /// Maximum number of fragments delivered to the decoder per payload.
    ///
    /// Shared by the loop bound and the failure message so the two cannot
    /// drift apart.
    const MAX_FRAGMENTS: usize = 30;

    /// Round-trips byte payloads of every length in `0..10000`.
    #[test]
    fn test_fountain_encode_decode() {
        for n in 0..10000 {
            test_fountain_encode_decode_for_n(n);
        }
    }

    /// Encodes an `n`-byte payload and checks that the decoder recovers it
    /// after receiving at most [`MAX_FRAGMENTS`] fragments.
    ///
    /// Each round delivers one fragment, re-delivers the same fragment to
    /// check that a duplicate does not complete decoding, and then discards
    /// the encoder's next fragment so the decoder only sees every other one.
    fn test_fountain_encode_decode_for_n(n: usize) {
        // Truncating cast is intentional: the payload is a repeating 0..=255 ramp.
        let original = (0..n).map(|i| i as u8).collect::<Vec<u8>>();

        let mut encoder = FountainEncoder::new(&original, 1000);

        let mut decoder: FountainDecoder<Vec<u8>> = FountainDecoder::default();

        for k in 0..MAX_FRAGMENTS {
            let fragment = encoder.next_fragment();

            if let Some(data) = decoder.add_fragment(&fragment) {
                assert_eq!(data, original);
                if n % 100 == 0 {
                    println!("Decoded {} bytes within {} fragments", n, k + 1);
                }
                return;
            }

            // Re-delivering an already seen fragment must not finish decoding.
            assert!(
                decoder.add_fragment(&fragment).is_none(),
                "Should not decode yet"
            );

            // Skip one fragment so the decoder never sees it.
            let _ = encoder.next_fragment();
        }

        panic!(
            "Decoder did not decode the original data within {} fragments",
            MAX_FRAGMENTS
        );
    }
}
115}