fedimint_client/
meta.rs

1use std::pin::pin;
2use std::sync::Arc;
3
4use anyhow::Context as _;
5use async_stream::stream;
6use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
7use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
8use fedimint_core::task::waiter::Waiter;
9use fedimint_core::util::FmtCompactAnyhow as _;
10use fedimint_logging::LOG_CLIENT;
11use serde::de::DeserializeOwned;
12use tokio::sync::Notify;
13use tokio_stream::{Stream, StreamExt as _};
14use tracing::{instrument, warn};
15
16use crate::Client;
17use crate::db::{
18    MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
19};
20
/// Service for managing the caching of meta fields.
///
/// Bundles the [`MetaSource`] the values are fetched from with the
/// primitives used to signal the end of the initial fetch attempt and every
/// later cache update.
// a fancy DST to save one allocation.
pub struct MetaService<S: ?Sized = dyn MetaSource> {
    /// Marked done by `update_continuously` once the initial fetch attempt has
    /// finished, whether it succeeded or failed.
    initial_fetch_waiter: Waiter,
    /// Notified by `save_meta_values` after a new set of values is committed.
    meta_update_notify: Notify,
    /// Where meta values are fetched from; must stay the last field because
    /// `S` may be unsized.
    source: S,
}
28
29impl<S: MetaSource + ?Sized> MetaService<S> {
30    pub fn new(source: S) -> Arc<MetaService>
31    where
32        S: Sized,
33    {
34        // implicit cast `Arc<MetaService<S>>` to `Arc<MetaService<dyn MetaSource>>`
35        Arc::new(MetaService {
36            initial_fetch_waiter: Waiter::new(),
37            meta_update_notify: Notify::new(),
38            source,
39        })
40    }
41
42    /// Get the value for the meta field.
43    ///
44    /// This may wait for significant time on first run.
45    pub async fn get_field<V: DeserializeOwned + 'static>(
46        &self,
47        db: &Database,
48        field: &str,
49    ) -> Option<MetaValue<V>> {
50        match self.get_field_from_db(db, field).await {
51            Some(value) => {
52                // might be from in old cache.
53                // TODO: maybe old cache should have a ttl?
54                Some(value)
55            }
56            _ => {
57                // wait for initial value
58                self.initial_fetch_waiter.wait().await;
59                self.get_field_from_db(db, field).await
60            }
61        }
62    }
63
64    async fn get_field_from_db<V: DeserializeOwned + 'static>(
65        &self,
66        db: &Database,
67        field: &str,
68    ) -> Option<MetaValue<V>> {
69        let dbtx = &mut db.begin_transaction_nc().await;
70        let info = dbtx.get_value(&MetaServiceInfoKey).await?;
71        let value = dbtx
72            .get_value(&MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
73                field.to_string(),
74            )))
75            .await
76            .and_then(|value| parse_meta_value_static::<V>(&value.0.0).ok());
77        Some(MetaValue {
78            fetch_time: info.last_updated,
79            value,
80        })
81    }
82
83    async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
84        dbtx.get_value(&MetaServiceInfoKey)
85            .await
86            .map(|x| x.revision)
87    }
88
89    /// Wait until Meta Service is initialized, after this `get_field` will not
90    /// block.
91    pub async fn wait_initialization(&self) {
92        self.initial_fetch_waiter.wait().await;
93    }
94
95    /// NOTE: this subscription never ends even after update task is shutdown.
96    /// You should consume this stream in a spawn_cancellable.
97    pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
98        stream! {
99            let mut notify = pin!(self.meta_update_notify.notified());
100            loop {
101                notify.as_mut().await;
102                notify.set(self.meta_update_notify.notified());
103                // enable waiting for next notification before yield so don't miss
104                // any notifications.
105                notify.as_mut().enable();
106                yield ();
107            }
108        }
109    }
110
111    /// NOTE: this subscription never ends even after update task is shutdown.
112    /// You should consume this stream in a spawn_cancellable.
113    ///
114    /// Stream will yield the first element immediately without blocking.
115    /// The first element will be initial value of the field.
116    ///
117    /// This may yield an outdated initial value if you didn't call
118    /// [`Self::wait_initialization`].
119    pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
120        &'a self,
121        db: &'a Database,
122        name: &'a str,
123    ) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
124        stream! {
125            let mut update_stream = pin!(self.subscribe_to_updates());
126            loop {
127                let value = self.get_field_from_db(db, name).await;
128                yield value;
129                if update_stream.next().await.is_none() {
130                    break;
131                }
132            }
133        }
134    }
135
136    /// Update all source in background.
137    ///
138    /// Caller should run this method in a task.
139    pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
140        let mut current_revision = self
141            .current_revision(&mut client.db().begin_transaction_nc().await)
142            .await;
143        let client_config = client.config().await;
144        let meta_values = self
145            .source
146            .fetch(
147                &client_config,
148                &client.api,
149                FetchKind::Initial,
150                current_revision,
151            )
152            .await;
153        let failed_initial = meta_values.is_err();
154        match meta_values {
155            Ok(meta_values) => self.save_meta_values(client, &meta_values).await,
156            Err(error) => {
157                warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
158            }
159        };
160        self.initial_fetch_waiter.done();
161
162        // don't wait if we failed first item
163        if !failed_initial {
164            self.source.wait_for_update().await;
165        }
166
167        // now keep updating slowly
168        loop {
169            if let Ok(meta_values) = self
170                .source
171                .fetch(
172                    &client_config,
173                    &client.api,
174                    FetchKind::Background,
175                    current_revision,
176                )
177                .await
178            {
179                current_revision = Some(meta_values.revision);
180                self.save_meta_values(client, &meta_values).await;
181            }
182            self.source.wait_for_update().await;
183        }
184    }
185
186    async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
187        let mut dbtx = client.db().begin_transaction().await;
188        dbtx.remove_by_prefix(&MetaFieldPrefix).await;
189        dbtx.insert_entry(
190            &MetaServiceInfoKey,
191            &MetaServiceInfo {
192                last_updated: fedimint_core::time::now(),
193                revision: meta_values.revision,
194            },
195        )
196        .await;
197        for (key, value) in &meta_values.values {
198            dbtx.insert_entry(&MetaFieldKey(key.clone()), &MetaFieldValue(value.clone()))
199                .await;
200        }
201        dbtx.commit_tx().await;
202        // notify everyone about changes
203        self.meta_update_notify.notify_waiters();
204    }
205}
206
207/// Tries to parse `str_value` as JSON. In the special case that `V` is `String`
208/// we return the raw `str_value` if JSON parsing fails. This necessary since
209/// the spec wasn't clear enough in the beginning.
210#[instrument(target = LOG_CLIENT, err)] // log on every failure
211pub fn parse_meta_value_static<V: DeserializeOwned + 'static>(
212    str_value: &str,
213) -> anyhow::Result<V> {
214    let res = serde_json::from_str(str_value)
215        .with_context(|| format!("Decoding meta field value '{str_value}' failed"));
216
217    // In the past we encoded some string fields as "just a string" without quotes,
218    // this code ensures that old meta values still parse since config is hard to
219    // change
220    if res.is_err() && std::any::TypeId::of::<V>() == std::any::TypeId::of::<String>() {
221        let string_ret = Box::new(str_value.to_owned());
222        let ret: Box<V> = unsafe {
223            // We can transmute a String to V because we know that V==String
224            std::mem::transmute(string_ret)
225        };
226        Ok(*ret)
227    } else {
228        res
229    }
230}