// fedimint_server/consensus/aleph_bft/data_provider.rs

1use std::collections::BTreeSet;
2use std::time::Instant;
3
4use bitcoin::hashes::Hash;
5use fedimint_core::TransactionId;
6use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
7use fedimint_core::encoding::Encodable;
8use fedimint_core::epoch::ConsensusItem;
9use fedimint_core::session_outcome::SchnorrSignature;
10use tokio::sync::watch;
11
12use crate::LOG_CONSENSUS;
13
14#[derive(
15    Clone, Debug, PartialEq, Eq, Hash, parity_scale_codec::Encode, parity_scale_codec::Decode,
16)]
17pub enum UnitData {
18    Batch(Vec<u8>),
19    Signature(SchnorrSignature),
20}
21
22impl UnitData {
23    // in order to bound the RAM consumption of a session we have to bound an
24    // individual units size, hence the size of its attached unit data in memory
25    pub fn is_valid(&self) -> bool {
26        match self {
27            UnitData::Signature(..) => true,
28            UnitData::Batch(bytes) => bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT,
29        }
30    }
31}
32
33pub struct DataProvider {
34    mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
35    signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
36    submitted_transactions: BTreeSet<TransactionId>,
37    leftover_item: Option<ConsensusItem>,
38    // Since it's possible that `fedimintd` after restart will receive citems it
39    // sent before restart, we use cheap citem's chsum, as a simple method
40    // to self-synchronize. See <https://github.com/fedimint/fedimint/pull/5432#issuecomment-2176860609>
41    // for discussion about it.
42    timestamp_sender: async_channel::Sender<(Instant, u64)>,
43}
44
45impl DataProvider {
46    pub fn new(
47        mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
48        signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
49        timestamp_sender: async_channel::Sender<(Instant, u64)>,
50    ) -> Self {
51        Self {
52            mempool_item_receiver,
53            signature_receiver,
54            submitted_transactions: BTreeSet::new(),
55            leftover_item: None,
56            timestamp_sender,
57        }
58    }
59}
60
61#[async_trait::async_trait]
62impl aleph_bft::DataProvider<UnitData> for DataProvider {
63    async fn get_data(&mut self) -> Option<UnitData> {
64        // we only attach our signature as no more items can be ordered in this session
65        if let Some(signature) = self.signature_receiver.borrow().clone() {
66            return Some(UnitData::Signature(signature));
67        }
68
69        // the length of a vector is encoded in at most 9 bytes
70        let mut n_bytes = 9;
71        let mut items = Vec::new();
72
73        if let Some(item) = self.leftover_item.take() {
74            let n_bytes_item = item.consensus_encode_to_vec().len();
75
76            if n_bytes_item + n_bytes <= ALEPH_BFT_UNIT_BYTE_LIMIT {
77                n_bytes += n_bytes_item;
78                items.push(item);
79            } else {
80                tracing::warn!(target: LOG_CONSENSUS, ?item, "Consensus item length is over BYTE_LIMIT");
81            }
82        }
83
84        // if the channel is empty we want to return the batch immediately in order to
85        // not delay the creation of our next unit, even if the batch is empty
86        while let Ok(item) = self.mempool_item_receiver.try_recv() {
87            if let ConsensusItem::Transaction(transaction) = &item {
88                if !self.submitted_transactions.insert(transaction.tx_hash()) {
89                    continue;
90                }
91            }
92
93            let n_bytes_item = item.consensus_encode_to_vec().len();
94
95            if n_bytes + n_bytes_item <= ALEPH_BFT_UNIT_BYTE_LIMIT {
96                n_bytes += n_bytes_item;
97                items.push(item);
98            } else {
99                self.leftover_item = Some(item);
100                break;
101            }
102        }
103
104        if items.is_empty() {
105            return None;
106        }
107
108        let bytes = items.consensus_encode_to_vec();
109
110        assert!(bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT);
111
112        self.timestamp_sender
113            .send((Instant::now(), get_citem_bytes_chsum(&bytes)))
114            .await
115            .ok();
116
117        Some(UnitData::Batch(bytes))
118    }
119}
120
121/// Calculate a cheap chesum of an encoded citem
122pub(crate) fn get_citem_bytes_chsum(bytes: &[u8]) -> u64 {
123    let chsum = bitcoin::hashes::sha256::Hash::hash(bytes);
124    u64::from_le_bytes(chsum[..8].try_into().expect("Can't fail"))
125}