// fedimint_server/net/p2p_connection.rs
1use std::io::Cursor;
2use std::time::Duration;
3
4use anyhow::Context;
5use async_trait::async_trait;
6use bytes::Bytes;
7use fedimint_core::encoding::{Decodable, Encodable};
8use futures::{SinkExt, StreamExt};
9use iroh::endpoint::Connection;
10use serde::Serialize;
11use serde::de::DeserializeOwned;
12use tokio::net::TcpStream;
13use tokio_rustls::TlsStream;
14use tokio_util::codec::{Framed, LengthDelimitedCodec};
15
16pub type DynP2PConnection<M> = Box<dyn IP2PConnection<M>>;
17
18#[async_trait]
19pub trait IP2PConnection<M>: Send + 'static {
20 async fn send(&mut self, message: M) -> anyhow::Result<()>;
21
22 async fn receive(&mut self) -> anyhow::Result<M>;
23
24 fn rtt(&self) -> Duration;
25
26 fn into_dyn(self) -> DynP2PConnection<M>
27 where
28 Self: Sized,
29 {
30 Box::new(self)
31 }
32}
33
34#[async_trait]
35impl<M> IP2PConnection<M> for Framed<TlsStream<TcpStream>, LengthDelimitedCodec>
36where
37 M: Encodable + Decodable + Serialize + DeserializeOwned + Send + 'static,
38{
39 async fn send(&mut self, message: M) -> anyhow::Result<()> {
40 let mut bytes = Vec::new();
41
42 bincode::serialize_into(&mut bytes, &message)?;
43
44 SinkExt::send(self, Bytes::from_owner(bytes)).await?;
45
46 Ok(())
47 }
48
49 async fn receive(&mut self) -> anyhow::Result<M> {
50 Ok(bincode::deserialize_from(Cursor::new(
51 &self.next().await.context("Framed stream is closed")??,
52 ))?)
53 }
54
55 fn rtt(&self) -> Duration {
56 Duration::from_millis(0)
57 }
58}
59
60#[async_trait]
61impl<M> IP2PConnection<M> for Connection
62where
63 M: Serialize + DeserializeOwned + Send + 'static,
64{
65 async fn send(&mut self, message: M) -> anyhow::Result<()> {
66 let mut bytes = Vec::new();
67
68 bincode::serialize_into(&mut bytes, &message)?;
69
70 let mut sink = self.open_uni().await?;
71
72 sink.write_all(&bytes).await?;
73
74 sink.finish()?;
75
76 Ok(())
77 }
78
79 async fn receive(&mut self) -> anyhow::Result<M> {
80 Ok(bincode::deserialize_from(Cursor::new(
81 &self.accept_uni().await?.read_to_end(1_000_000_000).await?,
82 ))?)
83 }
84
85 fn rtt(&self) -> Duration {
86 self.rtt()
87 }
88}