fedimint_client/
meta.rs

1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4
5use anyhow::Context as _;
6use async_stream::stream;
7use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
8use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
9use fedimint_core::task::waiter::Waiter;
10use fedimint_core::util::FmtCompactAnyhow as _;
11use fedimint_logging::LOG_CLIENT;
12use futures::StreamExt as _;
13use serde::de::DeserializeOwned;
14use tokio::sync::Notify;
15use tokio_stream::Stream;
16use tracing::{instrument, warn};
17
18use crate::Client;
19use crate::db::{
20    MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
21};
22
23/// Service for managing the caching of meta fields.
24// a fancy DST to save one allocation.
25pub struct MetaService<S: ?Sized = dyn MetaSource> {
26    initial_fetch_waiter: Waiter,
27    meta_update_notify: Notify,
28    source: S,
29}
30
31pub type MetaEntries = BTreeMap<String, String>;
32
33impl<S: MetaSource + ?Sized> MetaService<S> {
34    pub fn new(source: S) -> Arc<MetaService>
35    where
36        S: Sized,
37    {
38        // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>`
39        Arc::new(MetaService {
40            initial_fetch_waiter: Waiter::new(),
41            meta_update_notify: Notify::new(),
42            source,
43        })
44    }
45
46    /// Get the value for the meta field.
47    ///
48    /// This may wait for significant time on first run.
49    pub async fn get_field<V: DeserializeOwned + 'static>(
50        &self,
51        db: &Database,
52        field: &str,
53    ) -> Option<MetaValue<V>> {
54        match self.get_field_from_db(db, field).await {
55            Some(value) => {
56                // might be from in old cache.
57                // TODO: maybe old cache should have a ttl?
58                Some(value)
59            }
60            _ => {
61                // wait for initial value
62                self.initial_fetch_waiter.wait().await;
63                self.get_field_from_db(db, field).await
64            }
65        }
66    }
67
68    async fn get_field_from_db<V: DeserializeOwned + 'static>(
69        &self,
70        db: &Database,
71        field: &str,
72    ) -> Option<MetaValue<V>> {
73        let dbtx = &mut db.begin_transaction_nc().await;
74        let info = dbtx.get_value(&MetaServiceInfoKey).await?;
75        let value = dbtx
76            .get_value(&MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
77                field.to_string(),
78            )))
79            .await
80            .and_then(|value| parse_meta_value_static::<V>(&value.0.0).ok());
81        Some(MetaValue {
82            fetch_time: info.last_updated,
83            value,
84        })
85    }
86
87    /// Get all meta entries.
88    ///
89    /// This may wait for significant time on first run when there is no cached
90    /// data.
91    pub async fn entries(&self, db: &Database) -> Option<MetaEntries> {
92        if let Some(value) = self.entries_from_db(db).await {
93            // might be from in old cache.
94            // TODO: maybe old cache should have a ttl?
95            Some(value)
96        } else {
97            // wait for initial value
98            self.wait_initialization().await;
99            self.entries_from_db(db).await
100        }
101    }
102
103    async fn entries_from_db(&self, db: &Database) -> Option<MetaEntries> {
104        let dbtx = &mut db.begin_transaction_nc().await;
105        let info = dbtx.get_value(&MetaServiceInfoKey).await;
106        #[allow(clippy::question_mark)] // more readable
107        if info.is_none() {
108            return None;
109        }
110        let entries: MetaEntries = dbtx
111            .find_by_prefix(&MetaFieldPrefix)
112            .await
113            .map(|(k, v)| (k.0.0, v.0.0))
114            .collect()
115            .await;
116        Some(entries)
117    }
118
119    async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
120        dbtx.get_value(&MetaServiceInfoKey)
121            .await
122            .map(|x| x.revision)
123    }
124
125    /// Wait until Meta Service is initialized, after this `get_field` will not
126    /// block.
127    pub async fn wait_initialization(&self) {
128        self.initial_fetch_waiter.wait().await;
129    }
130
131    /// NOTE: this subscription never ends even after update task is shutdown.
132    /// You should consume this stream in a spawn_cancellable.
133    pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
134        stream! {
135            let mut notify = pin!(self.meta_update_notify.notified());
136            loop {
137                notify.as_mut().await;
138                notify.set(self.meta_update_notify.notified());
139                // enable waiting for next notification before yield so don't miss
140                // any notifications.
141                notify.as_mut().enable();
142                yield ();
143            }
144        }
145    }
146
147    /// NOTE: this subscription never ends even after update task is shutdown.
148    /// You should consume this stream in a spawn_cancellable.
149    ///
150    /// Stream will yield the first element immediately without blocking.
151    /// The first element will be initial value of the field.
152    ///
153    /// This may yield an outdated initial value if you didn't call
154    /// [`Self::wait_initialization`].
155    pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
156        &'a self,
157        db: &'a Database,
158        name: &'a str,
159    ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
160        stream! {
161            let mut update_stream = pin!(self.subscribe_to_updates());
162            loop {
163                let value = self.get_field_from_db(db, name).await;
164                yield value;
165                if update_stream.next().await.is_none() {
166                    break;
167                }
168            }
169        }
170    }
171
172    /// Update all source in background.
173    ///
174    /// Caller should run this method in a task.
175    pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
176        let mut current_revision = self
177            .current_revision(&mut client.db().begin_transaction_nc().await)
178            .await;
179        let client_config = client.config().await;
180        let meta_values = self
181            .source
182            .fetch(
183                &client_config,
184                &client.api,
185                FetchKind::Initial,
186                current_revision,
187            )
188            .await;
189        let failed_initial = meta_values.is_err();
190        match meta_values {
191            Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
192            Err(error) => {
193                warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
194            }
195        };
196        self.initial_fetch_waiter.done();
197
198        // don't wait if we failed first item
199        if !failed_initial {
200            self.source.wait_for_update().await;
201        }
202
203        // now keep updating slowly
204        loop {
205            if let Ok(meta_values) = self
206                .source
207                .fetch(
208                    &client_config,
209                    &client.api,
210                    FetchKind::Background,
211                    current_revision,
212                )
213                .await
214            {
215                current_revision = Some(meta_values.revision);
216                self.save_meta_values(client, &meta_values).await;
217            }
218            self.source.wait_for_update().await;
219        }
220    }
221
222    async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
223        let mut dbtx = client.db().begin_transaction().await;
224        dbtx.remove_by_prefix(&MetaFieldPrefix).await;
225        dbtx.insert_entry(
226            &MetaServiceInfoKey,
227            &MetaServiceInfo {
228                last_updated: fedimint_core::time::now(),
229                revision: meta_values.revision,
230            },
231        )
232        .await;
233        for (key, value) in &meta_values.values {
234            dbtx.insert_entry(&MetaFieldKey(key.clone()), &MetaFieldValue(value.clone()))
235                .await;
236        }
237        dbtx.commit_tx().await;
238        // notify everyone about changes
239        self.meta_update_notify.notify_waiters();
240    }
241}
242
243/// Tries to parse `str_value` as JSON. In the special case that `V` is `String`
244/// we return the raw `str_value` if JSON parsing fails. This necessary since
245/// the spec wasn't clear enough in the beginning.
246#[instrument(target = LOG_CLIENT, err)] // log on every failure
247pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
248    str_value: &str,
249) -> anyhow::Result<V> {
250    let res = serde_json::from_str(str_value)
251        .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
252
253    // In the past we encoded some string fields as "just a string" without quotes,
254    // this code ensures that old meta values still parse since config is hard to
255    // change
256    if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
257        let string_ret = Box::new(str_value.to_owned());
258        let ret: Box<V> = unsafe {
259            // We can transmute a String to V because we know that V==String
260            std::mem::transmute(string_ret)
261        };
262        Ok(*ret)
263    } else {
264        res
265    }
266}