1use std::pin::pin;
2use std::sync::Arc;
3
4use anyhow::Context as _;
5use async_stream::stream;
6use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
7use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::task::waiter::Waiter;
9use fedimint_core::util::FmtCompactAnyhow as _;
10use fedimint_logging::LOG_CLIENT;
11use serde::de::DeserializeOwned;
12use tokio::sync::Notify;
13use tokio_stream::{Stream, StreamExt as _};
14use tracing::{instrument, warn};
15
16use crate::Client;
17use crate::db::{
18 MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
19};
20
21pub struct MetaService<S: ?Sized = dyn MetaSource> {
24 initial_fetch_waiter: Waiter,
25 meta_update_notify: Notify,
26 source: S,
27}
28
29impl<S: MetaSource + ?Sized> MetaService<S> {
30 pub fn new(source: S) -> Arc<MetaService>
31 where
32 S: Sized,
33 {
34 Arc::new(MetaService {
36 initial_fetch_waiter: Waiter::new(),
37 meta_update_notify: Notify::new(),
38 source,
39 })
40 }
41
42 pub async fn get_field<V: DeserializeOwned + 'static>(
46 &self,
47 db: &Database,
48 field: &str,
49 ) -> Option<MetaValue<V>> {
50 match self.get_field_from_db(db, field).await {
51 Some(value) => {
52 Some(value)
55 }
56 _ => {
57 self.initial_fetch_waiter.wait().await;
59 self.get_field_from_db(db, field).await
60 }
61 }
62 }
63
64 async fn get_field_from_db<V: DeserializeOwned + 'static>(
65 &self,
66 db: &Database,
67 field: &str,
68 ) -> Option<MetaValue<V>> {
69 let dbtx = &mut db.begin_transaction_nc().await;
70 let info = dbtx.get_value(&MetaServiceInfoKey).await?;
71 let value = dbtx
72 .get_value(&MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
73 field.to_string(),
74 )))
75 .await
76 .and_then(|value| parse_meta_value_static::<V>(&value.0.0).ok());
77 Some(MetaValue {
78 fetch_time: info.last_updated,
79 value,
80 })
81 }
82
83 async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
84 dbtx.get_value(&MetaServiceInfoKey)
85 .await
86 .map(|x| x.revision)
87 }
88
89 pub async fn wait_initialization(&self) {
92 self.initial_fetch_waiter.wait().await;
93 }
94
95 pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
98 stream! {
99 let mut notify = pin!(self.meta_update_notify.notified());
100 loop {
101 notify.as_mut().await;
102 notify.set(self.meta_update_notify.notified());
103 notify.as_mut().enable();
106 yield ();
107 }
108 }
109 }
110
111 pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
120 &'a self,
121 db: &'a Database,
122 name: &'a str,
123 ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
124 stream! {
125 let mut update_stream = pin!(self.subscribe_to_updates());
126 loop {
127 let value = self.get_field_from_db(db, name).await;
128 yield value;
129 if update_stream.next().await.is_none() {
130 break;
131 }
132 }
133 }
134 }
135
136 pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
140 let mut current_revision = self
141 .current_revision(&mut client.db().begin_transaction_nc().await)
142 .await;
143 let client_config = client.config().await;
144 let meta_values = self
145 .source
146 .fetch(
147 &client_config,
148 &client.api,
149 FetchKind::Initial,
150 current_revision,
151 )
152 .await;
153 let failed_initial = meta_values.is_err();
154 match meta_values {
155 Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
156 Err(error) => {
157 warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
158 }
159 };
160 self.initial_fetch_waiter.done();
161
162 if !failed_initial {
164 self.source.wait_for_update().await;
165 }
166
167 loop {
169 if let Ok(meta_values) = self
170 .source
171 .fetch(
172 &client_config,
173 &client.api,
174 FetchKind::Background,
175 current_revision,
176 )
177 .await
178 {
179 current_revision = Some(meta_values.revision);
180 self.save_meta_values(client, &meta_values).await;
181 }
182 self.source.wait_for_update().await;
183 }
184 }
185
186 async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
187 let mut dbtx = client.db().begin_transaction().await;
188 dbtx.remove_by_prefix(&MetaFieldPrefix).await;
189 dbtx.insert_entry(
190 &MetaServiceInfoKey,
191 &MetaServiceInfo {
192 last_updated: fedimint_core::time::now(),
193 revision: meta_values.revision,
194 },
195 )
196 .await;
197 for (key, value) in &meta_values.values {
198 dbtx.insert_entry(&MetaFieldKey(key.clone()), &MetaFieldValue(value.clone()))
199 .await;
200 }
201 dbtx.commit_tx().await;
202 self.meta_update_notify.notify_waiters();
204 }
205}
206
207#[instrument(target = LOG_CLIENT, err)] pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
212 str_value: &str,
213) -> anyhow::Result<V> {
214 let res = serde_json::from_str(str_value)
215 .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
216
217 if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
221 let string_ret = Box::new(str_value.to_owned());
222 let ret: Box<V> = unsafe {
223 std::mem::transmute(string_ret)
225 };
226 Ok(*ret)
227 } else {
228 res
229 }
230}