use std::collections::BTreeMap;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::{bail, Context as _};
use async_stream::stream;
use fedimint_api_client::api::DynGlobalApi;
use fedimint_core::config::ClientConfig;
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::task::waiter::Waiter;
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::util::{backoff_util, retry};
use fedimint_core::{apply, async_trait_maybe_send};
use fedimint_logging::LOG_CLIENT;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
use tokio_stream::{Stream, StreamExt as _};
use tracing::{debug, instrument, warn};
use crate::db::{
MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
};
use crate::Client;
#[apply(async_trait_maybe_send!)]
/// A provider of federation meta fields that a `MetaService` polls.
///
/// `MetaService::update_continuously` calls [`MetaSource::fetch`] once with
/// [`FetchKind::Initial`] and afterwards repeatedly with
/// [`FetchKind::Background`], awaiting [`MetaSource::wait_for_update`]
/// between attempts.
pub trait MetaSource: MaybeSend + MaybeSync + 'static {
/// Resolves when the service should attempt the next fetch.
async fn wait_for_update(&self);
/// Fetches the current set of meta values.
///
/// * `client_config` - the client's configuration.
/// * `api` - handle to the federation's global API.
/// * `fetch_kind` - whether this is the first fetch or a later background
///   refresh.
/// * `last_revision` - the revision the caller currently knows about, or
///   `None` if it has none.
///
/// Returns the fetched values together with their revision, or an error if
/// the fetch failed.
async fn fetch(
&self,
client_config: &ClientConfig,
api: &DynGlobalApi,
fetch_kind: FetchKind,
last_revision: Option<u64>,
) -> anyhow::Result<MetaValues>;
}
/// Tells a `MetaSource` in which phase of the update loop a fetch happens.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FetchKind {
/// The first fetch performed by `MetaService::update_continuously`; callers
/// of `MetaService::get_field` may be blocked until it completes.
Initial,
/// Any later fetch performed from the background refresh loop.
Background,
}
/// The complete set of meta fields returned by one `MetaSource::fetch` call.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MetaValues {
/// All meta fields, keyed by field name.
pub values: BTreeMap<MetaFieldKey, MetaFieldValue>,
/// Revision of this set of values as reported by the source; persisted
/// alongside the values when they are saved.
pub revision: u64,
}
/// A single meta field as read back from the database.
#[derive(Debug, Clone, Copy)]
pub struct MetaValue<T> {
/// When the stored meta values were last saved (taken from the persisted
/// `MetaServiceInfo::last_updated`).
pub fetch_time: SystemTime,
/// The parsed field value; `None` if the field is not stored or could not
/// be parsed as `T`.
pub value: Option<T>,
}
/// Keeps the meta fields stored in the client database in sync with a
/// `MetaSource` and lets callers read them or subscribe to changes.
///
/// `S` defaults to `dyn MetaSource`, so `MetaService` without type arguments
/// names the type-erased service.
pub struct MetaService<S: ?Sized = dyn MetaSource> {
// Marked done by `update_continuously` once the initial fetch attempt has
// finished (whether it succeeded or failed).
initial_fetch_waiter: Waiter,
// Signalled via `notify_waiters` every time new meta values were committed.
meta_update_notify: Notify,
// The source that is polled for meta values. Kept last because `S` may be
// unsized.
source: S,
}
impl<S: MetaSource + ?Sized> MetaService<S> {
pub fn new(source: S) -> Arc<MetaService>
where
S: Sized,
{
Arc::new(MetaService {
initial_fetch_waiter: Waiter::new(),
meta_update_notify: Notify::new(),
source,
})
}
pub async fn get_field<V: DeserializeOwned + 'static>(
&self,
db: &Database,
field: &str,
) -> Option<MetaValue<V>> {
if let Some(value) = self.get_field_from_db(db, field).await {
Some(value)
} else {
self.initial_fetch_waiter.wait().await;
self.get_field_from_db(db, field).await
}
}
async fn get_field_from_db<V: DeserializeOwned + 'static>(
&self,
db: &Database,
field: &str,
) -> Option<MetaValue<V>> {
let dbtx = &mut db.begin_transaction_nc().await;
let info = dbtx.get_value(&MetaServiceInfoKey).await?;
let value = dbtx
.get_value(&MetaFieldKey(field.to_string()))
.await
.and_then(|value| parse_meta_value_static::<V>(&value.0).ok());
Some(MetaValue {
fetch_time: info.last_updated,
value,
})
}
async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
dbtx.get_value(&MetaServiceInfoKey)
.await
.map(|x| x.revision)
}
pub async fn wait_initialization(&self) {
self.initial_fetch_waiter.wait().await;
}
pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
stream! {
let mut notify = pin!(self.meta_update_notify.notified());
loop {
notify.as_mut().await;
notify.set(self.meta_update_notify.notified());
notify.as_mut().enable();
yield ();
}
}
}
pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
&'a self,
db: &'a Database,
name: &'a str,
) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
stream! {
let mut update_stream = pin!(self.subscribe_to_updates());
loop {
let value = self.get_field_from_db(db, name).await;
yield value;
if update_stream.next().await.is_none() {
break;
}
}
}
}
pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
let mut current_revision = self
.current_revision(&mut client.db().begin_transaction_nc().await)
.await;
let client_config = client.config().await;
let meta_values = self
.source
.fetch(
&client_config,
&client.api,
FetchKind::Initial,
current_revision,
)
.await;
let failed_initial = meta_values.is_err();
match meta_values {
Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
Err(error) => warn!(target: LOG_CLIENT, %error, "failed to fetch source"),
};
self.initial_fetch_waiter.done();
if !failed_initial {
self.source.wait_for_update().await;
}
loop {
if let Ok(meta_values) = self
.source
.fetch(
&client_config,
&client.api,
FetchKind::Background,
current_revision,
)
.await
{
current_revision = Some(meta_values.revision);
self.save_meta_values(client, &meta_values).await;
}
self.source.wait_for_update().await;
}
}
async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
let mut dbtx = client.db().begin_transaction().await;
dbtx.remove_by_prefix(&MetaFieldPrefix).await;
dbtx.insert_entry(
&MetaServiceInfoKey,
&MetaServiceInfo {
last_updated: fedimint_core::time::now(),
revision: meta_values.revision,
},
)
.await;
for (key, value) in &meta_values.values {
dbtx.insert_entry(key, value).await;
}
dbtx.commit_tx().await;
self.meta_update_notify.notify_waiters();
}
}
/// A `MetaSource` that combines the meta fields from the client config with
/// overrides downloaded over HTTP (see its `MetaSource::fetch`
/// implementation).
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct LegacyMetaSource {
// HTTP client used to download the meta overrides.
reqwest: reqwest::Client,
}
#[apply(async_trait_maybe_send!)]
impl MetaSource for LegacyMetaSource {
async fn wait_for_update(&self) {
fedimint_core::runtime::sleep(Duration::from_secs(10 * 60)).await;
}
async fn fetch(
&self,
client_config: &ClientConfig,
_api: &DynGlobalApi,
fetch_kind: FetchKind,
last_revision: Option<u64>,
) -> anyhow::Result<MetaValues> {
let config_iter = client_config
.global
.meta
.iter()
.map(|(key, value)| (MetaFieldKey(key.clone()), MetaFieldValue(value.clone())));
let backoff = match fetch_kind {
FetchKind::Initial => backoff_util::aggressive_backoff(),
FetchKind::Background => backoff_util::background_backoff(),
};
let overrides = retry("fetch_meta_overrides", backoff, || {
fetch_meta_overrides(&self.reqwest, client_config, "meta_override_url")
})
.await?;
Ok(MetaValues {
values: config_iter.chain(overrides).collect(),
revision: last_revision.map_or(0, |r| r + 1),
})
}
}
/// Downloads the meta field overrides for this federation.
///
/// The override URL is read from the client config meta field `field_name`.
/// The document at that URL is expected to be a JSON object mapping
/// federation ids to objects of meta fields; only the entry for this client's
/// federation id is used, and only fields whose value is a JSON string are
/// kept (others are skipped with a warning).
///
/// Returns an empty map if the config does not define `field_name`.
///
/// # Errors
///
/// Fails if the config field cannot be read, the HTTP request fails or does
/// not return status 200, the body is not JSON of the expected shape, or the
/// document has no entry for this federation.
pub async fn fetch_meta_overrides(
    reqwest: &reqwest::Client,
    client_config: &ClientConfig,
    field_name: &str,
) -> anyhow::Result<BTreeMap<MetaFieldKey, MetaFieldValue>> {
    let Some(url) = client_config.meta::<String>(field_name)? else {
        return Ok(BTreeMap::new());
    };

    let response = reqwest
        .get(&url)
        .send()
        .await
        .context("Meta override source could not be fetched")?;
    debug!(target: LOG_CLIENT, "Meta override source returned status: {response:?}");

    let status = response.status();
    if status != reqwest::StatusCode::OK {
        bail!("Meta override request returned non-OK status code: {status}");
    }

    let mut federation_map = response
        .json::<BTreeMap<String, BTreeMap<String, serde_json::Value>>>()
        .await
        .context("Meta override could not be parsed as JSON")?;

    let federation_id = client_config.calculate_federation_id().to_string();
    let meta_fields = federation_map
        .remove(&federation_id)
        .with_context(|| anyhow::format_err!("No entry for federation {federation_id} in {url}"))?
        .into_iter()
        .filter_map(|(key, value)| match value {
            serde_json::Value::String(value_str) => {
                Some((MetaFieldKey(key), MetaFieldValue(value_str)))
            }
            _ => {
                // It is the value, not the key, that has the wrong type here.
                warn!(
                    target: LOG_CLIENT,
                    "Meta override map contained non-string value for key: {key}, ignoring"
                );
                None
            }
        })
        .collect::<BTreeMap<_, _>>();

    Ok(meta_fields)
}
/// Parses `str_value` as JSON into `V`.
///
/// As a special case, when `V` is `String` and `str_value` is not valid JSON
/// for a string, the raw `str_value` is returned unchanged instead of an
/// error.
///
/// # Errors
///
/// Returns the JSON decoding error (with context naming the offending value)
/// when parsing fails and the `String` fallback does not apply.
#[instrument(err)]
pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
    str_value: &str,
) -> anyhow::Result<V> {
    let res = serde_json::from_str(str_value)
        .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
    if res.is_ok() {
        res
    } else {
        // Safe stand-in for specialization: the downcast succeeds only when
        // `Option<V>` is exactly `Option<String>`, i.e. when `V == String`, so
        // no `unsafe` transmute is needed and nothing is allocated for other
        // types.
        let mut fallback: Option<V> = None;
        if let Some(slot) =
            (&mut fallback as &mut dyn std::any::Any).downcast_mut::<Option<String>>()
        {
            *slot = Some(str_value.to_owned());
        }
        fallback.map_or(res, Ok)
    }
}