// fedimint_api_client/query.rs
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::mem;
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::{maybe_add_send_sync, NumPeers, PeerId};
/// A stateful strategy that consumes peer responses one at a time and decides
/// how the overall query should proceed.
///
/// `IR` is the type of an individual peer response fed into the strategy and
/// `OR` is the type of the final result it produces; `OR` defaults to `IR`.
pub trait QueryStrategy<IR, OR = IR> {
/// Feeds the `response` received from `peer_id` into the strategy and returns
/// the next [`QueryStep`] for the query.
fn process(&mut self, peer_id: PeerId, response: IR) -> QueryStep<OR>;
}
/// Outcome of feeding one peer response into a [`QueryStrategy`].
#[derive(Debug)]
pub enum QueryStep<R> {
/// No result yet; carries a set of peers.
// NOTE(review): presumably the caller re-queries exactly these peers; confirm
// against the code that drives the strategy.
Retry(BTreeSet<PeerId>),
/// No result yet and nothing to retry; more responses are needed.
Continue,
/// The strategy finished and produced a result.
Success(R),
/// The strategy finished with an error.
Failure(anyhow::Error),
}
/// Query strategy that runs a fallible mapping function over a response of
/// type `R`, producing a `T`.
pub struct FilterMap<R, T> {
// Boxed mapping function applied to each response.
// NOTE(review): `maybe_add_send_sync!` presumably adds `Send + Sync` bounds
// where the target supports them; confirm against the macro definition.
filter_map: Box<maybe_add_send_sync!(dyn Fn(R) -> anyhow::Result<T>)>,
}
impl<R, T> FilterMap<R, T> {
    /// Builds a strategy around `filter_map`, the function used to turn a raw
    /// response into the final result (or an error).
    pub fn new(
        filter_map: impl Fn(R) -> anyhow::Result<T> + MaybeSend + MaybeSync + 'static,
    ) -> Self {
        // Box the closure so the struct does not need to be generic over its
        // concrete type.
        let filter_map = Box::new(filter_map);
        Self { filter_map }
    }
}
impl<R, T> QueryStrategy<R, T> for FilterMap<R, T> {
    /// Applies the stored mapping function to `response` and finishes the
    /// query right away: `Ok` becomes [`QueryStep::Success`], `Err` becomes
    /// [`QueryStep::Failure`]. The peer id is not used.
    fn process(&mut self, _peer: PeerId, response: R) -> QueryStep<T> {
        let outcome = (self.filter_map)(response);
        outcome.map_or_else(QueryStep::Failure, QueryStep::Success)
    }
}
/// Query strategy that maps each peer's response through a fallible function
/// and collects the successful results per peer until a threshold count is
/// reached.
pub struct FilterMapThreshold<R, T> {
// Boxed mapping function; it receives the responding peer's id together with
// the response.
filter_map: Box<maybe_add_send_sync!(dyn Fn(PeerId, R) -> anyhow::Result<T>)>,
// Successfully mapped responses collected so far, keyed by peer.
filtered_responses: BTreeMap<PeerId, T>,
// Number of collected responses at which the strategy reports success.
threshold: usize,
}
impl<R, T> FilterMapThreshold<R, T> {
    /// Builds a strategy that maps responses through `verifier` and succeeds
    /// once the number of accepted responses equals `num_peers.threshold()`.
    pub fn new(
        verifier: impl Fn(PeerId, R) -> anyhow::Result<T> + MaybeSend + MaybeSync + 'static,
        num_peers: NumPeers,
    ) -> Self {
        let threshold = num_peers.threshold();
        let filter_map = Box::new(verifier);
        Self {
            filter_map,
            filtered_responses: BTreeMap::new(),
            threshold,
        }
    }
}
impl<R, T> QueryStrategy<R, BTreeMap<PeerId, T>> for FilterMapThreshold<R, T> {
fn process(&mut self, peer: PeerId, response: R) -> QueryStep<BTreeMap<PeerId, T>> {
match (self.filter_map)(peer, response) {
Ok(response) => {
self.filtered_responses.insert(peer, response);
if self.filtered_responses.len() == self.threshold {
QueryStep::Success(mem::take(&mut self.filtered_responses))
} else {
QueryStep::Continue
}
}
Err(e) => QueryStep::Failure(e),
}
}
}
/// Query strategy that succeeds once a threshold number of equal responses
/// has been seen.
pub struct ThresholdConsensus<R> {
// Stored response per peer; a new response from the same peer replaces the
// previous one.
responses: BTreeMap<PeerId, R>,
// Peers that have responded since the last `QueryStep::Retry` was issued.
retry: BTreeSet<PeerId>,
// Number of equal responses required for success; also the size of the retry
// set at which a `QueryStep::Retry` is issued.
threshold: usize,
}
impl<R> ThresholdConsensus<R> {
    /// Builds an empty strategy whose threshold is `num_peers.threshold()`.
    pub fn new(num_peers: NumPeers) -> Self {
        let threshold = num_peers.threshold();
        Self {
            responses: BTreeMap::new(),
            retry: BTreeSet::new(),
            threshold,
        }
    }
}
impl<R: Eq> QueryStrategy<R> for ThresholdConsensus<R> {
    /// Records `response` from `peer` and checks whether enough peers agree.
    ///
    /// Returns [`QueryStep::Success`] with `response` once it, together with
    /// the equal responses stored for *other* peers, reaches the threshold.
    /// Otherwise the response is stored and the peer is added to the retry
    /// set; when that set reaches the threshold size it is returned in a
    /// [`QueryStep::Retry`] and reset, else [`QueryStep::Continue`] is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if `peer` is already in the retry set, i.e. it responded twice
    /// without a [`QueryStep::Retry`] being issued in between.
    fn process(&mut self, peer: PeerId, response: R) -> QueryStep<R> {
        // Count agreement among the other peers only. A response stored
        // earlier for this same peer is about to be replaced; counting it
        // would let one peer contribute twice after a retry and reach the
        // threshold with fewer distinct peers than required.
        let agreeing_others = self
            .responses
            .iter()
            .filter(|(other, previous)| **other != peer && **previous == response)
            .count();

        if agreeing_others + 1 >= self.threshold {
            return QueryStep::Success(response);
        }

        self.responses.insert(peer, response);

        // Invariant: a peer responds at most once between two retry rounds.
        assert!(self.retry.insert(peer));

        if self.retry.len() == self.threshold {
            QueryStep::Retry(mem::take(&mut self.retry))
        } else {
            QueryStep::Continue
        }
    }
}