fedimint_server/consensus/aleph_bft/
data_provider.rs1use std::collections::BTreeSet;
2use std::time::Instant;
3
4use bitcoin::hashes::Hash;
5use fedimint_core::TransactionId;
6use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
7use fedimint_core::encoding::Encodable;
8use fedimint_core::epoch::ConsensusItem;
9use fedimint_core::session_outcome::SchnorrSignature;
10use tokio::sync::watch;
11
12use crate::LOG_CONSENSUS;
13
14#[derive(
15 Clone, Debug, PartialEq, Eq, Hash, parity_scale_codec::Encode, parity_scale_codec::Decode,
16)]
17pub enum UnitData {
18 Batch(Vec<u8>),
19 Signature(SchnorrSignature),
20}
21
22impl UnitData {
23 pub fn is_valid(&self) -> bool {
26 match self {
27 UnitData::Signature(..) => true,
28 UnitData::Batch(bytes) => bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT,
29 }
30 }
31}
32
33pub struct DataProvider {
34 mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
35 signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
36 submitted_transactions: BTreeSet<TransactionId>,
37 leftover_item: Option<ConsensusItem>,
38 timestamp_sender: async_channel::Sender<(Instant, u64)>,
43}
44
45impl DataProvider {
46 pub fn new(
47 mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
48 signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
49 timestamp_sender: async_channel::Sender<(Instant, u64)>,
50 ) -> Self {
51 Self {
52 mempool_item_receiver,
53 signature_receiver,
54 submitted_transactions: BTreeSet::new(),
55 leftover_item: None,
56 timestamp_sender,
57 }
58 }
59}
60
61#[async_trait::async_trait]
62impl aleph_bft::DataProvider<UnitData> for DataProvider {
63 async fn get_data(&mut self) -> Option<UnitData> {
64 if let Some(signature) = self.signature_receiver.borrow().clone() {
66 return Some(UnitData::Signature(signature));
67 }
68
69 let mut n_bytes = 9;
71 let mut items = Vec::new();
72
73 if let Some(item) = self.leftover_item.take() {
74 let n_bytes_item = item.consensus_encode_to_vec().len();
75
76 if n_bytes_item + n_bytes <= ALEPH_BFT_UNIT_BYTE_LIMIT {
77 n_bytes += n_bytes_item;
78 items.push(item);
79 } else {
80 tracing::warn!(target: LOG_CONSENSUS, ?item, "Consensus item length is over BYTE_LIMIT");
81 }
82 }
83
84 while let Ok(item) = self.mempool_item_receiver.try_recv() {
87 if let ConsensusItem::Transaction(transaction) = &item {
88 if !self.submitted_transactions.insert(transaction.tx_hash()) {
89 continue;
90 }
91 }
92
93 let n_bytes_item = item.consensus_encode_to_vec().len();
94
95 if n_bytes + n_bytes_item <= ALEPH_BFT_UNIT_BYTE_LIMIT {
96 n_bytes += n_bytes_item;
97 items.push(item);
98 } else {
99 self.leftover_item = Some(item);
100 break;
101 }
102 }
103
104 if items.is_empty() {
105 return None;
106 }
107
108 let bytes = items.consensus_encode_to_vec();
109
110 assert!(bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT);
111
112 self.timestamp_sender
113 .send((Instant::now(), get_citem_bytes_chsum(&bytes)))
114 .await
115 .ok();
116
117 Some(UnitData::Batch(bytes))
118 }
119}
120
121pub(crate) fn get_citem_bytes_chsum(bytes: &[u8]) -> u64 {
123 let chsum = bitcoin::hashes::sha256::Hash::hash(bytes);
124 u64::from_le_bytes(chsum[..8].try_into().expect("Can't fail"))
125}