fedimint_client/
meta.rs
1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4
5use anyhow::Context as _;
6use async_stream::stream;
7use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
8use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::task::waiter::Waiter;
10use fedimint_core::util::FmtCompactAnyhow as _;
11use fedimint_logging::LOG_CLIENT;
12use futures::StreamExt as _;
13use serde::de::DeserializeOwned;
14use tokio::sync::Notify;
15use tokio_stream::Stream;
16use tracing::{instrument, warn};
17
18use crate::Client;
19use crate::db::{
20 MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
21};
22
/// Service that fetches meta values from a [`MetaSource`] and stores them in
/// the client database, where they can be read back per field or as a whole.
///
/// `S` defaults to `dyn MetaSource`, so a bare `MetaService` names the
/// type-erased form.
pub struct MetaService<S: ?Sized = dyn MetaSource> {
    /// Marked done by `update_continuously` once the initial fetch attempt has
    /// finished, whether it succeeded or failed.
    initial_fetch_waiter: Waiter,
    /// Signalled with `notify_waiters` after each committed write of meta
    /// values.
    meta_update_notify: Notify,
    /// Where meta values are fetched from. Must remain the last field because
    /// `S` may be unsized.
    source: S,
}
30
/// Meta field names mapped to their stored string values, as returned by
/// [`MetaService::entries`].
pub type MetaEntries = BTreeMap<String, String>;
32
33impl<S: MetaSource + ?Sized> MetaService<S> {
34 pub fn new(source: S) -> Arc<MetaService>
35 where
36 S: Sized,
37 {
38 Arc::new(MetaService {
40 initial_fetch_waiter: Waiter::new(),
41 meta_update_notify: Notify::new(),
42 source,
43 })
44 }
45
46 pub async fn get_field<V: DeserializeOwned + 'static>(
50 &self,
51 db: &Database,
52 field: &str,
53 ) -> Option<MetaValue<V>> {
54 match self.get_field_from_db(db, field).await {
55 Some(value) => {
56 Some(value)
59 }
60 _ => {
61 self.initial_fetch_waiter.wait().await;
63 self.get_field_from_db(db, field).await
64 }
65 }
66 }
67
68 async fn get_field_from_db<V: DeserializeOwned + 'static>(
69 &self,
70 db: &Database,
71 field: &str,
72 ) -> Option<MetaValue<V>> {
73 let dbtx = &mut db.begin_transaction_nc().await;
74 let info = dbtx.get_value(&MetaServiceInfoKey).await?;
75 let value = dbtx
76 .get_value(&MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
77 field.to_string(),
78 )))
79 .await
80 .and_then(|value| parse_meta_value_static::<V>(&value.0.0).ok());
81 Some(MetaValue {
82 fetch_time: info.last_updated,
83 value,
84 })
85 }
86
87 pub async fn entries(&self, db: &Database) -> Option<MetaEntries> {
92 if let Some(value) = self.entries_from_db(db).await {
93 Some(value)
96 } else {
97 self.wait_initialization().await;
99 self.entries_from_db(db).await
100 }
101 }
102
103 async fn entries_from_db(&self, db: &Database) -> Option<MetaEntries> {
104 let dbtx = &mut db.begin_transaction_nc().await;
105 let info = dbtx.get_value(&MetaServiceInfoKey).await;
106 #[allow(clippy::question_mark)] if info.is_none() {
108 return None;
109 }
110 let entries: MetaEntries = dbtx
111 .find_by_prefix(&MetaFieldPrefix)
112 .await
113 .map(|(k, v)| (k.0.0, v.0.0))
114 .collect()
115 .await;
116 Some(entries)
117 }
118
119 async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
120 dbtx.get_value(&MetaServiceInfoKey)
121 .await
122 .map(|x| x.revision)
123 }
124
125 pub async fn wait_initialization(&self) {
128 self.initial_fetch_waiter.wait().await;
129 }
130
131 pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
134 stream! {
135 let mut notify = pin!(self.meta_update_notify.notified());
136 loop {
137 notify.as_mut().await;
138 notify.set(self.meta_update_notify.notified());
139 notify.as_mut().enable();
142 yield ();
143 }
144 }
145 }
146
147 pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
156 &'a self,
157 db: &'a Database,
158 name: &'a str,
159 ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
160 stream! {
161 let mut update_stream = pin!(self.subscribe_to_updates());
162 loop {
163 let value = self.get_field_from_db(db, name).await;
164 yield value;
165 if update_stream.next().await.is_none() {
166 break;
167 }
168 }
169 }
170 }
171
172 pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
176 let mut current_revision = self
177 .current_revision(&mut client.db().begin_transaction_nc().await)
178 .await;
179 let client_config = client.config().await;
180 let meta_values = self
181 .source
182 .fetch(
183 &client_config,
184 &client.api,
185 FetchKind::Initial,
186 current_revision,
187 )
188 .await;
189 let failed_initial = meta_values.is_err();
190 match meta_values {
191 Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
192 Err(error) => {
193 warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
194 }
195 };
196 self.initial_fetch_waiter.done();
197
198 if !failed_initial {
200 self.source.wait_for_update().await;
201 }
202
203 loop {
205 if let Ok(meta_values) = self
206 .source
207 .fetch(
208 &client_config,
209 &client.api,
210 FetchKind::Background,
211 current_revision,
212 )
213 .await
214 {
215 current_revision = Some(meta_values.revision);
216 self.save_meta_values(client, &meta_values).await;
217 }
218 self.source.wait_for_update().await;
219 }
220 }
221
222 async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
223 let mut dbtx = client.db().begin_transaction().await;
224 dbtx.remove_by_prefix(&MetaFieldPrefix).await;
225 dbtx.insert_entry(
226 &MetaServiceInfoKey,
227 &MetaServiceInfo {
228 last_updated: fedimint_core::time::now(),
229 revision: meta_values.revision,
230 },
231 )
232 .await;
233 for (key, value) in &meta_values.values {
234 dbtx.insert_entry(&MetaFieldKey(key.clone()), &MetaFieldValue(value.clone()))
235 .await;
236 }
237 dbtx.commit_tx().await;
238 self.meta_update_notify.notify_waiters();
240 }
241}
242
243#[instrument(target = LOG_CLIENT, err)] pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
248 str_value: &str,
249) -> anyhow::Result<V> {
250 let res = serde_json::from_str(str_value)
251 .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
252
253 if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
257 let string_ret = Box::new(str_value.to_owned());
258 let ret: Box<V> = unsafe {
259 std::mem::transmute(string_ret)
261 };
262 Ok(*ret)
263 } else {
264 res
265 }
266}