fedimint_core/net/peers.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use async_trait::async_trait;
use fedimint_core::PeerId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::task::Cancellable;
#[cfg(not(target_family = "wasm"))]
pub mod fake;
/// Owned [`PeerConnections`] trait object type
pub struct PeerConnections<Msg>(Box<dyn IPeerConnections<Msg> + Send + Unpin + 'static>);
impl<Msg> Deref for PeerConnections<Msg> {
type Target = dyn IPeerConnections<Msg> + Send + Unpin + 'static;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
impl<Msg> DerefMut for PeerConnections<Msg> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.0
}
}
/// Connection manager that tries to keep connections open to all peers
///
/// Production implementations of this trait have to ensure that:
/// * Connections to peers are authenticated and encrypted
/// * Messages are received exactly once and in the order they were sent
/// * Connections are reopened when closed
/// * Messages are cached in case of short-lived network interruptions and
/// resent on reconnect, this avoids the need to rejoin the consensus, which
/// is more tricky.
///
/// In case of longer term interruptions the message cache has to be dropped to
/// avoid DoS attacks. The thus disconnected peer will need to rejoin the
/// consensus at a later time.
#[async_trait]
pub trait IPeerConnections<Msg>
where
Msg: Serialize + DeserializeOwned + Unpin + Send,
{
/// Send a message to a specific peer.
///
/// The message is sent immediately and cached if the peer is reachable and
/// only cached otherwise.
async fn send(&mut self, peers: &[PeerId], msg: Msg) -> Cancellable<()>;
/// Await receipt of a message from any connected peer.
async fn receive(&mut self) -> Cancellable<(PeerId, Msg)>;
/// Removes a peer connection in case of misbehavior
async fn ban_peer(&mut self, peer: PeerId);
/// Converts the struct to a `PeerConnection` trait object
fn into_dyn(self) -> PeerConnections<Msg>
where
Self: Sized + Send + Unpin + 'static,
{
PeerConnections(Box::new(self))
}
}
/// Owned [`MuxPeerConnections`] trait object type
#[derive(Clone)]
pub struct MuxPeerConnections<MuxKey, Msg>(
Arc<dyn IMuxPeerConnections<MuxKey, Msg> + Send + Sync + Unpin + 'static>,
);
impl<MuxKey, Msg> Deref for MuxPeerConnections<MuxKey, Msg> {
type Target = dyn IMuxPeerConnections<MuxKey, Msg> + Send + Sync + Unpin + 'static;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
#[async_trait]
/// Like [`IPeerConnections`] but with an ability to handle multiple
/// destinations (like modules) per each peer-connection.
///
/// Notably, unlike [`IPeerConnections`] implementations need to be thread-safe,
/// as the primary intended use should support multiple threads using
/// multiplexed channel at the same time.
pub trait IMuxPeerConnections<MuxKey, Msg>
where
Msg: Serialize + DeserializeOwned + Unpin + Send,
MuxKey: Serialize + DeserializeOwned + Unpin + Send,
{
/// Send a message to a specific destination at specific peer.
async fn send(&self, peers: &[PeerId], mux_key: MuxKey, msg: Msg) -> Cancellable<()>;
/// Await receipt of a message from any connected peer.
async fn receive(&self, mux_key: MuxKey) -> Cancellable<(PeerId, Msg)>;
/// Removes a peer connection in case of misbehavior
async fn ban_peer(&self, peer: PeerId);
/// Converts the struct to a `PeerConnection` trait object
fn into_dyn(self) -> MuxPeerConnections<MuxKey, Msg>
where
Self: Sized + Send + Sync + Unpin + 'static,
{
MuxPeerConnections(Arc::new(self))
}
}