fedimint_api_client/
query.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::mem;
4
5use fedimint_connectors::ServerResult;
6use fedimint_connectors::error::ServerError;
7use fedimint_core::task::{MaybeSend, MaybeSync};
8use fedimint_core::{NumPeers, PeerId, maybe_add_send_sync};
9
10/// Fedimint query strategy
11///
12/// Due to federated security model each Fedimint client API call to the
13/// Federation might require a different way to process one or more required
14/// responses from the Federation members. This trait abstracts away the details
15/// of each specific strategy for the generic client Api code.
16pub trait QueryStrategy<IR, OR = IR> {
17    fn process(&mut self, peer_id: PeerId, response: IR) -> QueryStep<OR>;
18}
19
20/// Results from the strategy handling a response from a peer
21///
22/// Note that the implementation driving the [`QueryStrategy`] returning
23/// [`QueryStep`] is responsible from remembering and collecting errors
24/// for each peer.
25#[derive(Debug)]
26pub enum QueryStep<R> {
27    /// Retry requests to this peers
28    Retry(BTreeSet<PeerId>),
29    /// Do nothing yet, keep waiting for requests
30    Continue,
31    /// Return the successful result
32    Success(R),
33    /// A non-retryable failure has occurred
34    Failure(ServerError),
35}
36
37/// Returns when we obtain the first valid responses. RPC call errors or
38/// invalid responses are not retried.
39pub struct FilterMap<R, T> {
40    filter_map: Box<maybe_add_send_sync!(dyn Fn(R) -> ServerResult<T>)>,
41}
42
43impl<R, T> FilterMap<R, T> {
44    pub fn new(
45        filter_map: impl Fn(R) -> ServerResult<T> + MaybeSend + MaybeSync + 'static,
46    ) -> Self {
47        Self {
48            filter_map: Box::new(filter_map),
49        }
50    }
51}
52
53impl<R, T> QueryStrategy<R, T> for FilterMap<R, T> {
54    fn process(&mut self, _peer: PeerId, response: R) -> QueryStep<T> {
55        match (self.filter_map)(response) {
56            Ok(value) => QueryStep::Success(value),
57            Err(e) => QueryStep::Failure(e),
58        }
59    }
60}
61
62/// Returns when we obtain a threshold of valid responses. RPC call errors or
63/// invalid responses are not retried.
64pub struct FilterMapThreshold<R, T> {
65    filter_map: Box<maybe_add_send_sync!(dyn Fn(PeerId, R) -> ServerResult<T>)>,
66    filtered_responses: BTreeMap<PeerId, T>,
67    threshold: usize,
68}
69
70impl<R, T> FilterMapThreshold<R, T> {
71    pub fn new(
72        verifier: impl Fn(PeerId, R) -> ServerResult<T> + MaybeSend + MaybeSync + 'static,
73        num_peers: NumPeers,
74    ) -> Self {
75        Self {
76            filter_map: Box::new(verifier),
77            filtered_responses: BTreeMap::new(),
78            threshold: num_peers.threshold(),
79        }
80    }
81}
82
83impl<R, T> QueryStrategy<R, BTreeMap<PeerId, T>> for FilterMapThreshold<R, T> {
84    fn process(&mut self, peer: PeerId, response: R) -> QueryStep<BTreeMap<PeerId, T>> {
85        match (self.filter_map)(peer, response) {
86            Ok(response) => {
87                self.filtered_responses.insert(peer, response);
88
89                if self.filtered_responses.len() == self.threshold {
90                    QueryStep::Success(mem::take(&mut self.filtered_responses))
91                } else {
92                    QueryStep::Continue
93                }
94            }
95            Err(e) => QueryStep::Failure(e),
96        }
97    }
98}
99
100/// Returns when we obtain a threshold of identical responses. Responses are not
101/// assumed to be static and may be updated by the peers; on failure to
102/// establish consensus with a threshold of responses, we retry the requests.
103/// RPC call errors are not retried.
104pub struct ThresholdConsensus<R> {
105    responses: BTreeMap<PeerId, R>,
106    retry: BTreeSet<PeerId>,
107    threshold: usize,
108}
109
110impl<R> ThresholdConsensus<R> {
111    pub fn new(num_peers: NumPeers) -> Self {
112        Self {
113            responses: BTreeMap::new(),
114            retry: BTreeSet::new(),
115            threshold: num_peers.threshold(),
116        }
117    }
118}
119
120impl<R: Eq + Clone> QueryStrategy<R> for ThresholdConsensus<R> {
121    fn process(&mut self, peer: PeerId, response: R) -> QueryStep<R> {
122        self.responses.insert(peer, response.clone());
123
124        if self.responses.values().filter(|r| **r == response).count() == self.threshold {
125            return QueryStep::Success(response);
126        }
127
128        assert!(self.retry.insert(peer));
129
130        if self.retry.len() == self.threshold {
131            QueryStep::Retry(mem::take(&mut self.retry))
132        } else {
133            QueryStep::Continue
134        }
135    }
136}
137
/// Drives a [`ThresholdConsensus`] for four peers through one round without
/// consensus followed by one round in which consensus is reached.
#[test]
fn test_threshold_consensus() {
    let mut consensus = ThresholdConsensus::<u64>::new(NumPeers::from(4));
    let [first, second, third] = [0, 1, 2].map(PeerId::from);

    // Round one: the third peer disagrees, so its response triggers a retry
    // instead of a success.
    assert!(matches!(consensus.process(first, 1), QueryStep::Continue));
    assert!(matches!(consensus.process(second, 1), QueryStep::Continue));
    assert!(matches!(consensus.process(third, 0), QueryStep::Retry(..)));

    // Round two: all three peers now report the same value, so the third
    // response completes the consensus.
    assert!(matches!(consensus.process(first, 1), QueryStep::Continue));
    assert!(matches!(consensus.process(second, 1), QueryStep::Continue));
    assert!(matches!(consensus.process(third, 1), QueryStep::Success(1)));
}
167}