fedimint_server/net/
p2p_connection.rs1use std::io::Cursor;
2use std::time::Duration;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use bytes::{Bytes, BytesMut};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::module::registry::ModuleDecoderRegistry;
9use futures::{SinkExt, StreamExt};
10use iroh::endpoint::{Connection, RecvStream};
11use serde::Serialize;
12use serde::de::DeserializeOwned;
13use tokio::net::TcpStream;
14use tokio_rustls::TlsStream;
15use tokio_util::codec::{Framed, LengthDelimitedCodec};
16
17const MAX_P2P_MESSAGE_SIZE: usize = 10_000_000;
20
21pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;
22
23pub type DynIP2PFrame<M> = Box<dyn IP2PFrame<M>>;
24
25#[async_trait]
26pub trait IP2PFrame<M>: Send + 'static {
27 async fn read_to_end(&mut self) -> anyhow::Result<M>;
30
31 fn into_dyn(self) -> DynIP2PFrame<M>
32 where
33 Self: Sized,
34 {
35 Box::new(self)
36 }
37}
38
39#[async_trait]
40pub trait IP2PConnection<M>: Send + 'static {
41 async fn send(&mut self, message: M) -> anyhow::Result<()>;
44
45 async fn receive(&mut self) -> anyhow::Result<DynIP2PFrame<M>>;
48
49 fn rtt(&self) -> Duration;
51
52 fn into_dyn(self) -> DynP2PConnection<M>
53 where
54 Self: Sized,
55 {
56 Box::new(self)
57 }
58}
59
60#[async_trait]
63impl<M> IP2PFrame<M> for BytesMut
64where
65 M: Decodable + DeserializeOwned + Send + 'static,
66{
67 async fn read_to_end(&mut self) -> anyhow::Result<M> {
68 if let Ok(message) = M::consensus_decode_whole(self, &ModuleDecoderRegistry::default()) {
69 return Ok(message);
70 }
71
72 Ok(bincode::deserialize_from(Cursor::new(&**self))?)
73 }
74}
75
76#[async_trait]
77impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
78where
79 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
80{
81 async fn send(&mut self, message: M) -> anyhow::Result<()> {
82 let mut bytes = Vec::new();
83
84 bincode::serialize_into(&mut bytes, &message)?;
85
86 SinkExt::send(self, Bytes::from_owner(bytes)).await?;
87
88 Ok(())
89 }
90
91 async fn receive(&mut self) -> anyhow::Result<DynIP2PFrame<M>> {
92 let message = self
93 .next()
94 .await
95 .context("Framed stream is closed")??
96 .into_dyn();
97
98 Ok(message)
99 }
100
101 fn rtt(&self) -> Duration {
102 Duration::from_millis(0)
103 }
104}
105
106#[async_trait]
109impl<M> IP2PFrame<M> for RecvStream
110where
111 M: Decodable + DeserializeOwned + Send + 'static,
112{
113 async fn read_to_end(&mut self) -> anyhow::Result<M> {
114 let bytes = self.read_to_end(MAX_P2P_MESSAGE_SIZE).await?;
115
116 if let Ok(message) = M::consensus_decode_whole(&bytes, &ModuleDecoderRegistry::default()) {
117 return Ok(message);
118 }
119
120 Ok(bincode::deserialize_from(Cursor::new(&bytes))?)
121 }
122}
123
124#[async_trait]
125impl<M> IP2PConnection<M> for Connection
126where
127 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
128{
129 async fn send(&mut self, message: M) -> anyhow::Result<()> {
130 let mut bytes = Vec::new();
131
132 bincode::serialize_into(&mut bytes, &message)?;
133
134 let mut sink = self.open_uni().await?;
135
136 sink.write_all(&bytes).await?;
137
138 sink.finish()?;
139
140 Ok(())
141 }
142
143 async fn receive(&mut self) -> anyhow::Result<DynIP2PFrame<M>> {
144 Ok(self.accept_uni().await?.into_dyn())
145 }
146
147 fn rtt(&self) -> Duration {
148 self.rtt()
149 }
150}