// fedimint_server/consensus/aleph_bft/data_provider.rs
1use std::collections::BTreeSet;
2use std::time::Instant;
3
4use fedimint_core::TransactionId;
5use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
6use fedimint_core::encoding::Encodable;
7use fedimint_core::epoch::ConsensusItem;
8use fedimint_core::session_outcome::SchnorrSignature;
9use tokio::sync::watch;
10
11use crate::LOG_CONSENSUS;
12
13#[derive(
14 Clone, Debug, PartialEq, Eq, Hash, parity_scale_codec::Encode, parity_scale_codec::Decode,
15)]
16pub enum UnitData {
17 Batch(Vec<u8>),
18 Signature(SchnorrSignature),
19}
20
21impl UnitData {
22 pub fn is_valid(&self) -> bool {
25 match self {
26 UnitData::Signature(..) => true,
27 UnitData::Batch(bytes) => bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT,
28 }
29 }
30}
31
32pub struct DataProvider {
33 mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
34 signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
35 submitted_transactions: BTreeSet<TransactionId>,
36 leftover_item: Option<ConsensusItem>,
37 timestamp_sender: async_channel::Sender<Instant>,
38 is_recovery: bool,
39}
40
41impl DataProvider {
42 pub fn new(
43 mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
44 signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
45 timestamp_sender: async_channel::Sender<Instant>,
46 is_recovery: bool,
47 ) -> Self {
48 Self {
49 mempool_item_receiver,
50 signature_receiver,
51 submitted_transactions: BTreeSet::new(),
52 leftover_item: None,
53 timestamp_sender,
54 is_recovery,
55 }
56 }
57}
58
59#[async_trait::async_trait]
60impl aleph_bft::DataProvider<UnitData> for DataProvider {
61 async fn get_data(&mut self) -> Option<UnitData> {
62 if let Some(signature) = self.signature_receiver.borrow().clone() {
64 return Some(UnitData::Signature(signature));
65 }
66
67 let mut n_bytes = 9;
69 let mut items = Vec::new();
70
71 if let Some(item) = self.leftover_item.take() {
72 let n_bytes_item = item.consensus_encode_to_vec().len();
73
74 if n_bytes_item + n_bytes <= ALEPH_BFT_UNIT_BYTE_LIMIT {
75 n_bytes += n_bytes_item;
76 items.push(item);
77 } else {
78 tracing::warn!(target: LOG_CONSENSUS, ?item, "Consensus item length is over BYTE_LIMIT");
79 }
80 }
81
82 while let Ok(item) = self.mempool_item_receiver.try_recv() {
85 if let ConsensusItem::Transaction(transaction) = &item {
86 if !self.submitted_transactions.insert(transaction.tx_hash()) {
87 continue;
88 }
89 }
90
91 let n_bytes_item = item.consensus_encode_to_vec().len();
92
93 if n_bytes + n_bytes_item <= ALEPH_BFT_UNIT_BYTE_LIMIT {
94 n_bytes += n_bytes_item;
95 items.push(item);
96 } else {
97 self.leftover_item = Some(item);
98 break;
99 }
100 }
101
102 if items.is_empty() {
103 return None;
104 }
105
106 if !self.is_recovery {
107 self.timestamp_sender.send(Instant::now()).await.ok();
108 }
109
110 let bytes = items.consensus_encode_to_vec();
111
112 assert!(bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT);
113
114 Some(UnitData::Batch(bytes))
115 }
116}