fedimint_api_client/
query.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::mem;
4
5use fedimint_core::task::{MaybeSend, MaybeSync};
6use fedimint_core::{NumPeers, PeerId, maybe_add_send_sync};
7
8use crate::api::{PeerError, PeerResult};
9
10/// Fedimint query strategy
11///
12/// Due to federated security model each Fedimint client API call to the
13/// Federation might require a different way to process one or more required
14/// responses from the Federation members. This trait abstracts away the details
15/// of each specific strategy for the generic client Api code.
16pub trait QueryStrategy<IR, OR = IR> {
17    fn process(&mut self, peer_id: PeerId, response: IR) -> QueryStep<OR>;
18}
19
20/// Results from the strategy handling a response from a peer
21///
22/// Note that the implementation driving the [`QueryStrategy`] returning
23/// [`QueryStep`] is responsible from remembering and collecting errors
24/// for each peer.
25#[derive(Debug)]
26pub enum QueryStep<R> {
27    /// Retry requests to this peers
28    Retry(BTreeSet<PeerId>),
29    /// Do nothing yet, keep waiting for requests
30    Continue,
31    /// Return the successful result
32    Success(R),
33    /// A non-retryable failure has occurred
34    Failure(PeerError),
35}
36
37/// Returns when we obtain the first valid responses. RPC call errors or
38/// invalid responses are not retried.
39pub struct FilterMap<R, T> {
40    filter_map: Box<maybe_add_send_sync!(dyn Fn(R) -> PeerResult<T>)>,
41}
42
43impl<R, T> FilterMap<R, T> {
44    pub fn new(filter_map: impl Fn(R) -> PeerResult<T> + MaybeSend + MaybeSync + 'static) -> Self {
45        Self {
46            filter_map: Box::new(filter_map),
47        }
48    }
49}
50
51impl<R, T> QueryStrategy<R, T> for FilterMap<R, T> {
52    fn process(&mut self, _peer: PeerId, response: R) -> QueryStep<T> {
53        match (self.filter_map)(response) {
54            Ok(value) => QueryStep::Success(value),
55            Err(e) => QueryStep::Failure(e),
56        }
57    }
58}
59
60/// Returns when we obtain a threshold of valid responses. RPC call errors or
61/// invalid responses are not retried.
62pub struct FilterMapThreshold<R, T> {
63    filter_map: Box<maybe_add_send_sync!(dyn Fn(PeerId, R) -> PeerResult<T>)>,
64    filtered_responses: BTreeMap<PeerId, T>,
65    threshold: usize,
66}
67
68impl<R, T> FilterMapThreshold<R, T> {
69    pub fn new(
70        verifier: impl Fn(PeerId, R) -> PeerResult<T> + MaybeSend + MaybeSync + 'static,
71        num_peers: NumPeers,
72    ) -> Self {
73        Self {
74            filter_map: Box::new(verifier),
75            filtered_responses: BTreeMap::new(),
76            threshold: num_peers.threshold(),
77        }
78    }
79}
80
81impl<R, T> QueryStrategy<R, BTreeMap<PeerId, T>> for FilterMapThreshold<R, T> {
82    fn process(&mut self, peer: PeerId, response: R) -> QueryStep<BTreeMap<PeerId, T>> {
83        match (self.filter_map)(peer, response) {
84            Ok(response) => {
85                self.filtered_responses.insert(peer, response);
86
87                if self.filtered_responses.len() == self.threshold {
88                    QueryStep::Success(mem::take(&mut self.filtered_responses))
89                } else {
90                    QueryStep::Continue
91                }
92            }
93            Err(e) => QueryStep::Failure(e),
94        }
95    }
96}
97
98/// Returns when we obtain a threshold of identical responses. Responses are not
99/// assumed to be static and may be updated by the peers; on failure to
100/// establish consensus with a threshold of responses, we retry the requests.
101/// RPC call errors are not retried.
102pub struct ThresholdConsensus<R> {
103    responses: BTreeMap<PeerId, R>,
104    retry: BTreeSet<PeerId>,
105    threshold: usize,
106}
107
108impl<R> ThresholdConsensus<R> {
109    pub fn new(num_peers: NumPeers) -> Self {
110        Self {
111            responses: BTreeMap::new(),
112            retry: BTreeSet::new(),
113            threshold: num_peers.threshold(),
114        }
115    }
116}
117
118impl<R: Eq> QueryStrategy<R> for ThresholdConsensus<R> {
119    fn process(&mut self, peer: PeerId, response: R) -> QueryStep<R> {
120        let current_count = self.responses.values().filter(|r| **r == response).count();
121
122        if current_count + 1 >= self.threshold {
123            return QueryStep::Success(response);
124        }
125
126        self.responses.insert(peer, response);
127
128        assert!(self.retry.insert(peer));
129
130        if self.retry.len() == self.threshold {
131            QueryStep::Retry(mem::take(&mut self.retry))
132        } else {
133            QueryStep::Continue
134        }
135    }
136}