Skip to main content

fedimint_meta_client/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::module_name_repetitions)]
4
5pub mod api;
6#[cfg(feature = "cli")]
7pub mod cli;
8pub mod db;
9pub mod states;
10
11use std::collections::BTreeMap;
12use std::time::Duration;
13
14use anyhow::Context as _;
15use api::MetaFederationApi;
16use common::{KIND, MetaConsensusValue, MetaKey, MetaValue};
17use db::DbKeyPrefix;
18use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
19use fedimint_client_module::db::ClientModuleMigrationFn;
20use fedimint_client_module::meta::{FetchKind, LegacyMetaSource, MetaSource, MetaValues};
21use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
22use fedimint_client_module::module::recovery::NoModuleBackup;
23use fedimint_client_module::module::{ClientModule, IClientModule};
24use fedimint_client_module::sm::Context;
25use fedimint_core::config::ClientConfig;
26use fedimint_core::core::{Decoder, ModuleKind};
27use fedimint_core::db::{DatabaseTransaction, DatabaseVersion};
28use fedimint_core::module::{
29    Amounts, ApiAuth, ApiVersion, ModuleCommon, ModuleInit, MultiApiVersion,
30};
31use fedimint_core::util::backoff_util::FibonacciBackoff;
32use fedimint_core::util::{BoxStream, backoff_util, retry};
33use fedimint_core::{PeerId, apply, async_trait_maybe_send};
34use fedimint_logging::LOG_CLIENT_MODULE_META;
35pub use fedimint_meta_common as common;
36use fedimint_meta_common::{DEFAULT_META_KEY, MetaCommonInit, MetaModuleTypes};
37use futures::stream;
38use serde::Deserialize;
39use serde_json::json;
40use states::MetaStateMachine;
41use strum::IntoEnumIterator;
42use tracing::{debug, warn};
43
44#[derive(Debug)]
45pub struct MetaClientModule {
46    module_api: DynModuleApi,
47    admin_auth: Option<ApiAuth>,
48}
49
50impl MetaClientModule {
51    fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
52        self.admin_auth
53            .clone()
54            .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
55    }
56
57    /// Submit a meta consensus value
58    ///
59    /// When *threshold* amount of peers submits the exact same value it
60    /// becomes a new consensus value.
61    ///
62    /// To "cancel" previous vote, peer can submit a value equal to the current
63    /// consensus value.
64    pub async fn submit(&self, key: MetaKey, value: MetaValue) -> anyhow::Result<()> {
65        self.module_api
66            .submit(key, value, self.admin_auth()?)
67            .await?;
68
69        Ok(())
70    }
71
72    /// Get the current meta consensus value along with it's revision
73    ///
74    /// See [`Self::get_consensus_value_rev`] to use when checking for updates.
75    pub async fn get_consensus_value(
76        &self,
77        key: MetaKey,
78    ) -> anyhow::Result<Option<MetaConsensusValue>> {
79        Ok(self.module_api.get_consensus(key).await?)
80    }
81
82    /// Get the current meta consensus value revision
83    ///
84    /// Each time a meta consensus value changes, the revision increases,
85    /// so checking just the revision can save a lot of bandwidth in periodic
86    /// checks.
87    pub async fn get_consensus_value_rev(&self, key: MetaKey) -> anyhow::Result<Option<u64>> {
88        Ok(self.module_api.get_consensus_rev(key).await?)
89    }
90
91    /// Get current submissions to change the meta consensus value.
92    ///
93    /// Upon changing the consensus
94    pub async fn get_submissions(
95        &self,
96        key: MetaKey,
97    ) -> anyhow::Result<BTreeMap<PeerId, MetaValue>> {
98        Ok(self
99            .module_api
100            .get_submissions(key, self.admin_auth()?)
101            .await?)
102    }
103}
104
105#[derive(Debug, Deserialize)]
106struct GetConsensusValueRequest {
107    key: MetaKey,
108}
109
110fn format_rpc_consensus_value_response(
111    maybe_consensus_value: Option<MetaConsensusValue>,
112) -> anyhow::Result<serde_json::Value> {
113    Ok(match maybe_consensus_value {
114        Some(MetaConsensusValue { revision, value }) => {
115            let value = value
116                .to_json_lossy()
117                .context("deserializing consensus value as json")?;
118
119            json!({
120                "revision": revision,
121                "value": value,
122            })
123        }
124        None => serde_json::Value::Null,
125    })
126}
127
128/// Data needed by the state machine
129#[derive(Debug, Clone)]
130pub struct MetaClientContext {
131    pub meta_decoder: Decoder,
132}
133
134// TODO: Boiler-plate
135impl Context for MetaClientContext {
136    const KIND: Option<ModuleKind> = Some(KIND);
137}
138
139#[apply(async_trait_maybe_send!)]
140impl ClientModule for MetaClientModule {
141    type Init = MetaClientInit;
142    type Common = MetaModuleTypes;
143    type Backup = NoModuleBackup;
144    type ModuleStateMachineContext = MetaClientContext;
145    type States = MetaStateMachine;
146
147    fn context(&self) -> Self::ModuleStateMachineContext {
148        MetaClientContext {
149            meta_decoder: self.decoder(),
150        }
151    }
152
153    fn input_fee(
154        &self,
155        _amount: &Amounts,
156        _input: &<Self::Common as ModuleCommon>::Input,
157    ) -> Option<Amounts> {
158        unreachable!()
159    }
160
161    fn output_fee(
162        &self,
163        _amount: &Amounts,
164        _output: &<Self::Common as ModuleCommon>::Output,
165    ) -> Option<Amounts> {
166        unreachable!()
167    }
168
169    async fn handle_rpc(
170        &self,
171        method: String,
172        request: serde_json::Value,
173    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
174        Box::pin(stream::once(async move {
175            match method.as_str() {
176                "get_consensus_value" => {
177                    let req: GetConsensusValueRequest = serde_json::from_value(request)?;
178                    let maybe_consensus_value = self.get_consensus_value(req.key).await?;
179                    format_rpc_consensus_value_response(maybe_consensus_value)
180                }
181                _ => Err(anyhow::format_err!("Unknown method: {method}")),
182            }
183        }))
184    }
185
186    #[cfg(feature = "cli")]
187    async fn handle_cli_command(
188        &self,
189        args: &[std::ffi::OsString],
190    ) -> anyhow::Result<serde_json::Value> {
191        cli::handle_cli_command(self, args).await
192    }
193}
194
195#[derive(Debug, Clone)]
196pub struct MetaClientInit;
197
198// TODO: Boilerplate-code
199impl ModuleInit for MetaClientInit {
200    type Common = MetaCommonInit;
201
202    async fn dump_database(
203        &self,
204        _dbtx: &mut DatabaseTransaction<'_>,
205        prefix_names: Vec<String>,
206    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
207        let items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
208        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
209            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
210        });
211
212        #[allow(clippy::never_loop)]
213        for table in filtered_prefixes {
214            match table {}
215        }
216
217        Box::new(items.into_iter())
218    }
219}
220
221/// Generates the client module
222#[apply(async_trait_maybe_send!)]
223impl ClientModuleInit for MetaClientInit {
224    type Module = MetaClientModule;
225
226    fn supported_api_versions(&self) -> MultiApiVersion {
227        MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
228            .expect("no version conflicts")
229    }
230
231    async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
232        Ok(MetaClientModule {
233            module_api: args.module_api().clone(),
234            admin_auth: args.admin_auth().cloned(),
235        })
236    }
237
238    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
239        BTreeMap::new()
240    }
241}
242
243/// Meta source fetching meta values from the meta module if available or the
244/// legacy meta source otherwise.
245#[derive(Clone, Debug, Default)]
246pub struct MetaModuleMetaSourceWithFallback<S = LegacyMetaSource> {
247    legacy: S,
248}
249
250impl<S> MetaModuleMetaSourceWithFallback<S> {
251    pub fn new(legacy: S) -> Self {
252        Self { legacy }
253    }
254}
255
256#[apply(async_trait_maybe_send!)]
257impl<S: MetaSource> MetaSource for MetaModuleMetaSourceWithFallback<S> {
258    async fn wait_for_update(&self) {
259        fedimint_core::runtime::sleep(Duration::from_mins(10)).await;
260    }
261
262    async fn fetch(
263        &self,
264        client_config: &ClientConfig,
265        api: &DynGlobalApi,
266        fetch_kind: fedimint_client_module::meta::FetchKind,
267        last_revision: Option<u64>,
268    ) -> anyhow::Result<fedimint_client_module::meta::MetaValues> {
269        let backoff = match fetch_kind {
270            // need to be fast the first time.
271            FetchKind::Initial => backoff_util::aggressive_backoff(),
272            FetchKind::Background => backoff_util::background_backoff(),
273        };
274
275        let maybe_meta_module_meta = get_meta_module_value(client_config, api, backoff)
276            .await
277            .map(|meta| {
278                Result::<_, anyhow::Error>::Ok(MetaValues {
279                    values: serde_json::from_slice(meta.value.as_slice())?,
280                    revision: meta.revision,
281                })
282            })
283            .transpose()?;
284
285        // If we couldn't fetch valid meta values from the meta module for any reason,
286        // fall back to the legacy meta source
287        if let Some(maybe_meta_module_meta) = maybe_meta_module_meta {
288            Ok(maybe_meta_module_meta)
289        } else {
290            self.legacy
291                .fetch(client_config, api, fetch_kind, last_revision)
292                .await
293        }
294    }
295}
296
297async fn get_meta_module_value(
298    client_config: &ClientConfig,
299    api: &DynGlobalApi,
300    backoff: FibonacciBackoff,
301) -> Option<MetaConsensusValue> {
302    match client_config.get_first_module_by_kind_cfg(KIND) {
303        Ok((instance_id, _)) => {
304            let meta_api = api.with_module(instance_id);
305
306            let overrides_res = retry("fetch_meta_values", backoff, || async {
307                Ok(meta_api.get_consensus(DEFAULT_META_KEY).await?)
308            })
309            .await;
310
311            match overrides_res {
312                Ok(Some(consensus)) => Some(consensus),
313                Ok(None) => {
314                    debug!(target: LOG_CLIENT_MODULE_META, "Meta module returned no consensus value");
315                    None
316                }
317                Err(e) => {
318                    warn!(target: LOG_CLIENT_MODULE_META, "Failed to fetch meta module consensus value: {}", e);
319                    None
320                }
321            }
322        }
323        _ => None,
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use fedimint_meta_common::MetaValue;
330    use serde_json::json;
331
332    use super::{MetaConsensusValue, format_rpc_consensus_value_response};
333
334    #[test]
335    fn formats_consensus_value_as_json() {
336        let response = format_rpc_consensus_value_response(Some(MetaConsensusValue {
337            revision: 7,
338            value: MetaValue::from(br#"{"welcome_message":"hello"}"#.as_slice()),
339        }))
340        .expect("valid json meta value should format");
341
342        assert_eq!(
343            response,
344            json!({
345                "revision": 7,
346                "value": {
347                    "welcome_message": "hello",
348                },
349            })
350        );
351    }
352
353    #[test]
354    fn formats_missing_consensus_value_as_null() {
355        let response =
356            format_rpc_consensus_value_response(None).expect("null response should format");
357
358        assert_eq!(response, serde_json::Value::Null);
359    }
360}