fedimint_server/net/
p2p_connection.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::fmt::Debug;
use std::io::Cursor;

use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::net::TcpStream;
use tokio_rustls::TlsStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

/// Boxed, type-erased [`IP2PConnection`] carrying messages of type `M`.
pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;

/// A bidirectional, message-oriented connection that exchanges values of
/// type `M`.
///
/// Implementations must be `Send + 'static` so that they can be boxed into a
/// [`DynP2PConnection`].
#[async_trait]
pub trait IP2PConnection<M>: Send + 'static {
    /// Sends a single `message` over the connection.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to deliver the message.
    async fn send(&mut self, message: M) -> anyhow::Result<()>;

    /// Waits for and returns the next message from the connection.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to obtain a message.
    async fn receive(&mut self) -> anyhow::Result<M>;

    /// Consumes the connection and boxes it as a [`DynP2PConnection`].
    fn into_dyn(self) -> DynP2PConnection<M>
    where
        Self: Sized,
    {
        Box::new(self)
    }
}

/// Envelope type used by the `Framed` implementation of [`IP2PConnection`]
/// in this file: outgoing messages are wrapped in [`LegacyMessage::Message`]
/// before being serialized.
// NOTE(review): the "legacy" naming suggests this envelope is kept for
// compatibility with older peers — confirm against the protocol history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LegacyMessage<M> {
    /// A payload message of type `M`.
    Message(M),
    /// Carries no payload; the `Framed` receiver in this file skips it and
    /// keeps waiting for the next frame.
    Ping,
}

#[async_trait]
impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
where
    M: Serialize + DeserializeOwned + Send + 'static,
{
    async fn send(&mut self, message: M) -> anyhow::Result<()> {
        let mut bytes = Vec::new();

        bincode::serialize_into(&mut bytes, &LegacyMessage::Message(message))?;

        SinkExt::send(self, Bytes::from_owner(bytes)).await?;

        Ok(())
    }

    async fn receive(&mut self) -> anyhow::Result<M> {
        loop {
            let bytes = self.next().await.context("Framed stream is closed")??;

            if let Ok(legacy_message) = bincode::deserialize_from(Cursor::new(&bytes)) {
                match legacy_message {
                    LegacyMessage::Message(message) => return Ok(message),
                    LegacyMessage::Ping => continue,
                }
            }

            return Ok(bincode::deserialize_from(Cursor::new(&bytes))?);
        }
    }
}

/// [`IP2PConnection`] implementation on top of an iroh [`Connection`].
///
/// Only compiled when the `iroh` feature is enabled and the target is not
/// in the `wasm` family.
#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
pub mod iroh {
    use async_trait::async_trait;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use iroh::endpoint::Connection;

    use crate::net::p2p_connection::IP2PConnection;

    /// Size limit, in bytes, handed to `read_to_end` when receiving a message.
    const MAX_MESSAGE_SIZE: usize = 1_000_000_000;

    #[async_trait]
    impl<M> IP2PConnection<M> for Connection
    where
        M: Encodable + Decodable + Send + 'static,
    {
        /// Opens a fresh unidirectional stream, writes the consensus
        /// encoding of `message` to it and finishes the stream.
        ///
        /// # Errors
        ///
        /// Fails if the stream cannot be opened, written to, or finished.
        async fn send(&mut self, message: M) -> anyhow::Result<()> {
            let mut stream = self.open_uni().await?;

            let encoded = message.consensus_encode_to_vec();
            stream.write_all(&encoded).await?;
            stream.finish()?;

            Ok(())
        }

        /// Accepts the next incoming unidirectional stream, reads it to the
        /// end (up to `MAX_MESSAGE_SIZE`) and decodes the whole buffer as an
        /// `M` using a default [`ModuleDecoderRegistry`].
        ///
        /// # Errors
        ///
        /// Fails if no stream can be accepted, if reading fails, or if the
        /// received bytes do not decode as an `M`.
        async fn receive(&mut self) -> anyhow::Result<M> {
            let mut stream = self.accept_uni().await?;
            let bytes = stream.read_to_end(MAX_MESSAGE_SIZE).await?;

            let decoders = ModuleDecoderRegistry::default();
            let message = M::consensus_decode_whole(&bytes, &decoders)?;

            Ok(message)
        }
    }
}