fedimint_client/
meta.rs

1use std::collections::BTreeMap;
2use std::pin::pin;
3use std::sync::Arc;
4
5use async_stream::stream;
6use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
7use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::task::waiter::Waiter;
9use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
10use fedimint_logging::LOG_CLIENT;
11use futures::StreamExt as _;
12use serde::de::DeserializeOwned;
13use tokio::sync::Notify;
14use tokio_stream::Stream;
15use tracing::warn;
16
17use crate::Client;
18use crate::db::{
19    MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
20};
21
22/// Service for managing the caching of meta fields.
23// a fancy DST to save one allocation.
24pub struct MetaService<S: ?Sized = dyn MetaSource> {
25    initial_fetch_waiter: Waiter,
26    meta_update_notify: Notify,
27    source: S,
28}
29
30pub type MetaEntries = BTreeMap<String, serde_json::Value>;
31
32impl<S: MetaSource + ?Sized> MetaService<S> {
33    pub fn new(source: S) -> Arc<MetaService>
34    where
35        S: Sized,
36    {
37        // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>`
38        Arc::new(MetaService {
39            initial_fetch_waiter: Waiter::new(),
40            meta_update_notify: Notify::new(),
41            source,
42        })
43    }
44
45    /// Get the value for the meta field.
46    ///
47    /// This may wait for significant time on first run.
48    pub async fn get_field<V: DeserializeOwned + 'static>(
49        &self,
50        db: &Database,
51        field: &str,
52    ) -> Option<MetaValue<V>> {
53        match self.get_field_from_db(db, field).await {
54            Some(value) => {
55                // might be from in old cache.
56                // TODO: maybe old cache should have a ttl?
57                Some(value)
58            }
59            _ => {
60                // wait for initial value
61                self.initial_fetch_waiter.wait().await;
62                self.get_field_from_db(db, field).await
63            }
64        }
65    }
66
67    async fn get_field_from_db<V: DeserializeOwned + 'static>(
68        &self,
69        db: &Database,
70        field: &str,
71    ) -> Option<MetaValue<V>> {
72        let dbtx = &mut db.begin_transaction_nc().await;
73        let info = dbtx.get_value(&MetaServiceInfoKey).await?;
74        let value = dbtx
75            .get_value(&MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
76                field.to_string(),
77            )))
78            .await
79            .and_then(|value| {
80                serde_json::from_value(value.0.0)
81                    .inspect_err(|err| {
82                        warn!(target: LOG_CLIENT,
83                        %field,
84                        type = std::any::type_name::<V>(),
85                        err = %err.fmt_compact(),
86                        "Failed to deserialize meta value as the requested type")
87                    })
88                    .ok()
89            });
90
91        Some(MetaValue {
92            fetch_time: info.last_updated,
93            value,
94        })
95    }
96
97    /// Get all meta entries.
98    ///
99    /// This may wait for significant time on first run when there is no cached
100    /// data.
101    pub async fn entries(&self, db: &Database) -> Option<MetaEntries> {
102        if let Some(value) = self.entries_from_db(db).await {
103            // might be from in old cache.
104            // TODO: maybe old cache should have a ttl?
105            Some(value)
106        } else {
107            // wait for initial value
108            self.wait_initialization().await;
109            self.entries_from_db(db).await
110        }
111    }
112
113    async fn entries_from_db(&self, db: &Database) -> Option<MetaEntries> {
114        let dbtx = &mut db.begin_transaction_nc().await;
115        let info = dbtx.get_value(&MetaServiceInfoKey).await;
116        #[allow(clippy::question_mark)] // more readable
117        if info.is_none() {
118            return None;
119        }
120        let entries: MetaEntries = dbtx
121            .find_by_prefix(&MetaFieldPrefix)
122            .await
123            .map(|(k, v)| (k.0.0, v.0.0))
124            .collect()
125            .await;
126        Some(entries)
127    }
128
129    async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
130        dbtx.get_value(&MetaServiceInfoKey)
131            .await
132            .map(|x| x.revision)
133    }
134
135    /// Wait until Meta Service is initialized, after this `get_field` will not
136    /// block.
137    pub async fn wait_initialization(&self) {
138        self.initial_fetch_waiter.wait().await;
139    }
140
141    /// NOTE: this subscription never ends even after update task is shutdown.
142    /// You should consume this stream in a spawn_cancellable.
143    pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
144        stream! {
145            let mut notify = pin!(self.meta_update_notify.notified());
146            loop {
147                notify.as_mut().await;
148                notify.set(self.meta_update_notify.notified());
149                // enable waiting for next notification before yield so don't miss
150                // any notifications.
151                notify.as_mut().enable();
152                yield ();
153            }
154        }
155    }
156
157    /// NOTE: this subscription never ends even after update task is shutdown.
158    /// You should consume this stream in a spawn_cancellable.
159    ///
160    /// Stream will yield the first element immediately without blocking.
161    /// The first element will be initial value of the field.
162    ///
163    /// This may yield an outdated initial value if you didn't call
164    /// [`Self::wait_initialization`].
165    pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
166        &'a self,
167        db: &'a Database,
168        name: &'a str,
169    ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
170        stream! {
171            let mut update_stream = pin!(self.subscribe_to_updates());
172            loop {
173                let value = self.get_field_from_db(db, name).await;
174                yield value;
175                if update_stream.next().await.is_none() {
176                    break;
177                }
178            }
179        }
180    }
181
182    /// Update all source in background.
183    ///
184    /// Caller should run this method in a task.
185    pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
186        let mut current_revision = self
187            .current_revision(&mut client.db().begin_transaction_nc().await)
188            .await;
189        let client_config = client.config().await;
190        let meta_values = self
191            .source
192            .fetch(
193                &client_config,
194                &client.api,
195                FetchKind::Initial,
196                current_revision,
197            )
198            .await;
199        let failed_initial = meta_values.is_err();
200        match meta_values {
201            Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
202            Err(error) => {
203                warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
204            }
205        };
206        self.initial_fetch_waiter.done();
207
208        // don't wait if we failed first item
209        if !failed_initial {
210            self.source.wait_for_update().await;
211        }
212
213        // now keep updating slowly
214        loop {
215            if let Ok(meta_values) = self
216                .source
217                .fetch(
218                    &client_config,
219                    &client.api,
220                    FetchKind::Background,
221                    current_revision,
222                )
223                .await
224            {
225                current_revision = Some(meta_values.revision);
226                self.save_meta_values(client, &meta_values).await;
227            }
228            self.source.wait_for_update().await;
229        }
230    }
231
232    async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
233        let mut dbtx = client.db().begin_transaction().await;
234        dbtx.remove_by_prefix(&MetaFieldPrefix).await;
235        dbtx.insert_entry(
236            &MetaServiceInfoKey,
237            &MetaServiceInfo {
238                last_updated: fedimint_core::time::now(),
239                revision: meta_values.revision,
240            },
241        )
242        .await;
243        for (key, value) in &meta_values.values {
244            dbtx.insert_entry(&MetaFieldKey(key.clone()), &MetaFieldValue(value.clone()))
245                .await;
246        }
247        dbtx.commit_tx().await;
248        // notify everyone about changes
249        self.meta_update_notify.notify_waiters();
250    }
251}