1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4
5use async_stream::stream;
6use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
7use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::task::waiter::Waiter;
9use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
10use fedimint_logging::LOG_CLIENT;
11use futures::StreamExt as _;
12use serde::de::DeserializeOwned;
13use tokio::sync::Notify;
14use tokio_stream::Stream;
15use tracing::warn;
16
17use crate::Client;
18use crate::db::{
19 MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
20};
21
22pub struct MetaService<S: ?Sized = dyn MetaSource> {
25 initial_fetch_waiter: Waiter,
26 meta_update_notify: Notify,
27 source: S,
28}
29
/// All stored meta fields, keyed by field name, with their raw JSON values.
pub type MetaEntries = BTreeMap<String, serde_json::Value>;
31
32impl<S: MetaSource + ?Sized> MetaService<S> {
33 pub fn new(source: S) -> Arc<MetaService>
34 where
35 S: Sized,
36 {
37 Arc::new(MetaService {
39 initial_fetch_waiter: Waiter::new(),
40 meta_update_notify: Notify::new(),
41 source,
42 })
43 }
44
45 pub async fn get_field<V: DeserializeOwned + 'static>(
49 &self,
50 db: &Database,
51 field: &str,
52 ) -> Option<MetaValue<V>> {
53 match self.get_field_from_db(db, field).await {
54 Some(value) => {
55 Some(value)
58 }
59 _ => {
60 self.initial_fetch_waiter.wait().await;
62 self.get_field_from_db(db, field).await
63 }
64 }
65 }
66
67 async fn get_field_from_db<V: DeserializeOwned + 'static>(
68 &self,
69 db: &Database,
70 field: &str,
71 ) -> Option<MetaValue<V>> {
72 let dbtx = &mut db.begin_transaction_nc().await;
73 let info = dbtx.get_value(&MetaServiceInfoKey).await?;
74 let value = dbtx
75 .get_value(&MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
76 field.to_string(),
77 )))
78 .await
79 .and_then(|value| {
80 serde_json::from_value(value.0.0)
81 .inspect_err(|err| {
82 warn!(target: LOG_CLIENT,
83 %field,
84 type = std::any::type_name::<V>(),
85 err = %err.fmt_compact(),
86 "Failed to deserialize meta value as the requested type")
87 })
88 .ok()
89 });
90
91 Some(MetaValue {
92 fetch_time: info.last_updated,
93 value,
94 })
95 }
96
97 pub async fn entries(&self, db: &Database) -> Option<MetaEntries> {
102 if let Some(value) = self.entries_from_db(db).await {
103 Some(value)
106 } else {
107 self.wait_initialization().await;
109 self.entries_from_db(db).await
110 }
111 }
112
113 async fn entries_from_db(&self, db: &Database) -> Option<MetaEntries> {
114 let dbtx = &mut db.begin_transaction_nc().await;
115 let info = dbtx.get_value(&MetaServiceInfoKey).await;
116 #[allow(clippy::question_mark)] if info.is_none() {
118 return None;
119 }
120 let entries: MetaEntries = dbtx
121 .find_by_prefix(&MetaFieldPrefix)
122 .await
123 .map(|(k, v)| (k.0.0, v.0.0))
124 .collect()
125 .await;
126 Some(entries)
127 }
128
129 async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
130 dbtx.get_value(&MetaServiceInfoKey)
131 .await
132 .map(|x| x.revision)
133 }
134
135 pub async fn wait_initialization(&self) {
138 self.initial_fetch_waiter.wait().await;
139 }
140
141 pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
144 stream! {
145 let mut notify = pin!(self.meta_update_notify.notified());
146 loop {
147 notify.as_mut().await;
148 notify.set(self.meta_update_notify.notified());
149 notify.as_mut().enable();
152 yield ();
153 }
154 }
155 }
156
157 pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
166 &'a self,
167 db: &'a Database,
168 name: &'a str,
169 ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
170 stream! {
171 let mut update_stream = pin!(self.subscribe_to_updates());
172 loop {
173 let value = self.get_field_from_db(db, name).await;
174 yield value;
175 if update_stream.next().await.is_none() {
176 break;
177 }
178 }
179 }
180 }
181
182 pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
186 let mut current_revision = self
187 .current_revision(&mut client.db().begin_transaction_nc().await)
188 .await;
189 let client_config = client.config().await;
190 let meta_values = self
191 .source
192 .fetch(
193 &client_config,
194 &client.api,
195 FetchKind::Initial,
196 current_revision,
197 )
198 .await;
199 let failed_initial = meta_values.is_err();
200 match meta_values {
201 Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
202 Err(error) => {
203 warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
204 }
205 };
206 self.initial_fetch_waiter.done();
207
208 if !failed_initial {
210 self.source.wait_for_update().await;
211 }
212
213 loop {
215 if let Ok(meta_values) = self
216 .source
217 .fetch(
218 &client_config,
219 &client.api,
220 FetchKind::Background,
221 current_revision,
222 )
223 .await
224 {
225 current_revision = Some(meta_values.revision);
226 self.save_meta_values(client, &meta_values).await;
227 }
228 self.source.wait_for_update().await;
229 }
230 }
231
232 async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
233 let mut dbtx = client.db().begin_transaction().await;
234 dbtx.remove_by_prefix(&MetaFieldPrefix).await;
235 dbtx.insert_entry(
236 &MetaServiceInfoKey,
237 &MetaServiceInfo {
238 last_updated: fedimint_core::time::now(),
239 revision: meta_values.revision,
240 },
241 )
242 .await;
243 for (key, value) in &meta_values.values {
244 dbtx.insert_entry(&MetaFieldKey(key.clone()), &MetaFieldValue(value.clone()))
245 .await;
246 }
247 dbtx.commit_tx().await;
248 self.meta_update_notify.notify_waiters();
250 }
251}