//! Client-side operation log.
//!
//! Records one entry per user-initiated operation and optionally caches the
//! operation's final outcome once it is known.

use std::collections::HashSet;
use std::fmt::Debug;
use std::future;
use std::ops::Range;
use std::time::{Duration, SystemTime};
use async_stream::stream;
use fedimint_core::core::OperationId;
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::time::now;
use fedimint_core::util::BoxStream;
use fedimint_logging::LOG_CLIENT;
use futures::{stream, Stream, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tracing::{error, instrument, warn};
use crate::db::{
ChronologicalOperationLogKey, ChronologicalOperationLogKeyPrefix, OperationLogKey,
};
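/// JSON value wrapper that consensus-encodes as the value's string
/// representation, so arbitrary `serde_json::Value`s can be stored without a
/// dedicated binary encoding.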
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct JsonStringed(pub serde_json::Value);
impl Encodable for JsonStringed {
fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
json_str.consensus_encode(writer)
}
}
impl Decodable for JsonStringed {
fn consensus_decode_partial<R: std::io::Read>(
r: &mut R,
modules: &ModuleDecoderRegistry,
) -> Result<Self, DecodeError> {
let json_str = String::consensus_decode_partial(r, modules)?;
let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
Ok(JsonStringed(value))
}
}
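/// Log of all operations initiated by the client.
///
/// Each operation is indexed twice: by its [`OperationId`] for direct lookup
/// and by creation time for chronological pagination. The key of the oldest
/// entry is cached since the log only grows forward in time.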
#[derive(Debug, Clone)]
pub struct OperationLog {
db: Database,
oldest_entry: OnceCell<ChronologicalOperationLogKey>,
}
impl OperationLog {
pub fn new(db: Database) -> Self {
Self {
db,
oldest_entry: OnceCell::new(),
}
}
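/// Returns the key of the oldest operation log entry, caching the result
/// after the first successful lookup. Returns `None` while the log is
/// empty; an empty result is deliberately not cached so that a later entry
/// can still be found.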
async fn get_oldest_operation_log_key(&self) -> Option<ChronologicalOperationLogKey> {
let mut dbtx = self.db.begin_transaction_nc().await;
self.oldest_entry
.get_or_try_init(move || async move {
dbtx.find_by_prefix(&ChronologicalOperationLogKeyPrefix)
.await
.map(|(key, ())| key)
.next()
.await
.ok_or(())
})
.await
.ok()
.copied()
}
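/// Adds a new entry to the operation log, writing both the primary record
/// keyed by `operation_id` and a chronological index entry keyed by the
/// current time.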
pub async fn add_operation_log_entry(
&self,
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
operation_type: &str,
operation_meta: impl serde::Serialize,
) {
dbtx.insert_new_entry(
&OperationLogKey { operation_id },
&OperationLogEntry {
operation_module_kind: operation_type.to_string(),
meta: JsonStringed(
serde_json::to_value(operation_meta)
.expect("Can only fail if meta is not serializable"),
),
outcome: None,
},
)
.await;
dbtx.insert_new_entry(
&ChronologicalOperationLogKey {
creation_time: now(),
operation_id,
},
&(),
)
.await;
}
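/// Returns the last `limit` operations. If `last_seen` is provided, only
/// operations created before it are returned.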
#[deprecated(since = "0.6.0", note = "Use `paginate_operations_rev` instead")]
pub async fn list_operations(
&self,
limit: usize,
last_seen: Option<ChronologicalOperationLogKey>,
) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
self.paginate_operations_rev(limit, last_seen).await
}
pub async fn paginate_operations_rev(
&self,
limit: usize,
last_seen: Option<ChronologicalOperationLogKey>,
) -> Vec<(ChronologicalOperationLogKey, OperationLogEntry)> {
const EPOCH_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7);
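// When no cursor is given, start slightly in the future so that entries
// created with a marginally fast clock are still included in the first page.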
let start_after_key = last_seen.unwrap_or_else(|| ChronologicalOperationLogKey {
creation_time: now() + Duration::from_secs(30),
operation_id: OperationId([0; 32]),
});
let Some(oldest_entry_key) = self.get_oldest_operation_log_key().await else {
return vec![];
};
let mut dbtx = self.db.begin_transaction_nc().await;
let mut operation_log_keys = Vec::with_capacity(limit);
'outer: for key_range_rev in
rev_epoch_ranges(start_after_key, oldest_entry_key, EPOCH_DURATION)
{
let epoch_operation_log_keys_rev = dbtx
.find_by_range(key_range_rev)
.await
.map(|(key, ())| key)
.collect::<Vec<_>>()
.await;
for operation_log_key in epoch_operation_log_keys_rev.into_iter().rev() {
operation_log_keys.push(operation_log_key);
if operation_log_keys.len() >= limit {
break 'outer;
}
}
}
debug_assert_eq!(
operation_log_keys.iter().collect::<HashSet<_>>().len(),
operation_log_keys.len(),
"Operation log keys returned are not unique"
);
let mut operation_log_entries = Vec::with_capacity(operation_log_keys.len());
for operation_log_key in operation_log_keys {
let operation_log_entry = dbtx
.get_value(&OperationLogKey {
operation_id: operation_log_key.operation_id,
})
.await
.expect("Inconsistent DB");
operation_log_entries.push((operation_log_key, operation_log_entry));
}
operation_log_entries
}
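/// Returns the operation log entry for the given `operation_id`, or `None`
/// if no such operation exists.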
pub async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry> {
Self::get_operation_inner(
&mut self.db.begin_transaction_nc().await.into_nc(),
operation_id,
)
.await
}
async fn get_operation_inner(
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
) -> Option<OperationLogEntry> {
dbtx.get_value(&OperationLogKey { operation_id }).await
}
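/// Sets the outcome of an operation, recording the time it was set.
/// Returns an error if committing the DB transaction fails; panics if the
/// operation does not exist or `outcome` is not serializable.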
#[instrument(target = LOG_CLIENT, skip(db), level = "debug")]
pub async fn set_operation_outcome(
db: &Database,
operation_id: OperationId,
outcome: &(impl Serialize + Debug),
) -> anyhow::Result<()> {
let outcome_json =
JsonStringed(serde_json::to_value(outcome).expect("Outcome is not serializable"));
let mut dbtx = db.begin_transaction().await;
let mut operation = Self::get_operation_inner(&mut dbtx.to_ref_nc(), operation_id)
.await
.expect("Operation exists");
operation.outcome = Some(OperationOutcome {
time: fedimint_core::time::now(),
outcome: outcome_json,
});
dbtx.insert_entry(&OperationLogKey { operation_id }, &operation)
.await;
dbtx.commit_tx_result().await?;
Ok(())
}
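/// Tries to set the outcome of an operation, but only logs a warning on
/// failure instead of returning an error. The cached outcome is an
/// optimization: if caching fails, callers fall back to regenerating the
/// update stream via [`OperationLogEntry::outcome_or_updates`].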
pub async fn optimistically_set_operation_outcome(
db: &Database,
operation_id: OperationId,
outcome: &(impl Serialize + Debug),
) {
if let Err(e) = Self::set_operation_outcome(db, operation_id, outcome).await {
warn!(
target: LOG_CLIENT,
"Error setting operation outcome: {e}"
);
}
}
}
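/// Returns an iterator over `Range`s of [`ChronologicalOperationLogKey`],
/// starting at `start_after` (exclusive) and going backwards in time in
/// chunks of `epoch_duration` until `last_entry` is covered.
///
/// Scanning the index in bounded epochs lets the caller stop issuing DB
/// queries as soon as enough entries have been collected.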
fn rev_epoch_ranges(
start_after: ChronologicalOperationLogKey,
last_entry: ChronologicalOperationLogKey,
epoch_duration: Duration,
) -> impl Iterator<Item = Range<ChronologicalOperationLogKey>> {
(0..)
.map(move |epoch| start_after.creation_time - epoch * epoch_duration)
.take_while(move |&start_time| start_time >= last_entry.creation_time)
.map(move |start_time| {
let end_time = start_time - epoch_duration;
let start_key = if start_time == start_after.creation_time {
start_after
} else {
ChronologicalOperationLogKey {
creation_time: start_time,
operation_id: OperationId([0; 32]),
}
};
let end_key = ChronologicalOperationLogKey {
creation_time: end_time,
operation_id: OperationId([0; 32]),
};
Range {
start: end_key,
end: start_key,
}
})
}
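/// Legacy (V0) operation log entry format, in which the outcome was stored
/// without a timestamp.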
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntryV0 {
pub(crate) operation_module_kind: String,
pub(crate) meta: JsonStringed,
pub(crate) outcome: Option<JsonStringed>,
}
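/// Final outcome of an operation, together with the time it was cached.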
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
pub(crate) struct OperationOutcome {
pub(crate) time: SystemTime,
pub(crate) outcome: JsonStringed,
}
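/// Operation log entry in the current format: the kind of the module that
/// created the operation, operation-specific metadata, and, once known, the
/// cached outcome.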
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntry {
pub(crate) operation_module_kind: String,
pub(crate) meta: JsonStringed,
pub(crate) outcome: Option<OperationOutcome>,
}
impl OperationLogEntry {
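/// Returns the kind of the module that created the operation.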
pub fn operation_module_kind(&self) -> &str {
&self.operation_module_kind
}
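/// Returns the metadata of the operation.
///
/// # Panics
/// Panics if the metadata cannot be deserialized into `M`, so `M` has to
/// match the type the module serialized when creating the entry.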
pub fn meta<M: DeserializeOwned>(&self) -> M {
serde_json::from_value(self.meta.0.clone()).expect("JSON deserialization should not fail")
}
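/// Returns the final outcome of the operation, if it was cached yet.
///
/// # Panics
/// Panics if the cached outcome cannot be deserialized into `D`.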
pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
self.outcome.as_ref().map(|outcome| {
serde_json::from_value(outcome.outcome.0.clone())
.expect("JSON deserialization should not fail")
})
}
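/// Returns the time the outcome was cached, if an outcome is present.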
pub fn outcome_time(&self) -> Option<SystemTime> {
self.outcome.as_ref().map(|o| o.time)
}
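/// Returns the cached outcome if one exists, otherwise a wrapped update
/// stream produced by `stream_gen`. The wrapped stream caches its last item
/// as the operation's outcome once it ends, so future calls can skip
/// re-running the stream generator.
///
/// A usage sketch; `op_log`, `db` and `operation_id` are assumed to be in
/// scope, and `subscribe_to_updates` stands in for whatever module-specific
/// stream generator applies (hypothetical name):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let entry = op_log.get_operation(operation_id).await.expect("operation exists");
/// let mut updates = entry
///     .outcome_or_updates(&db, operation_id, || subscribe_to_updates(operation_id))
///     .into_stream();
/// while let Some(update) = updates.next().await {
///     println!("update: {update:?}");
/// }
/// ```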
pub fn outcome_or_updates<U, S>(
&self,
db: &Database,
operation_id: OperationId,
stream_gen: impl FnOnce() -> S,
) -> UpdateStreamOrOutcome<U>
where
U: Clone + Serialize + DeserializeOwned + Debug + MaybeSend + MaybeSync + 'static,
S: Stream<Item = U> + MaybeSend + 'static,
{
match self.outcome::<U>() {
Some(outcome) => UpdateStreamOrOutcome::Outcome(outcome),
None => UpdateStreamOrOutcome::UpdateStream(caching_operation_update_stream(
db.clone(),
operation_id,
stream_gen(),
)),
}
}
}
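/// Either a stream of operation updates, if the operation is still in
/// progress, or its final outcome, if it has finished.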
pub enum UpdateStreamOrOutcome<U> {
UpdateStream(BoxStream<'static, U>),
Outcome(U),
}
impl<U> UpdateStreamOrOutcome<U>
where
U: MaybeSend + MaybeSync + 'static,
{
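/// Converts into a stream in either case: the inner update stream as-is, or
/// a single-item stream yielding the cached outcome.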
pub fn into_stream(self) -> BoxStream<'static, U> {
match self {
UpdateStreamOrOutcome::UpdateStream(stream) => stream,
UpdateStreamOrOutcome::Outcome(outcome) => {
Box::pin(stream::once(future::ready(outcome)))
}
}
}
}
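/// Wraps a stream of operation updates, forwarding every item and caching
/// the last one as the operation's outcome once the stream ends. An empty
/// stream is logged as an error, since every operation is expected to
/// produce at least one update.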
pub fn caching_operation_update_stream<'a, U, S>(
db: Database,
operation_id: OperationId,
stream: S,
) -> BoxStream<'a, U>
where
U: Clone + Serialize + Debug + MaybeSend + MaybeSync + 'static,
S: Stream<Item = U> + MaybeSend + 'a,
{
let mut stream = Box::pin(stream);
Box::pin(stream! {
let mut last_update = None;
while let Some(update) = stream.next().await {
yield update.clone();
last_update = Some(update);
}
let Some(last_update) = last_update else {
error!(
target: LOG_CLIENT,
"Stream ended without any updates, this should not happen!"
);
return;
};
OperationLog::optimistically_set_operation_outcome(&db, operation_id, &last_update).await;
})
}
#[cfg(test)]
mod tests {
use std::time::{Duration, SystemTime};
use fedimint_core::core::OperationId;
use fedimint_core::db::mem_impl::MemDatabase;
use fedimint_core::db::{
Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped, IRawDatabaseExt,
};
use fedimint_core::module::registry::ModuleRegistry;
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use super::UpdateStreamOrOutcome;
use crate::db::{ChronologicalOperationLogKey, OperationLogKey};
use crate::oplog::{JsonStringed, OperationLog, OperationLogEntry, OperationOutcome};
#[test]
fn test_operation_log_entry_serde() {
let op_log = OperationLogEntry {
operation_module_kind: "test".to_string(),
meta: JsonStringed(serde_json::to_value(()).unwrap()),
outcome: None,
};
op_log.meta::<()>();
}
#[test]
fn test_operation_log_entry_serde_extra_meta() {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
struct Meta {
foo: String,
extra_meta: serde_json::Value,
}
let meta = Meta {
foo: "bar".to_string(),
extra_meta: serde_json::to_value(()).unwrap(),
};
let op_log = OperationLogEntry {
operation_module_kind: "test".to_string(),
meta: JsonStringed(serde_json::to_value(meta.clone()).unwrap()),
outcome: Some(OperationOutcome {
time: fedimint_core::time::now(),
outcome: JsonStringed(serde_json::to_value("test_outcome").unwrap()),
}),
};
assert_eq!(op_log.meta::<Meta>(), meta);
}
#[tokio::test]
async fn test_operation_log_update() {
let op_id = OperationId([0x32; 32]);
let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
let op_log = OperationLog::new(db.clone());
let mut dbtx = db.begin_transaction().await;
op_log
.add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
.await;
dbtx.commit_tx().await;
let op = op_log.get_operation(op_id).await.expect("op exists");
assert_eq!(op.outcome, None);
OperationLog::set_operation_outcome(&db, op_id, &"baz")
.await
.unwrap();
let op = op_log.get_operation(op_id).await.expect("op exists");
assert_eq!(op.outcome::<String>(), Some("baz".to_string()));
assert!(op.outcome_time().is_some(), "outcome_time should be set");
let update_stream_or_outcome =
op.outcome_or_updates::<String, _>(&db, op_id, futures::stream::empty);
assert!(matches!(
&update_stream_or_outcome,
UpdateStreamOrOutcome::Outcome(s) if s == "baz"
));
let updates = update_stream_or_outcome
.into_stream()
.collect::<Vec<_>>()
.await;
assert_eq!(updates, vec!["baz"]);
}
#[tokio::test]
async fn test_operation_log_update_from_stream() {
let op_id = OperationId([0x32; 32]);
let db = MemDatabase::new().into_database();
let op_log = OperationLog::new(db.clone());
let mut dbtx = db.begin_transaction().await;
op_log
.add_operation_log_entry(&mut dbtx.to_ref_nc(), op_id, "foo", "bar")
.await;
dbtx.commit_tx().await;
let op = op_log.get_operation(op_id).await.expect("op exists");
let updates = vec!["bar".to_owned(), "bob".to_owned(), "baz".to_owned()];
let update_stream = op
.outcome_or_updates::<String, _>(&db, op_id, || futures::stream::iter(updates.clone()));
let received_updates = update_stream.into_stream().collect::<Vec<_>>().await;
assert_eq!(received_updates, updates);
let op_updated = op_log.get_operation(op_id).await.expect("op exists");
assert_eq!(op_updated.outcome::<String>(), Some("baz".to_string()));
assert!(
op_updated.outcome_time().is_some(),
"outcome_time should be set after stream completion"
);
}
#[tokio::test]
async fn test_pagination() {
fn assert_page_entries(
page: Vec<(ChronologicalOperationLogKey, OperationLogEntry)>,
page_idx: u8,
) {
for (entry_idx, (_key, entry)) in page.into_iter().enumerate() {
let actual_meta = entry.meta::<u8>();
let expected_meta = 97 - (page_idx * 10 + entry_idx as u8);
assert_eq!(actual_meta, expected_meta);
}
}
let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
let op_log = OperationLog::new(db.clone());
for operation_idx in 0u8..98 {
let mut dbtx = db.begin_transaction().await;
op_log
.add_operation_log_entry(
&mut dbtx.to_ref_nc(),
OperationId([operation_idx; 32]),
"foo",
operation_idx,
)
.await;
dbtx.commit_tx().await;
}
let mut previous_last_element = None;
for page_idx in 0u8..9 {
let page = op_log
.paginate_operations_rev(10, previous_last_element)
.await;
assert_eq!(page.len(), 10);
previous_last_element = Some(page[9].0);
assert_page_entries(page, page_idx);
}
let page = op_log
.paginate_operations_rev(10, previous_last_element)
.await;
assert_eq!(page.len(), 8);
assert_page_entries(page, 9);
}
#[tokio::test]
async fn test_pagination_empty() {
let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
let op_log = OperationLog::new(db.clone());
let page = op_log.paginate_operations_rev(10, None).await;
assert!(page.is_empty());
}
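/// Tests that pagination returns complete, non-overlapping pages even when
/// multiple operations share the same creation time, in which case ordering
/// falls back to the operation id.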
#[tokio::test]
async fn test_pagination_multiple_operations_same_time() {
async fn insert_oplog(dbtx: &mut DatabaseTransaction<'_>, idx: u8, time: u64) {
let operation_id = OperationId([idx; 32]);
let creation_time = SystemTime::UNIX_EPOCH
+ Duration::from_secs(60 * 60 * 24 * 365 * 40)
+ Duration::from_secs(time * 60 * 60 * 24);
dbtx.insert_new_entry(
&OperationLogKey { operation_id },
&OperationLogEntry {
operation_module_kind: "operation_type".to_string(),
meta: JsonStringed(serde_json::Value::Null),
outcome: None,
},
)
.await;
dbtx.insert_new_entry(
&ChronologicalOperationLogKey {
creation_time,
operation_id,
},
&(),
)
.await;
}
async fn assert_pages(operation_log: &OperationLog, pages: Vec<Vec<u8>>) {
let mut previous_last_element: Option<ChronologicalOperationLogKey> = None;
for reference_page in pages {
let page = operation_log
.paginate_operations_rev(10, previous_last_element)
.await;
assert_eq!(page.len(), reference_page.len());
assert_eq!(
page.iter()
.map(|(operation_log_key, _)| operation_log_key.operation_id)
.collect::<Vec<_>>(),
reference_page
.iter()
.map(|&x| OperationId([x; 32]))
.collect::<Vec<_>>()
);
previous_last_element = page.last().map(|(key, _)| key).copied();
}
}
let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
let op_log = OperationLog::new(db.clone());
let mut dbtx = db.begin_transaction().await;
for operation_idx in 0u8..10 {
insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 1).await;
}
dbtx.commit_tx().await;
assert_pages(&op_log, vec![vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0], vec![]]).await;
let mut dbtx = db.begin_transaction().await;
for operation_idx in 10u8..16 {
insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 2).await;
}
for operation_idx in 16u8..22 {
insert_oplog(&mut dbtx.to_ref_nc(), operation_idx, 3).await;
}
dbtx.commit_tx().await;
assert_pages(
&op_log,
vec![
vec![21, 20, 19, 18, 17, 16, 15, 14, 13, 12],
vec![11, 10, 9, 8, 7, 6, 5, 4, 3, 2],
vec![1, 0],
vec![],
],
)
.await;
let mut dbtx = db.begin_transaction().await;
for operation_idx in 22u8..31 {
insert_oplog(
&mut dbtx.to_ref_nc(),
operation_idx,
10 * u64::from(operation_idx),
)
.await;
}
dbtx.commit_tx().await;
assert_pages(
&op_log,
vec![
vec![30, 29, 28, 27, 26, 25, 24, 23, 22, 21],
vec![20, 19, 18, 17, 16, 15, 14, 13, 12, 11],
vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
vec![0],
vec![],
],
)
.await;
}
#[tokio::test]
async fn test_pagination_empty_then_not() {
let db = Database::new(MemDatabase::new(), ModuleRegistry::default());
let op_log = OperationLog::new(db.clone());
let page = op_log.paginate_operations_rev(10, None).await;
assert!(page.is_empty());
let mut dbtx = db.begin_transaction().await;
op_log
.add_operation_log_entry(&mut dbtx.to_ref_nc(), OperationId([0; 32]), "foo", "bar")
.await;
dbtx.commit_tx().await;
let page = op_log.paginate_operations_rev(10, None).await;
assert_eq!(page.len(), 1);
}
}