fedimint_server/consensus/aleph_bft/
data_provider.rs

1use std::collections::BTreeSet;
2use std::time::Instant;
3
4use fedimint_core::TransactionId;
5use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
6use fedimint_core::encoding::Encodable;
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::session_outcome::SchnorrSignature;
9use tokio::sync::watch;
10
11use crate::LOG_CONSENSUS;
12
13#[derive(
14    Clone, Debug, PartialEq, Eq, Hash, parity_scale_codec::Encode, parity_scale_codec::Decode,
15)]
16pub enum UnitData {
17    Batch(Vec<u8>),
18    Signature(SchnorrSignature),
19}
20
21impl UnitData {
22    // in order to bound the RAM consumption of a session we have to bound an
23    // individual units size, hence the size of its attached unit data in memory
24    pub fn is_valid(&self) -> bool {
25        match self {
26            UnitData::Signature(..) => true,
27            UnitData::Batch(bytes) => bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT,
28        }
29    }
30}
31
32pub struct DataProvider {
33    mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
34    signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
35    submitted_transactions: BTreeSet<TransactionId>,
36    leftover_item: Option<ConsensusItem>,
37    timestamp_sender: async_channel::Sender<Instant>,
38    is_recovery: bool,
39}
40
41impl DataProvider {
42    pub fn new(
43        mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
44        signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
45        timestamp_sender: async_channel::Sender<Instant>,
46        is_recovery: bool,
47    ) -> Self {
48        Self {
49            mempool_item_receiver,
50            signature_receiver,
51            submitted_transactions: BTreeSet::new(),
52            leftover_item: None,
53            timestamp_sender,
54            is_recovery,
55        }
56    }
57}
58
59#[async_trait::async_trait]
60impl aleph_bft::DataProvider<UnitData> for DataProvider {
61    async fn get_data(&mut self) -> Option<UnitData> {
62        // we only attach our signature as no more items can be ordered in this session
63        if let Some(signature) = self.signature_receiver.borrow().clone() {
64            return Some(UnitData::Signature(signature));
65        }
66
67        // the length of a vector is encoded in at most 9 bytes
68        let mut n_bytes = 9;
69        let mut items = Vec::new();
70
71        if let Some(item) = self.leftover_item.take() {
72            let n_bytes_item = item.consensus_encode_to_vec().len();
73
74            if n_bytes_item + n_bytes <= ALEPH_BFT_UNIT_BYTE_LIMIT {
75                n_bytes += n_bytes_item;
76                items.push(item);
77            } else {
78                tracing::warn!(target: LOG_CONSENSUS, ?item, "Consensus item length is over BYTE_LIMIT");
79            }
80        }
81
82        // if the channel is empty we want to return the batch immediately in order to
83        // not delay the creation of our next unit, even if the batch is empty
84        while let Ok(item) = self.mempool_item_receiver.try_recv() {
85            if let ConsensusItem::Transaction(transaction) = &item {
86                if !self.submitted_transactions.insert(transaction.tx_hash()) {
87                    continue;
88                }
89            }
90
91            let n_bytes_item = item.consensus_encode_to_vec().len();
92
93            if n_bytes + n_bytes_item <= ALEPH_BFT_UNIT_BYTE_LIMIT {
94                n_bytes += n_bytes_item;
95                items.push(item);
96            } else {
97                self.leftover_item = Some(item);
98                break;
99            }
100        }
101
102        if items.is_empty() {
103            return None;
104        }
105
106        if !self.is_recovery {
107            self.timestamp_sender.send(Instant::now()).await.ok();
108        }
109
110        let bytes = items.consensus_encode_to_vec();
111
112        assert!(bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT);
113
114        Some(UnitData::Batch(bytes))
115    }
116}