fedimint_api_client/
query.rs1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::mem;
4
5use fedimint_connectors::ServerResult;
6use fedimint_connectors::error::ServerError;
7use fedimint_core::task::{MaybeSend, MaybeSync};
8use fedimint_core::{NumPeers, PeerId, maybe_add_send_sync};
9
10pub trait QueryStrategy<IR, OR = IR> {
17 fn process(&mut self, peer_id: PeerId, response: IR) -> QueryStep<OR>;
18}
19
20#[derive(Debug)]
26pub enum QueryStep<R> {
27 Retry(BTreeSet<PeerId>),
29 Continue,
31 Success(R),
33 Failure(ServerError),
35}
36
37pub struct FilterMap<R, T> {
40 filter_map: Box<maybe_add_send_sync!(dyn Fn(R) -> ServerResult<T>)>,
41}
42
43impl<R, T> FilterMap<R, T> {
44 pub fn new(
45 filter_map: impl Fn(R) -> ServerResult<T> + MaybeSend + MaybeSync + 'static,
46 ) -> Self {
47 Self {
48 filter_map: Box::new(filter_map),
49 }
50 }
51}
52
53impl<R, T> QueryStrategy<R, T> for FilterMap<R, T> {
54 fn process(&mut self, _peer: PeerId, response: R) -> QueryStep<T> {
55 match (self.filter_map)(response) {
56 Ok(value) => QueryStep::Success(value),
57 Err(e) => QueryStep::Failure(e),
58 }
59 }
60}
61
62pub struct FilterMapThreshold<R, T> {
65 filter_map: Box<maybe_add_send_sync!(dyn Fn(PeerId, R) -> ServerResult<T>)>,
66 filtered_responses: BTreeMap<PeerId, T>,
67 threshold: usize,
68}
69
70impl<R, T> FilterMapThreshold<R, T> {
71 pub fn new(
72 verifier: impl Fn(PeerId, R) -> ServerResult<T> + MaybeSend + MaybeSync + 'static,
73 num_peers: NumPeers,
74 ) -> Self {
75 Self {
76 filter_map: Box::new(verifier),
77 filtered_responses: BTreeMap::new(),
78 threshold: num_peers.threshold(),
79 }
80 }
81}
82
83impl<R, T> QueryStrategy<R, BTreeMap<PeerId, T>> for FilterMapThreshold<R, T> {
84 fn process(&mut self, peer: PeerId, response: R) -> QueryStep<BTreeMap<PeerId, T>> {
85 match (self.filter_map)(peer, response) {
86 Ok(response) => {
87 self.filtered_responses.insert(peer, response);
88
89 if self.filtered_responses.len() == self.threshold {
90 QueryStep::Success(mem::take(&mut self.filtered_responses))
91 } else {
92 QueryStep::Continue
93 }
94 }
95 Err(e) => QueryStep::Failure(e),
96 }
97 }
98}
99
100pub struct ThresholdConsensus<R> {
105 responses: BTreeMap<PeerId, R>,
106 retry: BTreeSet<PeerId>,
107 threshold: usize,
108}
109
110impl<R> ThresholdConsensus<R> {
111 pub fn new(num_peers: NumPeers) -> Self {
112 Self {
113 responses: BTreeMap::new(),
114 retry: BTreeSet::new(),
115 threshold: num_peers.threshold(),
116 }
117 }
118}
119
120impl<R: Eq + Clone> QueryStrategy<R> for ThresholdConsensus<R> {
121 fn process(&mut self, peer: PeerId, response: R) -> QueryStep<R> {
122 self.responses.insert(peer, response.clone());
123
124 if self.responses.values().filter(|r| **r == response).count() == self.threshold {
125 return QueryStep::Success(response);
126 }
127
128 assert!(self.retry.insert(peer));
129
130 if self.retry.len() == self.threshold {
131 QueryStep::Retry(mem::take(&mut self.retry))
132 } else {
133 QueryStep::Continue
134 }
135 }
136}
137
/// Walks a `ThresholdConsensus` built for four peers through one failed round
/// (two peers answer `1`, a third answers `0`) and one successful round (the
/// same three peers all answer `1`).
#[test]
fn test_threshold_consensus() {
    let num_peers = NumPeers::from(4);
    let mut consensus: ThresholdConsensus<u64> = ThresholdConsensus::new(num_peers);

    // Round one: the third response disagrees, so no consensus forms and the
    // strategy issues a retry.
    assert!(matches!(
        consensus.process(PeerId::from(0), 1),
        QueryStep::Continue
    ));
    assert!(matches!(
        consensus.process(PeerId::from(1), 1),
        QueryStep::Continue
    ));
    assert!(matches!(
        consensus.process(PeerId::from(2), 0),
        QueryStep::Retry(..)
    ));

    // Round two: all three peers now agree on `1`, so the third response
    // completes the consensus.
    assert!(matches!(
        consensus.process(PeerId::from(0), 1),
        QueryStep::Continue
    ));
    assert!(matches!(
        consensus.process(PeerId::from(1), 1),
        QueryStep::Continue
    ));
    assert!(matches!(
        consensus.process(PeerId::from(2), 1),
        QueryStep::Success(1)
    ));
}