fedimint_server/net/
p2p_connection.rs

1use std::io::Cursor;
2use std::time::Duration;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use bytes::Bytes;
7use fedimint_core::encoding::{Decodable, Encodable};
8use futures::{SinkExt, StreamExt};
9use iroh::endpoint::Connection;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use tokio::net::TcpStream;
13use tokio_rustls::TlsStream;
14use tokio_util::codec::{Framed, LengthDelimitedCodec};
15
16pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;
17
18#[async_trait]
19pub trait IP2PConnection<M>: Send + 'static {
20    async fn send(&mut self, message: M) -> anyhow::Result<()>;
21
22    async fn receive(&mut self) -> anyhow::Result<M>;
23
24    fn rtt(&self) -> Duration;
25
26    fn into_dyn(self) -> DynP2PConnection<M>
27    where
28        Self: Sized,
29    {
30        Box::new(self)
31    }
32}
33
34#[async_trait]
35impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
36where
37    M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
38{
39    async fn send(&mut self, message: M) -> anyhow::Result<()> {
40        let mut bytes = Vec::new();
41
42        bincode::serialize_into(&mut bytes, &message)?;
43
44        SinkExt::send(self, Bytes::from_owner(bytes)).await?;
45
46        Ok(())
47    }
48
49    async fn receive(&mut self) -> anyhow::Result<M> {
50        Ok(bincode::deserialize_from(Cursor::new(
51            &self.next().await.context("Framed stream is closed")??,
52        ))?)
53    }
54
55    fn rtt(&self) -> Duration {
56        Duration::from_millis(0)
57    }
58}
59
60#[async_trait]
61impl<M> IP2PConnection<M> for Connection
62where
63    M: Serialize + DeserializeOwned + Send + 'static,
64{
65    async fn send(&mut self, message: M) -> anyhow::Result<()> {
66        let mut bytes = Vec::new();
67
68        bincode::serialize_into(&mut bytes, &message)?;
69
70        let mut sink = self.open_uni().await?;
71
72        sink.write_all(&bytes).await?;
73
74        sink.finish()?;
75
76        Ok(())
77    }
78
79    async fn receive(&mut self) -> anyhow::Result<M> {
80        Ok(bincode::deserialize_from(Cursor::new(
81            &self.accept_uni().await?.read_to_end(1_000_000_000).await?,
82        ))?)
83    }
84
85    fn rtt(&self) -> Duration {
86        self.rtt()
87    }
88}