pub mod audit;
pub mod registry;
use std::collections::BTreeMap;
use std::fmt::{self, Debug, Formatter};
use std::marker::{self, PhantomData};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use fedimint_logging::LOG_NET_API;
use futures::Future;
use jsonrpsee_core::JsonValue;
use registry::ModuleRegistry;
use serde::{Deserialize, Serialize};
use tracing::Instrument;
mod version;
pub use self::version::*;
use crate::config::{
ClientModuleConfig, ConfigGenModuleParams, DkgPeerMsg, ModuleInitParams, ServerModuleConfig,
ServerModuleConsensusConfig,
};
use crate::core::{
ClientConfig, Decoder, DecoderBuilder, Input, InputError, ModuleConsensusItem,
ModuleInstanceId, ModuleKind, Output, OutputError, OutputOutcome,
};
use crate::db::{
Committable, CoreMigrationFn, Database, DatabaseKey, DatabaseKeyWithNotify, DatabaseRecord,
DatabaseTransaction, DatabaseVersion,
};
use crate::encoding::{Decodable, DecodeError, Encodable};
use crate::fmt_utils::AbbreviateHexBytes;
use crate::module::audit::Audit;
use crate::net::peers::MuxPeerConnections;
use crate::server::DynServerModule;
use crate::task::{MaybeSend, TaskGroup};
use crate::{
apply, async_trait_maybe_send, maybe_add_send, maybe_add_send_sync, Amount, NumPeers, OutPoint,
PeerId,
};
/// Metadata about a transaction input.
///
/// This is the success value of [`ServerModule::process_input`].
#[derive(Debug, PartialEq, Eq)]
pub struct InputMeta {
/// Amount and fee attributed to the input.
pub amount: TransactionItemAmount,
/// Public key associated with the input.
// NOTE(review): presumably the key whose signature authorizes spending this
// input - confirm against the code that consumes `InputMeta`.
pub pub_key: secp256k1_zkp::PublicKey,
}
/// An amount together with a fee, describing a single transaction item.
///
/// Used as the `amount` of [`InputMeta`] and as the success value of
/// [`ServerModule::process_output`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TransactionItemAmount {
/// Value of the item.
pub amount: Amount,
/// Fee attributed to the item.
pub fee: Amount,
}
impl TransactionItemAmount {
/// Item with both `amount` and `fee` set to [`Amount::ZERO`].
pub const ZERO: Self = Self {
amount: Amount::ZERO,
fee: Amount::ZERO,
};
}
/// An API request: optional authentication plus the call parameters.
///
/// `T` is the parameter type; [`ApiRequestErased`] is the variant whose
/// parameters are an untyped JSON value.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ApiRequest<T> {
/// Authentication sent along with the request, if any.
pub auth: Option<ApiAuth>,
/// Parameters of the call.
pub params: T,
}
pub type ApiRequestErased = ApiRequest<JsonValue>;
impl Default for ApiRequestErased {
fn default() -> Self {
Self {
auth: None,
params: JsonValue::Null,
}
}
}
impl ApiRequestErased {
    /// Creates an unauthenticated request from any serializable parameters.
    ///
    /// # Panics
    ///
    /// Panics if `params` cannot be converted into a JSON value.
    pub fn new<T: Serialize>(params: T) -> Self {
        let params = serde_json::to_value(params)
            .expect("parameter serialization error - this should not happen");
        Self { auth: None, params }
    }

    /// Serializes the whole request (authentication and parameters) to JSON.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be converted into a JSON value.
    pub fn to_json(&self) -> JsonValue {
        let encoded = serde_json::to_value(self);
        encoded.expect("parameter serialization error - this should not happen")
    }

    /// Returns the same request with its authentication replaced by `auth`.
    pub fn with_auth(self, auth: ApiAuth) -> Self {
        Self {
            auth: Some(auth),
            ..self
        }
    }

    /// Deserializes the JSON parameters into `T`, keeping the authentication.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if the parameters do not deserialize
    /// into `T`.
    pub fn to_typed<T: serde::de::DeserializeOwned>(
        self,
    ) -> Result<ApiRequest<T>, serde_json::Error> {
        let Self { auth, params } = self;
        serde_json::from_value::<T>(params).map(|params| ApiRequest { auth, params })
    }
}
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ApiAuth(pub String);
impl Debug for ApiAuth {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ApiAuth(****)")
}
}
/// Error returned by API endpoint handlers.
///
/// The codes used by the helper constructors (400, 401, 404, 500) coincide
/// with the HTTP status codes of the same meaning.
#[derive(Debug, Clone)]
pub struct ApiError {
    /// Numeric error code.
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
}

impl ApiError {
    /// Creates an error from an arbitrary code and message.
    pub fn new(code: i32, message: String) -> Self {
        ApiError { code, message }
    }

    /// Error with code 404.
    pub fn not_found(message: String) -> Self {
        ApiError { code: 404, message }
    }

    /// Error with code 400.
    pub fn bad_request(message: String) -> Self {
        ApiError { code: 400, message }
    }

    /// Error with code 401 and a fixed message.
    pub fn unauthorized() -> Self {
        ApiError {
            code: 401,
            message: String::from("Invalid authorization"),
        }
    }

    /// Error with code 500.
    pub fn server_error(message: String) -> Self {
        ApiError { code: 500, message }
    }
}
/// Per-request state handed to API endpoint handlers.
///
/// Bundles a database handle, a committable database transaction and the
/// authentication information that accompanied the request.
pub struct ApiEndpointContext<'dbtx> {
// Database handle; cloned out by `db()` and used by the `wait_*` helpers.
db: Database,
// Transaction exposed through `dbtx()` and committed by `commit_tx_result()`.
dbtx: DatabaseTransaction<'dbtx, Committable>,
// Flag reported by `has_auth()`.
// NOTE(review): presumably true when the request's auth was verified by the
// caller that built this context - confirm at the construction site.
has_auth: bool,
// Authentication carried by the request, if any; returned by `request_auth()`.
request_auth: Option<ApiAuth>,
}
impl<'a> ApiEndpointContext<'a> {
pub fn new(
db: Database,
dbtx: DatabaseTransaction<'a, Committable>,
has_auth: bool,
request_auth: Option<ApiAuth>,
) -> Self {
Self {
db,
dbtx,
has_auth,
request_auth,
}
}
pub fn dbtx<'s, 'mtx>(&'s mut self) -> DatabaseTransaction<'mtx, Committable>
where
'a: 'mtx,
's: 'mtx,
{
self.dbtx.to_ref()
}
pub fn request_auth(&self) -> Option<ApiAuth> {
self.request_auth.clone()
}
pub fn has_auth(&self) -> bool {
self.has_auth
}
pub fn db(&self) -> Database {
self.db.clone()
}
pub fn wait_key_exists<K>(&self, key: K) -> impl Future<Output = K::Value>
where
K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
{
let db = self.db.clone();
async move { db.wait_key_exists(&key).await }
}
pub fn wait_value_matches<K>(
&self,
key: K,
matcher: impl Fn(&K::Value) -> bool + Copy,
) -> impl Future<Output = K::Value>
where
K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
{
let db = self.db.clone();
async move { db.wait_key_check(&key, |v| v.filter(matcher)).await.0 }
}
pub async fn commit_tx_result(self, path: &'static str) -> Result<(), ApiError> {
self.dbtx.commit_tx_result().await.map_err(|err| {
tracing::warn!(
target: fedimint_logging::LOG_NET_API,
path,
"API server error when writing to database: {:?}",
err
);
ApiError {
code: 500,
message: "API server error when writing to database".to_string(),
}
})
}
}
/// A statically typed API endpoint.
///
/// An implementation declares the endpoint's path, its state, parameter and
/// response types, and the handler itself. [`ApiEndpoint::from_typed`] turns
/// an implementation into a type-erased [`ApiEndpoint`].
#[apply(async_trait_maybe_send!)]
pub trait TypedApiEndpoint {
/// State passed to [`Self::handle`] by shared reference.
type State: Sync;
/// Path of the endpoint; also stored as `ApiEndpoint::path`.
const PATH: &'static str;
/// Type the request parameters are deserialized into.
type Param: serde::de::DeserializeOwned + Send;
/// Type returned by the handler; it must be serializable.
type Response: serde::Serialize;
/// Handles a single request.
///
/// `context` gives access to the database transaction and authentication
/// of the request; `request` holds the already-deserialized parameters.
async fn handle<'state, 'context, 'dbtx>(
state: &'state Self::State,
context: &'context mut ApiEndpointContext<'dbtx>,
request: Self::Param,
) -> Result<Self::Response, ApiError>
where
'dbtx: 'context;
}
pub use serde_json;
/// Builds an `ApiEndpoint` from a path, an API version expression and an
/// `async |state: &StateTy, context, param: ParamTy| -> RespTy { ... }`
/// closure-like handler.
///
/// The expansion declares a local unit struct `Endpoint`, implements
/// `TypedApiEndpoint` for it using the given path, types and body, and
/// evaluates to `ApiEndpoint::from_typed::<Endpoint>()`.
#[macro_export]
macro_rules! __api_endpoint {
(
$path:expr,
$version_introduced:expr,
async |$state:ident: &$state_ty:ty, $context:ident, $param:ident: $param_ty:ty| -> $resp_ty:ty $body:block
) => {{
// Local marker type that carries the `TypedApiEndpoint` implementation.
struct Endpoint;
#[$crate::apply($crate::async_trait_maybe_send!)]
impl $crate::module::TypedApiEndpoint for Endpoint {
const PATH: &'static str = $path;
type State = $state_ty;
type Param = $param_ty;
type Response = $resp_ty;
async fn handle<'state, 'context, 'dbtx>(
$state: &'state Self::State,
$context: &'context mut $crate::module::ApiEndpointContext<'dbtx>,
$param: Self::Param,
) -> ::std::result::Result<Self::Response, $crate::module::ApiError> {
{
// Scoped constant that is never read: it only forces
// `$version_introduced` to type-check as an `ApiVersion`.
const __API_VERSION: $crate::module::ApiVersion = $version_introduced;
}
$body
}
}
$crate::module::ApiEndpoint::from_typed::<Endpoint>()
}};
}
pub use __api_endpoint as api_endpoint;
use fedimint_core::config::DkgResult;
use self::registry::ModuleDecoderRegistry;
/// Boxed, pinned future returned by an endpoint handler; resolves to the JSON
/// response or an [`ApiError`].
type HandlerFnReturn<'a> =
Pin<Box<maybe_add_send!(dyn Future<Output = Result<serde_json::Value, ApiError>> + 'a)>>;
/// Boxed, type-erased endpoint handler.
///
/// It receives the state `M` by reference, the request's
/// [`ApiEndpointContext`] by value and the untyped request, and returns a
/// [`HandlerFnReturn`] tied to the same lifetime as the state reference.
type HandlerFn<M> = Box<
maybe_add_send_sync!(
dyn for<'a> Fn(&'a M, ApiEndpointContext<'a>, ApiRequestErased) -> HandlerFnReturn<'a>
),
>;
/// A type-erased API endpoint: a path plus the handler serving it.
///
/// `M` is the state type the handler receives by reference.
pub struct ApiEndpoint<M> {
/// Path of the endpoint.
pub path: &'static str,
/// Handler taking the untyped request and producing a JSON response.
pub handler: HandlerFn<M>,
}
static REQ_ID: AtomicU64 = AtomicU64::new(0);
impl ApiEndpoint<()> {
pub fn from_typed<E: TypedApiEndpoint>() -> ApiEndpoint<E::State>
where
<E as TypedApiEndpoint>::Response: MaybeSend,
E::Param: Debug,
E::Response: Debug,
{
async fn handle_request<'state, 'context, 'dbtx, E>(
state: &'state E::State,
context: &'context mut ApiEndpointContext<'dbtx>,
request: ApiRequest<E::Param>,
) -> Result<E::Response, ApiError>
where
'dbtx: 'context,
E: TypedApiEndpoint,
E::Param: Debug,
E::Response: Debug,
{
tracing::debug!(target: LOG_NET_API, path = E::PATH, ?request, "received api request");
let result = E::handle(state, context, request.params).await;
if let Err(error) = &result {
tracing::warn!(target: LOG_NET_API, path = E::PATH, ?error, "api request error");
} else {
tracing::debug!(target: LOG_NET_API, path = E::PATH, "api request complete");
}
result
}
ApiEndpoint {
path: E::PATH,
handler: Box::new(|m, mut context, request| {
Box::pin(async {
let request = request
.to_typed()
.map_err(|e| ApiError::bad_request(e.to_string()))?;
let span = tracing::info_span!(
target: LOG_NET_API,
"api_req",
id = REQ_ID.fetch_add(1, Ordering::SeqCst),
method = E::PATH,
);
let ret = handle_request::<E>(m, &mut context, request)
.instrument(span)
.await?;
context.commit_tx_result(E::PATH).await?;
Ok(serde_json::to_value(ret).expect("encoding error"))
})
}),
}
}
}
/// Object-safe view of a module's common initialization data.
///
/// It is blanket-implemented for every [`ModuleInit`] and is the trait object
/// wrapped by [`DynCommonModuleInit`].
#[apply(async_trait_maybe_send!)]
pub trait IDynCommonModuleInit: Debug {
/// Returns the module's [`Decoder`].
fn decoder(&self) -> Decoder;
/// Returns the module's [`ModuleKind`].
fn module_kind(&self) -> ModuleKind;
/// Returns this initializer as a type-erased [`DynCommonModuleInit`].
fn to_dyn_common(&self) -> DynCommonModuleInit;
/// Returns the module's database version.
fn database_version(&self) -> DatabaseVersion;
/// Returns an iterator of `(name, serializable value)` pairs produced from
/// the database through `dbtx`.
// NOTE(review): `prefix_names` presumably selects which key prefixes are
// dumped - confirm against a module's `ModuleInit::dump_database`.
async fn dump_database(
&self,
dbtx: &mut DatabaseTransaction<'_>,
prefix_names: Vec<String>,
) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_>;
}
/// Statically typed initializer shared by the module init traits in this
/// file.
///
/// Every implementor automatically implements [`IDynCommonModuleInit`].
pub trait ModuleInit: Debug + Clone + Send + Sync + 'static {
/// The module's [`CommonModuleInit`], providing its kind and decoder.
type Common: CommonModuleInit;
/// Database version reported by `IDynCommonModuleInit::database_version`.
const DATABASE_VERSION: DatabaseVersion;
/// Returns a future resolving to an iterator of `(name, serializable value)`
/// pairs produced from the database through `dbtx`.
// NOTE(review): `prefix_names` presumably selects which key prefixes are
// dumped - confirm against a concrete implementation.
fn dump_database(
&self,
dbtx: &mut DatabaseTransaction<'_>,
prefix_names: Vec<String>,
) -> maybe_add_send!(
impl Future<
Output = Box<
dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_,
>,
>
);
}
#[apply(async_trait_maybe_send!)]
impl<T> IDynCommonModuleInit for T
where
T: ModuleInit,
{
fn decoder(&self) -> Decoder {
T::Common::decoder()
}
fn module_kind(&self) -> ModuleKind {
T::Common::KIND
}
fn to_dyn_common(&self) -> DynCommonModuleInit {
DynCommonModuleInit::from_inner(Arc::new(self.clone()))
}
fn database_version(&self) -> DatabaseVersion {
<Self as ModuleInit>::DATABASE_VERSION
}
async fn dump_database(
&self,
dbtx: &mut DatabaseTransaction<'_>,
prefix_names: Vec<String>,
) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
<Self as ModuleInit>::dump_database(self, dbtx, prefix_names).await
}
}
/// Object-safe counterpart of [`ServerModuleInit`].
///
/// It is blanket-implemented for every `T: ServerModuleInit + 'static + Sync`
/// and is the trait object wrapped by [`DynServerModuleInit`].
#[apply(async_trait_maybe_send!)]
pub trait IServerModuleInit: IDynCommonModuleInit {
/// Returns this initializer viewed as an [`IDynCommonModuleInit`] trait
/// object.
fn as_common(&self) -> &(dyn IDynCommonModuleInit + Send + Sync + 'static);
/// Returns the module API versions supported by this initializer.
fn supported_api_versions(&self) -> SupportedModuleApiVersions;
/// Initializes the server module from its configuration, database, task
/// group and peer information.
async fn init(
&self,
peer_num: NumPeers,
cfg: ServerModuleConfig,
db: Database,
task_group: &TaskGroup,
our_peer_id: PeerId,
) -> anyhow::Result<DynServerModule>;
/// Checks the config generation parameters; the blanket implementation
/// succeeds exactly when `ServerModuleInit::parse_params` succeeds.
fn validate_params(&self, params: &ConfigGenModuleParams) -> anyhow::Result<()>;
/// Produces one [`ServerModuleConfig`] per peer, keyed by [`PeerId`].
fn trusted_dealer_gen(
&self,
peers: &[PeerId],
params: &ConfigGenModuleParams,
) -> BTreeMap<PeerId, ServerModuleConfig>;
/// Produces this peer's [`ServerModuleConfig`], with a [`PeerHandle`]
/// available for communicating with the other peers.
async fn distributed_gen(
&self,
peers: &PeerHandle,
params: &ConfigGenModuleParams,
) -> DkgResult<ServerModuleConfig>;
/// Validates `config` for the peer `identity`.
fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()>;
/// Builds the type-erased client configuration for the module instance
/// `module_instance_id` from the server's consensus configuration.
fn get_client_config(
&self,
module_instance_id: ModuleInstanceId,
config: &ServerModuleConsensusConfig,
) -> anyhow::Result<ClientModuleConfig>;
/// Returns the module's database migrations keyed by [`DatabaseVersion`].
fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn>;
}
// Declares `DynCommonModuleInit`, a cloneable newtype whose `inner` field is an
// `Arc` holding an `IDynCommonModuleInit` trait object.
dyn_newtype_define!(
#[derive(Clone)]
pub DynCommonModuleInit(Arc<IDynCommonModuleInit>)
);
impl AsRef<maybe_add_send_sync!(dyn IDynCommonModuleInit + 'static)> for DynCommonModuleInit {
fn as_ref(&self) -> &(maybe_add_send_sync!(dyn IDynCommonModuleInit + 'static)) {
self.inner.as_ref()
}
}
impl DynCommonModuleInit {
pub fn from_inner(
inner: Arc<maybe_add_send_sync!(dyn IDynCommonModuleInit + 'static)>,
) -> Self {
Self { inner }
}
}
// Declares `DynServerModuleInit`, a cloneable newtype whose `inner` field is an
// `Arc` holding an `IServerModuleInit` trait object.
dyn_newtype_define!(
#[derive(Clone)]
pub DynServerModuleInit(Arc<IServerModuleInit>)
);
impl AsRef<dyn IDynCommonModuleInit + Send + Sync + 'static> for DynServerModuleInit {
fn as_ref(&self) -> &(dyn IDynCommonModuleInit + Send + Sync + 'static) {
self.inner.as_common()
}
}
/// Static description of a module: its consensus version, kind, client
/// configuration type and decoder.
#[apply(async_trait_maybe_send!)]
pub trait CommonModuleInit: Debug + Sized {
/// Consensus version of the module.
const CONSENSUS_VERSION: ModuleConsensusVersion;
/// Kind of the module; returned by `IDynCommonModuleInit::module_kind`.
const KIND: ModuleKind;
/// Typed client configuration of the module.
type ClientConfig: ClientConfig;
/// Returns the module's [`Decoder`].
fn decoder() -> Decoder;
}
/// Arguments passed to [`ServerModuleInit::init`].
///
/// The type parameter `S` names the initializer the arguments are meant for;
/// it is only recorded through a `PhantomData` marker.
pub struct ServerModuleInitArgs<S>
where
S: ServerModuleInit,
{
// Server-side configuration of the module.
cfg: ServerModuleConfig,
// Database handle for the module.
db: Database,
// Task group handed to the module.
task_group: TaskGroup,
// Id of the peer this server runs as.
our_peer_id: PeerId,
// Number of peers.
num_peers: NumPeers,
// Ties the arguments to the initializer type without storing a value.
_marker: marker::PhantomData<S>,
}
/// Read-only accessors for the initialization arguments.
impl<S> ServerModuleInitArgs<S>
where
    S: ServerModuleInit,
{
    /// Server-side configuration of the module.
    pub fn cfg(&self) -> &ServerModuleConfig {
        let Self { cfg, .. } = self;
        cfg
    }

    /// Database handle for the module.
    pub fn db(&self) -> &Database {
        let Self { db, .. } = self;
        db
    }

    /// Number of peers.
    pub fn num_peers(&self) -> NumPeers {
        self.num_peers
    }

    /// Task group handed to the module.
    pub fn task_group(&self) -> &TaskGroup {
        let Self { task_group, .. } = self;
        task_group
    }

    /// Id of the peer this server runs as.
    pub fn our_peer_id(&self) -> PeerId {
        self.our_peer_id
    }
}
/// Statically typed initializer of a server module.
///
/// Every `T: ServerModuleInit + 'static + Sync` automatically implements the
/// object-safe [`IServerModuleInit`].
#[apply(async_trait_maybe_send!)]
pub trait ServerModuleInit: ModuleInit + Sized {
/// Typed config generation parameters of the module.
type Params: ModuleInitParams;
/// Returns module consensus versions for the given core consensus version.
// NOTE(review): presumably the versions this implementation supports -
// confirm against the caller.
fn versions(&self, core: CoreConsensusVersion) -> &[ModuleConsensusVersion];
/// Returns the module API versions supported by this initializer.
fn supported_api_versions(&self) -> SupportedModuleApiVersions;
/// Returns the module kind, taken from `Common::KIND`.
fn kind() -> ModuleKind {
<Self as ModuleInit>::Common::KIND
}
/// Initializes the server module from `args`.
async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<DynServerModule>;
/// Converts the untyped config generation parameters into
/// [`Self::Params`], failing if they do not convert.
fn parse_params(&self, params: &ConfigGenModuleParams) -> anyhow::Result<Self::Params> {
params.to_typed::<Self::Params>()
}
/// Produces one [`ServerModuleConfig`] per peer, keyed by [`PeerId`].
fn trusted_dealer_gen(
&self,
peers: &[PeerId],
params: &ConfigGenModuleParams,
) -> BTreeMap<PeerId, ServerModuleConfig>;
/// Produces this peer's [`ServerModuleConfig`], with a [`PeerHandle`]
/// available for communicating with the other peers.
async fn distributed_gen(
&self,
peer: &PeerHandle,
params: &ConfigGenModuleParams,
) -> DkgResult<ServerModuleConfig>;
/// Validates `config` for the peer `identity`.
fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()>;
/// Derives the typed client configuration from the server's consensus
/// configuration.
fn get_client_config(
&self,
config: &ServerModuleConsensusConfig,
) -> anyhow::Result<<<Self as ModuleInit>::Common as CommonModuleInit>::ClientConfig>;
/// Returns the module's database migrations keyed by [`DatabaseVersion`];
/// the default is an empty map.
fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
BTreeMap::new()
}
}
#[apply(async_trait_maybe_send!)]
impl<T> IServerModuleInit for T
where
T: ServerModuleInit + 'static + Sync,
{
fn as_common(&self) -> &(dyn IDynCommonModuleInit + Send + Sync + 'static) {
self
}
fn supported_api_versions(&self) -> SupportedModuleApiVersions {
<Self as ServerModuleInit>::supported_api_versions(self)
}
async fn init(
&self,
num_peers: NumPeers,
cfg: ServerModuleConfig,
db: Database,
task_group: &TaskGroup,
our_peer_id: PeerId,
) -> anyhow::Result<DynServerModule> {
<Self as ServerModuleInit>::init(
self,
&ServerModuleInitArgs {
num_peers,
cfg,
db,
task_group: task_group.clone(),
our_peer_id,
_marker: PhantomData,
},
)
.await
}
fn validate_params(&self, params: &ConfigGenModuleParams) -> anyhow::Result<()> {
<Self as ServerModuleInit>::parse_params(self, params)?;
Ok(())
}
fn trusted_dealer_gen(
&self,
peers: &[PeerId],
params: &ConfigGenModuleParams,
) -> BTreeMap<PeerId, ServerModuleConfig> {
<Self as ServerModuleInit>::trusted_dealer_gen(self, peers, params)
}
async fn distributed_gen(
&self,
peers: &PeerHandle,
params: &ConfigGenModuleParams,
) -> DkgResult<ServerModuleConfig> {
<Self as ServerModuleInit>::distributed_gen(self, peers, params).await
}
fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
<Self as ServerModuleInit>::validate_config(self, identity, config)
}
fn get_client_config(
&self,
module_instance_id: ModuleInstanceId,
config: &ServerModuleConsensusConfig,
) -> anyhow::Result<ClientModuleConfig> {
ClientModuleConfig::from_typed(
module_instance_id,
<Self as ServerModuleInit>::kind(),
config.version,
<Self as ServerModuleInit>::get_client_config(self, config)?,
)
}
fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
<Self as ServerModuleInit>::get_database_migrations(self)
}
}
/// The set of types a module exchanges, together with a decoder that covers
/// all of them.
pub trait ModuleCommon {
    /// Client configuration type.
    type ClientConfig: ClientConfig;
    /// Transaction input type.
    type Input: Input;
    /// Transaction output type.
    type Output: Output;
    /// Outcome type of an output.
    type OutputOutcome: OutputOutcome;
    /// Consensus item type.
    type ConsensusItem: ModuleConsensusItem;
    /// Error type for inputs.
    type InputError: InputError;
    /// Error type for outputs.
    type OutputError: OutputError;

    /// Returns a [`DecoderBuilder`] with all seven associated types
    /// registered as decodable.
    fn decoder_builder() -> DecoderBuilder {
        let mut builder = Decoder::builder();
        builder.with_decodable_type::<Self::ClientConfig>();
        builder.with_decodable_type::<Self::Input>();
        builder.with_decodable_type::<Self::Output>();
        builder.with_decodable_type::<Self::OutputOutcome>();
        builder.with_decodable_type::<Self::ConsensusItem>();
        builder.with_decodable_type::<Self::InputError>();
        builder.with_decodable_type::<Self::OutputError>();
        builder
    }

    /// Builds the [`Decoder`] described by [`Self::decoder_builder`].
    fn decoder() -> Decoder {
        Self::decoder_builder().build()
    }
}
/// Server-side logic of a module.
///
/// The method documentation below describes only what the signatures show;
/// the protocol that calls these methods lives outside this file.
#[apply(async_trait_maybe_send!)]
pub trait ServerModule: Debug + Sized {
/// The module's shared types (inputs, outputs, consensus items, errors).
type Common: ModuleCommon;
/// The initializer that creates this module.
type Init: ServerModuleInit;
/// Returns the module kind, taken from the initializer's `Common::KIND`.
fn module_kind() -> ModuleKind {
<Self::Init as ModuleInit>::Common::KIND
}
/// Returns a decoder built from `Common::decoder_builder()`.
fn decoder() -> Decoder {
Self::Common::decoder_builder().build()
}
/// Returns the consensus items this module produces as its proposal.
// NOTE(review): when and how often this is invoked is decided by the
// caller - confirm against the consensus code.
async fn consensus_proposal<'a>(
&'a self,
dbtx: &mut DatabaseTransaction<'_>,
) -> Vec<<Self::Common as ModuleCommon>::ConsensusItem>;
/// Processes one consensus item attributed to `peer_id`, with access to
/// the database through `dbtx`.
async fn process_consensus_item<'a, 'b>(
&'a self,
dbtx: &mut DatabaseTransaction<'b>,
consensus_item: <Self::Common as ModuleCommon>::ConsensusItem,
peer_id: PeerId,
) -> anyhow::Result<()>;
/// Checks an input without database access; the default accepts every
/// input.
fn verify_input(
&self,
_input: &<Self::Common as ModuleCommon>::Input,
) -> Result<(), <Self::Common as ModuleCommon>::InputError> {
Ok(())
}
/// Processes an input and returns its [`InputMeta`] on success.
async fn process_input<'a, 'b, 'c>(
&'a self,
dbtx: &mut DatabaseTransaction<'c>,
input: &'b <Self::Common as ModuleCommon>::Input,
) -> Result<InputMeta, <Self::Common as ModuleCommon>::InputError>;
/// Processes the output located at `out_point` and returns its amount and
/// fee on success.
async fn process_output<'a, 'b>(
&'a self,
dbtx: &mut DatabaseTransaction<'b>,
output: &'a <Self::Common as ModuleCommon>::Output,
out_point: OutPoint,
) -> Result<TransactionItemAmount, <Self::Common as ModuleCommon>::OutputError>;
/// Returns the outcome recorded for the output at `out_point`, or `None`
/// if there is none.
async fn output_status(
&self,
dbtx: &mut DatabaseTransaction<'_>,
out_point: OutPoint,
) -> Option<<Self::Common as ModuleCommon>::OutputOutcome>;
/// Adds this module's entries to `audit` for the instance
/// `module_instance_id`.
async fn audit(
&self,
dbtx: &mut DatabaseTransaction<'_>,
audit: &mut Audit,
module_instance_id: ModuleInstanceId,
);
/// Returns the API endpoints served by this module.
fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>>;
}
/// Consensus-encoded bytes of a `T`, (de)serialized through serde as hex.
///
/// Values are created with `From<&T>` and decoded back with
/// `try_into_inner` or `try_into_inner_known_module_kind`.
#[derive(Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct SerdeModuleEncoding<T: Encodable + Decodable>(
// Consensus encoding of the value; serde uses the `as_hex` helper for it.
#[serde(with = "::fedimint_core::encoding::as_hex")] Vec<u8>,
// Records the encoded type; skipped by serde.
#[serde(skip)] PhantomData<T>,
);
impl<T> fmt::Debug for SerdeModuleEncoding<T>
where
    T: Encodable + Decodable,
{
    /// Prints `SerdeModuleEncoding(<bytes>)`, where the bytes are rendered by
    /// [`AbbreviateHexBytes`] using the caller's formatter settings.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let abbreviated = AbbreviateHexBytes(&self.0);
        f.write_str("SerdeModuleEncoding(")?;
        fmt::Debug::fmt(&abbreviated, f)?;
        f.write_str(")")
    }
}
impl<T: Encodable + Decodable> From<&T> for SerdeModuleEncoding<T> {
fn from(value: &T) -> Self {
let mut bytes = vec![];
fedimint_core::encoding::Encodable::consensus_encode(value, &mut bytes)
.expect("Writing to buffer can never fail");
Self(bytes, PhantomData)
}
}
impl<T: Encodable + Decodable + 'static> SerdeModuleEncoding<T> {
    /// Decodes the stored bytes into a `T` using the given decoder registry.
    ///
    /// # Errors
    ///
    /// Returns the [`DecodeError`] produced by consensus decoding.
    pub fn try_into_inner(&self, modules: &ModuleDecoderRegistry) -> Result<T, DecodeError> {
        let mut cursor = std::io::Cursor::new(&self.0);
        <T as Decodable>::consensus_decode(&mut cursor, modules)
    }

    /// Decodes the stored bytes with one explicitly supplied [`Decoder`].
    ///
    /// A module instance id and a length are read first, then `decoder`
    /// decodes the remainder. Every step is given a default (empty) registry.
    ///
    /// # Errors
    ///
    /// Returns the [`DecodeError`] of whichever decoding step fails.
    pub fn try_into_inner_known_module_kind(&self, decoder: &Decoder) -> Result<T, DecodeError> {
        let empty_registry = ModuleDecoderRegistry::default();
        let mut cursor = std::io::Cursor::new(&self.0);
        let module_instance = ModuleInstanceId::consensus_decode(&mut cursor, &empty_registry)?;
        let total_len = u64::consensus_decode(&mut cursor, &empty_registry)?;
        decoder.decode_complete(
            &mut cursor,
            total_len,
            module_instance,
            &ModuleRegistry::default(),
        )
    }
}
/// Handle passed to `distributed_gen`, giving a module access to its peers.
///
/// The fields are public but hidden from the generated documentation.
#[non_exhaustive]
pub struct PeerHandle<'a> {
// Multiplexed peer connections carrying `DkgPeerMsg`, keyed by
// `(ModuleInstanceId, String)`.
#[doc(hidden)]
pub connections: &'a MuxPeerConnections<(ModuleInstanceId, String), DkgPeerMsg>,
// Instance id of the module this handle belongs to.
#[doc(hidden)]
pub module_instance_id: ModuleInstanceId,
// Id of the local peer.
#[doc(hidden)]
pub our_id: PeerId,
// Peer ids, as returned by `peer_ids()`.
// NOTE(review): whether this list includes `our_id` is not visible here.
#[doc(hidden)]
pub peers: Vec<PeerId>,
}
impl<'a> PeerHandle<'a> {
pub fn new(
connections: &'a MuxPeerConnections<(ModuleInstanceId, String), DkgPeerMsg>,
module_instance_id: ModuleInstanceId,
our_id: PeerId,
peers: Vec<PeerId>,
) -> Self {
Self {
connections,
module_instance_id,
our_id,
peers,
}
}
pub fn peer_ids(&self) -> &[PeerId] {
self.peers.as_slice()
}
}