fedimint_api_client/
query.rs1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::mem;
4
5use fedimint_core::task::{MaybeSend, MaybeSync};
6use fedimint_core::{NumPeers, PeerId, maybe_add_send_sync};
7
8use crate::api::{PeerError, PeerResult};
9
/// Strategy for folding individual peer responses into one query outcome.
///
/// `IR` is the type of a single peer's response and `OR` is the type of the
/// final result, which defaults to `IR`.
pub trait QueryStrategy<IR, OR = IR> {
    /// Consumes the `response` received from `peer_id` and returns the
    /// [`QueryStep`] describing how the query should proceed.
    fn process(&mut self, peer_id: PeerId, response: IR) -> QueryStep<OR>;
}
19
/// Outcome of handing one peer response to a [`QueryStrategy`].
#[derive(Debug)]
pub enum QueryStep<R> {
    /// Asks for the listed peers to be queried again.
    Retry(BTreeSet<PeerId>),
    /// No decision has been reached yet; further responses are needed.
    Continue,
    /// The query is complete and produced this result.
    Success(R),
    /// The query failed with this error.
    Failure(PeerError),
}
36
/// Strategy that decides the query from a single response: the stored closure
/// either maps the response into the result or rejects it with a `PeerError`.
pub struct FilterMap<R, T> {
    /// Validates a raw response of type `R` and converts it into a `T`.
    filter_map: Box<maybe_add_send_sync!(dyn Fn(R) -> PeerResult<T>)>,
}
42
43impl<R, T> FilterMap<R, T> {
44 pub fn new(filter_map: impl Fn(R) -> PeerResult<T> + MaybeSend + MaybeSync + 'static) -> Self {
45 Self {
46 filter_map: Box::new(filter_map),
47 }
48 }
49}
50
51impl<R, T> QueryStrategy<R, T> for FilterMap<R, T> {
52 fn process(&mut self, _peer: PeerId, response: R) -> QueryStep<T> {
53 match (self.filter_map)(response) {
54 Ok(value) => QueryStep::Success(value),
55 Err(e) => QueryStep::Failure(e),
56 }
57 }
58}
59
/// Strategy that collects responses accepted by a closure until a threshold
/// number of them has been gathered, then yields them keyed by peer.
pub struct FilterMapThreshold<R, T> {
    /// Validates a peer's raw response of type `R` and converts it into a `T`.
    filter_map: Box<maybe_add_send_sync!(dyn Fn(PeerId, R) -> PeerResult<T>)>,
    /// Accepted responses gathered so far, one entry per peer.
    filtered_responses: BTreeMap<PeerId, T>,
    /// Number of accepted responses at which the query succeeds; taken from
    /// `NumPeers::threshold()` at construction.
    threshold: usize,
}
67
68impl<R, T> FilterMapThreshold<R, T> {
69 pub fn new(
70 verifier: impl Fn(PeerId, R) -> PeerResult<T> + MaybeSend + MaybeSync + 'static,
71 num_peers: NumPeers,
72 ) -> Self {
73 Self {
74 filter_map: Box::new(verifier),
75 filtered_responses: BTreeMap::new(),
76 threshold: num_peers.threshold(),
77 }
78 }
79}
80
81impl<R, T> QueryStrategy<R, BTreeMap<PeerId, T>> for FilterMapThreshold<R, T> {
82 fn process(&mut self, peer: PeerId, response: R) -> QueryStep<BTreeMap<PeerId, T>> {
83 match (self.filter_map)(peer, response) {
84 Ok(response) => {
85 self.filtered_responses.insert(peer, response);
86
87 if self.filtered_responses.len() == self.threshold {
88 QueryStep::Success(mem::take(&mut self.filtered_responses))
89 } else {
90 QueryStep::Continue
91 }
92 }
93 Err(e) => QueryStep::Failure(e),
94 }
95 }
96}
97
/// Strategy that looks for a threshold number of equal responses and asks
/// for retries when the responses seen so far do not agree.
pub struct ThresholdConsensus<R> {
    /// Most recent response recorded for each peer.
    responses: BTreeMap<PeerId, R>,
    /// Peers whose responses were recorded since the last retry request.
    retry: BTreeSet<PeerId>,
    /// Number of agreeing responses required; taken from
    /// `NumPeers::threshold()` at construction.
    threshold: usize,
}
107
108impl<R> ThresholdConsensus<R> {
109 pub fn new(num_peers: NumPeers) -> Self {
110 Self {
111 responses: BTreeMap::new(),
112 retry: BTreeSet::new(),
113 threshold: num_peers.threshold(),
114 }
115 }
116}
117
118impl<R: Eq> QueryStrategy<R> for ThresholdConsensus<R> {
119 fn process(&mut self, peer: PeerId, response: R) -> QueryStep<R> {
120 let current_count = self.responses.values().filter(|r| **r == response).count();
121
122 if current_count + 1 >= self.threshold {
123 return QueryStep::Success(response);
124 }
125
126 self.responses.insert(peer, response);
127
128 assert!(self.retry.insert(peer));
129
130 if self.retry.len() == self.threshold {
131 QueryStep::Retry(mem::take(&mut self.retry))
132 } else {
133 QueryStep::Continue
134 }
135 }
136}