// p2p_connector.rs
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use anyhow::{ensure, format_err, Context};
use async_trait::async_trait;
use fedimint_core::util::SafeUrl;
use fedimint_core::PeerId;
use futures::Stream;
use rustls::ServerName;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::rustls::server::AllowAnyAuthenticatedClient;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::{rustls, TlsAcceptor, TlsConnector, TlsStream};
use tokio_stream::wrappers::TcpListenerStream;
use tokio_stream::StreamExt;
use tokio_util::codec::LengthDelimitedCodec;
use crate::net::p2p_connection::{DynP2PConnection, IP2PConnection};
/// Shared, dynamically dispatched handle to any [`IP2PConnector`] implementation.
pub type DynP2PConnector<M> = Arc<dyn IP2PConnector<M>>;
/// Either the id of a peer paired with the connection to it, or an
/// [`anyhow::Error`] describing why no connection was produced.
pub type P2PConnectionResult<M> = anyhow::Result<(PeerId, DynP2PConnection<M>)>;
/// Pinned, boxed, `Send` stream whose items are [`P2PConnectionResult`]s.
pub type P2PConnectionListener<M> = Pin<Box<dyn Stream<Item = P2PConnectionResult<M>> + Send>>;
/// Transport abstraction that can dial peers and accept incoming connections
/// carrying messages of type `M`.
///
/// Implementors must be `Send + Sync + 'static`.
// NOTE(review): structural lines of this trait (braces, the `where` keyword and
// the body of `into_dyn`) are not visible in this copy — confirm the comments
// below against the complete definition.
pub trait IP2PConnector<M>: Send + Sync + 'static {
/// Returns the ids of the peers this connector knows about.
fn peers(&self) -> Vec<PeerId>;
/// Opens a connection to `peer`.
///
/// # Errors
/// Returns an error when the connection cannot be established.
async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>>;
/// Returns a stream yielding one [`P2PConnectionResult`] per incoming connection.
async fn listen(&self) -> P2PConnectionListener<M>;
/// Consumes the connector and returns it as a [`DynP2PConnector`].
/// Only callable on sized implementors.
fn into_dyn(self) -> DynP2PConnector<M>
Self: Sized,
/// TLS material used by [`TlsTcpConnector`]: our private key plus the
/// certificate and name recorded for every peer.
#[derive(Debug, Clone)]
pub struct TlsConfig {
/// Private key handed to rustls when building both the client and the
/// server TLS configuration.
pub private_key: rustls::PrivateKey,
/// Certificate of each peer, keyed by peer id.
pub certificates: BTreeMap<PeerId, rustls::Certificate>,
/// Name of each peer, keyed by peer id. When connecting, the name is passed
/// through `dns_sanitize` to form the TLS server name.
pub peer_names: BTreeMap<PeerId, String>,
/// Connector that establishes peer connections using rustls on top of tokio
/// TCP sockets.
pub struct TlsTcpConnector {
/// TLS key and certificate material.
cfg: TlsConfig,
/// Local socket address the listener binds to.
p2p_bind_addr: SocketAddr,
/// URL of each peer, keyed by peer id.
peers: BTreeMap<PeerId, SafeUrl>,
/// Our own peer id; filtered out of the list returned by `peers()`.
identity: PeerId,
impl TlsTcpConnector {
/// Creates a connector from a TLS configuration, the address to listen on,
/// the peer URL map and our own peer id.
pub fn new(
cfg: TlsConfig,
p2p_bind_addr: SocketAddr,
peers: BTreeMap<PeerId, SafeUrl>,
identity: PeerId,
) -> TlsTcpConnector {
// NOTE(review): the field initializers of this struct literal are not
// visible here; presumably each argument is stored in the field of the same
// name — confirm.
TlsTcpConnector {
/// [`IP2PConnector`] implementation using mutually authenticated TLS over TCP.
///
/// Peers are identified by matching the certificate presented during the
/// handshake against the certificates in [`TlsConfig`].
// NOTE(review): several method-chain links and closing braces of this impl are
// not visible in this copy; comments describe only what the visible lines show.
impl<M> IP2PConnector<M> for TlsTcpConnector
M: Serialize + DeserializeOwned + Send + 'static,
/// Lists peer ids, leaving out our own `identity`.
fn peers(&self) -> Vec<PeerId> {
.filter(|peer| **peer != self.identity)
/// Connects to `peer` as a TLS client and verifies that the remote end
/// presented the certificate configured for exactly that peer.
///
/// # Errors
/// Fails when the peer presents no certificate, an empty certificate chain,
/// a certificate that belongs to no known peer, or the certificate of a
/// different peer than the one requested.
///
/// # Panics
/// Panics when the local configuration is inconsistent: a peer certificate
/// cannot be added to the root store, our own certificate is missing, the
/// TLS client config cannot be built, the sanitized peer name is not a valid
/// DNS name, or no URL is configured for `peer`.
async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
// Build the trust store from the configured peer certificates.
let mut root_cert_store = RootCertStore::empty();
for cert in self.cfg.certificates.values() {
.expect("Could not add peer certificate");
// Our own certificate, presented to the server for client authentication.
let certificate = self
.expect("No certificate for ourself found")
let cfg = rustls::ClientConfig::builder()
.with_client_auth_cert(vec![certificate], self.cfg.private_key.clone())
.expect("Failed to create TLS config");
// `dns_sanitize` output is used as the TLS server name for the handshake.
let domain = ServerName::try_from(dns_sanitize(&self.cfg.peer_names[&peer]).as_str())
.expect("Always a valid DNS name");
let destination = self
// `expect` takes a plain `&str`, so `{peer}` used to be printed verbatim;
// `panic!` formats the message and names the offending peer.
.unwrap_or_else(|| panic!("No url for peer {peer}"))
let tls = TlsConnector::from(Arc::new(cfg))
// Certificate the remote side presented during the handshake.
let certificate = tls
.context("Peer did not authenticate itself")?
.context("Received certificate chain of length zero")?;
// Map the presented certificate back to the peer id it is configured for.
let auth_peer = self
.find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
.context("Unknown certificate")?;
// A valid certificate of some *other* peer must not be accepted.
ensure!(auth_peer == peer, "Connected to unexpected peer");
/// Binds the TCP listener and returns a stream that, for every accepted
/// socket, performs the TLS handshake and yields the authenticated peer id
/// together with the framed connection, or the error that occurred.
///
/// # Panics
/// Panics when a peer certificate cannot be added to the root store, our own
/// certificate is missing, the TLS server config cannot be built, or the
/// listener cannot bind to `p2p_bind_addr`.
async fn listen(&self) -> P2PConnectionListener<M> {
// Build the trust store from the configured peer certificates.
let mut root_cert_store = RootCertStore::empty();
for cert in self.cfg.certificates.values() {
.expect("Could not add peer certificate");
// Client-certificate verifier backed by that trust store.
let verifier = AllowAnyAuthenticatedClient::new(root_cert_store);
// Our own certificate, presented to connecting clients.
let certificate = self
.expect("No certificate for ourself found")
let config = rustls::ServerConfig::builder()
.with_single_cert(vec![certificate], self.cfg.private_key.clone())
.expect("Failed to create TLS config");
let listener = TcpListener::bind(self.p2p_bind_addr)
.expect("Could not bind to port");
let acceptor = TlsAcceptor::from(Arc::new(config.clone()));
// Cloned so the `move` closure below owns its own copy of the configuration.
let cfg = self.cfg.clone();
let stream = TcpListenerStream::new(listener).then(move |connection| {
// Each connection's future needs its own handles to the config and acceptor.
let cfg = cfg.clone();
let acceptor = acceptor.clone();
async move {
// Accept errors and handshake errors both surface as an `Err` stream item.
let tls = acceptor.accept(connection?).await?;
// Certificate the client presented during the handshake.
let certificate = tls
.context("Peer did not authenticate itself")?
.context("Received certificate chain of length zero")?;
// Map the presented certificate back to the peer id it is configured for.
let auth_peer = cfg
.find_map(|(peer, c)| if c == certificate { Some(*peer) } else { None })
.context("Unknown certificate")?;
// Length-delimited framing for the connection (builder options not visible here).
let framed = LengthDelimitedCodec::builder()
Ok((auth_peer, framed))
/// Derives a string from `name` in which every character that is not ASCII
/// alphanumeric has been replaced by `_`.
// NOTE(review): the expression producing the return value is not visible here;
// the returned `String` may be built from `sanitized` rather than be equal to
// it — confirm.
pub fn dns_sanitize(name: &str) -> String {
let sanitized = name.replace(|c: char| !c.is_ascii_alphanumeric(), "_");
/// Extracts the host and the port from `url` and returns them as a string.
///
/// # Errors
/// Returns an error naming the URL when it has no host or no port.
// NOTE(review): the accessor calls on `url` and the final formatting of the
// result are not visible here; presumably the result is `host:port` — confirm.
pub fn parse_host_port(url: &SafeUrl) -> anyhow::Result<String> {
let host = url
.ok_or_else(|| format_err!("Missing host in {url}"))?;
let port = url
.ok_or_else(|| format_err!("Missing port in {url}"))?;
#[cfg(all(feature = "iroh", not(target_family = "wasm")))]
pub mod iroh {
use std::collections::BTreeMap;
use anyhow::Context;
use async_trait::async_trait;
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::PeerId;
use iroh::endpoint::Incoming;
use iroh::{Endpoint, NodeId, SecretKey};
use crate::net::p2p_connection::IP2PConnection;
use crate::net::p2p_connector::{
DynP2PConnection, IP2PConnector, P2PConnectionListener, P2PConnectionResult,
/// Connector backed by an iroh [`Endpoint`].
#[derive(Debug, Clone)]
pub struct IrohConnector {
/// Iroh node id of each peer, keyed by peer id. `new` filters our own
/// entry out of this map.
pub node_ids: BTreeMap<PeerId, NodeId>,
/// Endpoint used both to dial peers and to accept incoming connections.
pub endpoint: Endpoint,
/// ALPN protocol identifier handed to the iroh endpoint when dialing a peer.
const FEDIMINT_ALPN: &[u8] = b"FEDIMINT_ALPN";
impl IrohConnector {
/// Builds a connector for the node that owns `secret_key`.
///
/// `node_ids` must contain an entry whose node id equals the public key of
/// `secret_key`; that entry is removed from the map stored in the connector.
///
/// # Panics
/// Panics if the public key of `secret_key` is not among `node_ids`.
pub async fn new(
secret_key: SecretKey,
node_ids: BTreeMap<PeerId, NodeId>,
) -> anyhow::Result<Self> {
// Locate our own entry by comparing each node id with our public key.
let identity = *node_ids
.find(|entry| entry.1 == &secret_key.public())
.expect("Our public key is not part of the keyset")
Ok(Self {
// Keep only the entries that belong to other peers.
node_ids: node_ids
.filter(|entry| entry.0 != identity)
// NOTE(review): the options applied to the endpoint builder are not visible here.
endpoint: Endpoint::builder()
/// [`IP2PConnector`] implementation that dials and accepts connections through
/// the iroh endpoint.
// NOTE(review): the body of `peers`, several chain links and the closing braces
// of this impl are not visible in this copy.
impl<M> IP2PConnector<M> for IrohConnector
M: Encodable + Decodable + Send + 'static,
/// Returns the ids of the peers this connector knows about.
fn peers(&self) -> Vec<PeerId> {
/// Dials `peer` through the iroh endpoint using the `FEDIMINT_ALPN` protocol.
///
/// # Errors
/// Returns an error when the endpoint fails to connect.
///
/// # Panics
/// Panics if no node id is configured for `peer`.
async fn connect(&self, peer: PeerId) -> anyhow::Result<DynP2PConnection<M>> {
let node_id = *self
// `expect` takes a plain `&str`, so `{peer}` used to be printed verbatim;
// `panic!` formats the message and names the offending peer.
.unwrap_or_else(|| panic!("No node id found for peer {peer}"));
let connection = self.endpoint.connect(node_id, FEDIMINT_ALPN).await?;
/// Returns a stream that yields one [`P2PConnectionResult`] per incoming
/// connection and ends once the endpoint stops producing connections.
async fn listen(&self) -> P2PConnectionListener<M> {
// A clone of the connector is threaded through `unfold` as the stream state.
let stream = futures::stream::unfold(self.clone(), move |endpoint| async move {
// `accept` yielding `None` makes the closure return `None`, which ends the stream.
let stream = endpoint.endpoint.accept().await?;
let result = accept_connection(&endpoint.node_ids, stream).await;
Some((result, endpoint))
/// Completes an incoming iroh connection and resolves the remote node id to
/// the peer id it is registered under in `peers`.
///
/// # Errors
/// Returns an error when accepting the connection fails, when the remote node
/// id cannot be determined, or when that node id belongs to no known peer.
async fn accept_connection<M>(
peers: &BTreeMap<PeerId, NodeId>,
incoming: Incoming,
) -> P2PConnectionResult<M>
M: Encodable + Decodable + Send + 'static,
let connection = incoming.accept()?.await?;
let node_id = iroh::endpoint::get_remote_node_id(&connection)?;
let peer_id = peers
.find(|entry| entry.1 == &node_id)
// A plain string passed to `context` is not formatted, so `{node_id}` used to
// appear verbatim; build the message lazily so it names the unknown node.
.with_context(|| format!("Node id {node_id} is unknown"))?
Ok((*peer_id, connection.into_dyn()))