fedimint_server/net/
p2p_connection.rs

1use std::io::Cursor;
2use std::time::Duration;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use bytes::{Bytes, BytesMut};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use futures::{SinkExt, StreamExt};
10use iroh::endpoint::{Connection, RecvStream};
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use tokio::net::TcpStream;
14use tokio_rustls::TlsStream;
15use tokio_util::codec::{Framed, LengthDelimitedCodec};
16
/// Maximum size of a p2p message in bytes. The largest message we expect to
/// receive is a signed session outcome.
///
/// This is the size limit passed when reading an Iroh receive stream to its
/// end; a stream carrying more bytes than this fails to read.
const MAX_P2P_MESSAGE_SIZE: usize = 10_000_000;
20
21pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;
22
23pub type DynIP2PFrame<M> = Box<dyn IP2PFrame<M>>;
24
25#[async_trait]
26pub trait IP2PFrame<M>: Send + 'static {
27    /// Read the entire frame from the connection and deserialize it into a
28    /// message. This is *not* required to be cancel-safe.
29    async fn read_to_end(&mut self) -> anyhow::Result<M>;
30
31    fn into_dyn(self) -> DynIP2PFrame<M>
32    where
33        Self: Sized,
34    {
35        Box::new(self)
36    }
37}
38
39#[async_trait]
40pub trait IP2PConnection<M>: Send + 'static {
41    /// Send a message over the connection. This is *not* required to be
42    /// cancel-safe.
43    async fn send(&mut self, message: M) -> anyhow::Result<()>;
44
45    /// Receive a p2p frame from the connection. This is *required* to be
46    /// cancel-safe.
47    async fn receive(&mut self) -> anyhow::Result<DynIP2PFrame<M>>;
48
49    /// Get the round-trip time of the connection.
50    fn rtt(&self) -> Duration;
51
52    fn into_dyn(self) -> DynP2PConnection<M>
53    where
54        Self: Sized,
55    {
56        Box::new(self)
57    }
58}
59
// Implementations of the IP2PFrame and IP2PConnection traits for TLS
61
62#[async_trait]
63impl<M> IP2PFrame<M> for BytesMut
64where
65    M: Decodable + DeserializeOwned + Send + 'static,
66{
67    async fn read_to_end(&mut self) -> anyhow::Result<M> {
68        if let Ok(message) = M::consensus_decode_whole(self, &ModuleDecoderRegistry::default()) {
69            return Ok(message);
70        }
71
72        Ok(bincode::deserialize_from(Cursor::new(&**self))?)
73    }
74}
75
76#[async_trait]
77impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
78where
79    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
80{
81    async fn send(&mut self, message: M) -> anyhow::Result<()> {
82        let mut bytes = Vec::new();
83
84        bincode::serialize_into(&mut bytes, &message)?;
85
86        SinkExt::send(self, Bytes::from_owner(bytes)).await?;
87
88        Ok(())
89    }
90
91    async fn receive(&mut self) -> anyhow::Result<DynIP2PFrame<M>> {
92        let message = self
93            .next()
94            .await
95            .context("Framed stream is closed")??
96            .into_dyn();
97
98        Ok(message)
99    }
100
101    fn rtt(&self) -> Duration {
102        Duration::from_millis(0)
103    }
104}
105
// Implementations of the IP2PFrame and IP2PConnection traits for Iroh
107
108#[async_trait]
109impl<M> IP2PFrame<M> for RecvStream
110where
111    M: Decodable + DeserializeOwned + Send + 'static,
112{
113    async fn read_to_end(&mut self) -> anyhow::Result<M> {
114        let bytes = self.read_to_end(MAX_P2P_MESSAGE_SIZE).await?;
115
116        if let Ok(message) = M::consensus_decode_whole(&bytes, &ModuleDecoderRegistry::default()) {
117            return Ok(message);
118        }
119
120        Ok(bincode::deserialize_from(Cursor::new(&bytes))?)
121    }
122}
123
124#[async_trait]
125impl<M> IP2PConnection<M> for Connection
126where
127    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
128{
129    async fn send(&mut self, message: M) -> anyhow::Result<()> {
130        let mut bytes = Vec::new();
131
132        bincode::serialize_into(&mut bytes, &message)?;
133
134        let mut sink = self.open_uni().await?;
135
136        sink.write_all(&bytes).await?;
137
138        sink.finish()?;
139
140        Ok(())
141    }
142
143    async fn receive(&mut self) -> anyhow::Result<DynIP2PFrame<M>> {
144        Ok(self.accept_uni().await?.into_dyn())
145    }
146
147    fn rtt(&self) -> Duration {
148        self.rtt()
149    }
150}