fedimint_core/net/peers/
fake.rsuse std::time::Duration;
use async_trait::async_trait;
use fedimint_core::net::peers::{IPeerConnections, PeerConnections};
use fedimint_core::runtime::sleep;
use fedimint_core::task::{Cancellable, Cancelled, TaskHandle};
use fedimint_core::PeerId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc::{self, Receiver, Sender};
struct FakePeerConnections<Msg> {
tx: Sender<Msg>,
rx: Receiver<Msg>,
peer_id: PeerId,
task_handle: TaskHandle,
}
#[async_trait]
impl<Msg> IPeerConnections<Msg> for FakePeerConnections<Msg>
where
Msg: Serialize + DeserializeOwned + Unpin + Send,
{
async fn send(&mut self, peers: &[PeerId], msg: Msg) -> Cancellable<()> {
assert_eq!(peers, &[self.peer_id]);
let _ = self.tx.send(msg).await;
Ok(())
}
async fn receive(&mut self) -> Cancellable<(PeerId, Msg)> {
while !self.task_handle.is_shutting_down() {
if let Some(msg) = self.rx.recv().await {
return Ok((self.peer_id, msg));
}
sleep(Duration::from_secs(10)).await;
}
Err(Cancelled)
}
async fn ban_peer(&mut self, _peer: PeerId) {
unimplemented!();
}
}
pub fn make_fake_peer_connection<Msg>(
peer1: PeerId,
peer2: PeerId,
buf_size: usize,
task_handle: TaskHandle,
) -> (PeerConnections<Msg>, PeerConnections<Msg>)
where
Msg: Serialize + DeserializeOwned + Unpin + Send + 'static,
{
let (tx1, rx1) = mpsc::channel(buf_size);
let (tx2, rx2) = mpsc::channel(buf_size);
(
FakePeerConnections {
tx: tx1,
rx: rx2,
peer_id: peer2,
task_handle: task_handle.clone(),
}
.into_dyn(),
FakePeerConnections {
tx: tx2,
rx: rx1,
peer_id: peer1,
task_handle,
}
.into_dyn(),
)
}