fedimint_core/net/peers/
fake.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/// Fake (channel-based) implementation of [`super::PeerConnections`].
use std::time::Duration;

use async_trait::async_trait;
use fedimint_core::net::peers::{IPeerConnections, PeerConnections};
use fedimint_core::runtime::sleep;
use fedimint_core::task::{Cancellable, Cancelled, TaskHandle};
use fedimint_core::PeerId;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc::{self, Receiver, Sender};

struct FakePeerConnections<Msg> {
    tx: Sender<Msg>,
    rx: Receiver<Msg>,
    peer_id: PeerId,
    task_handle: TaskHandle,
}

#[async_trait]
impl<Msg> IPeerConnections<Msg> for FakePeerConnections<Msg>
where
    Msg: Serialize + DeserializeOwned + Unpin + Send,
{
    async fn send(&mut self, peers: &[PeerId], msg: Msg) -> Cancellable<()> {
        assert_eq!(peers, &[self.peer_id]);

        // If the peer is gone, just pretend we are going to resend
        // the msg eventually, even if it will never happen.
        let _ = self.tx.send(msg).await;
        Ok(())
    }

    async fn receive(&mut self) -> Cancellable<(PeerId, Msg)> {
        // Just like a real implementation, do not return
        // if the peer is gone.
        while !self.task_handle.is_shutting_down() {
            if let Some(msg) = self.rx.recv().await {
                return Ok((self.peer_id, msg));
            }

            sleep(Duration::from_secs(10)).await;
        }
        Err(Cancelled)
    }

    /// Removes a peer connection in case of misbehavior
    async fn ban_peer(&mut self, _peer: PeerId) {
        unimplemented!();
    }
}

/// Create a fake link between `peer1` and `peer2` for test purposes
///
/// `buf_size` controls the size of the `tokio::mpsc::channel` used
/// under the hood (both ways).
pub fn make_fake_peer_connection<Msg>(
    peer1: PeerId,
    peer2: PeerId,
    buf_size: usize,
    task_handle: TaskHandle,
) -> (PeerConnections<Msg>, PeerConnections<Msg>)
where
    Msg: Serialize + DeserializeOwned + Unpin + Send + 'static,
{
    let (tx1, rx1) = mpsc::channel(buf_size);
    let (tx2, rx2) = mpsc::channel(buf_size);

    (
        FakePeerConnections {
            tx: tx1,
            rx: rx2,
            peer_id: peer2,
            task_handle: task_handle.clone(),
        }
        .into_dyn(),
        FakePeerConnections {
            tx: tx2,
            rx: rx1,
            peer_id: peer1,
            task_handle,
        }
        .into_dyn(),
    )
}