1#![deny(clippy::pedantic)]
2#![allow(clippy::missing_errors_doc)]
3#![allow(clippy::module_name_repetitions)]
4
5pub mod api;
6#[cfg(feature = "cli")]
7pub mod cli;
8pub mod db;
9pub mod states;
10
11use std::collections::BTreeMap;
12use std::time::Duration;
13
14use anyhow::Context as _;
15use api::MetaFederationApi;
16use common::{KIND, MetaConsensusValue, MetaKey, MetaValue};
17use db::DbKeyPrefix;
18use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
19use fedimint_client_module::db::ClientModuleMigrationFn;
20use fedimint_client_module::meta::{FetchKind, LegacyMetaSource, MetaSource, MetaValues};
21use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
22use fedimint_client_module::module::recovery::NoModuleBackup;
23use fedimint_client_module::module::{ClientModule, IClientModule};
24use fedimint_client_module::sm::Context;
25use fedimint_core::config::ClientConfig;
26use fedimint_core::core::{Decoder, ModuleKind};
27use fedimint_core::db::{DatabaseTransaction, DatabaseVersion};
28use fedimint_core::module::{
29 Amounts, ApiAuth, ApiVersion, ModuleCommon, ModuleInit, MultiApiVersion,
30};
31use fedimint_core::util::backoff_util::FibonacciBackoff;
32use fedimint_core::util::{BoxStream, backoff_util, retry};
33use fedimint_core::{PeerId, apply, async_trait_maybe_send};
34use fedimint_logging::LOG_CLIENT_MODULE_META;
35pub use fedimint_meta_common as common;
36use fedimint_meta_common::{DEFAULT_META_KEY, MetaCommonInit, MetaModuleTypes};
37use futures::stream;
38use serde::Deserialize;
39use serde_json::json;
40use states::MetaStateMachine;
41use strum::IntoEnumIterator;
42use tracing::{debug, warn};
43
44#[derive(Debug)]
45pub struct MetaClientModule {
46 module_api: DynModuleApi,
47 admin_auth: Option<ApiAuth>,
48}
49
50impl MetaClientModule {
51 fn admin_auth(&self) -> anyhow::Result<ApiAuth> {
52 self.admin_auth
53 .clone()
54 .ok_or_else(|| anyhow::format_err!("Admin auth not set"))
55 }
56
57 pub async fn submit(&self, key: MetaKey, value: MetaValue) -> anyhow::Result<()> {
65 self.module_api
66 .submit(key, value, self.admin_auth()?)
67 .await?;
68
69 Ok(())
70 }
71
72 pub async fn get_consensus_value(
76 &self,
77 key: MetaKey,
78 ) -> anyhow::Result<Option<MetaConsensusValue>> {
79 Ok(self.module_api.get_consensus(key).await?)
80 }
81
82 pub async fn get_consensus_value_rev(&self, key: MetaKey) -> anyhow::Result<Option<u64>> {
88 Ok(self.module_api.get_consensus_rev(key).await?)
89 }
90
91 pub async fn get_submissions(
95 &self,
96 key: MetaKey,
97 ) -> anyhow::Result<BTreeMap<PeerId, MetaValue>> {
98 Ok(self
99 .module_api
100 .get_submissions(key, self.admin_auth()?)
101 .await?)
102 }
103}
104
105#[derive(Debug, Deserialize)]
106struct GetConsensusValueRequest {
107 key: MetaKey,
108}
109
110fn format_rpc_consensus_value_response(
111 maybe_consensus_value: Option<MetaConsensusValue>,
112) -> anyhow::Result<serde_json::Value> {
113 Ok(match maybe_consensus_value {
114 Some(MetaConsensusValue { revision, value }) => {
115 let value = value
116 .to_json_lossy()
117 .context("deserializing consensus value as json")?;
118
119 json!({
120 "revision": revision,
121 "value": value,
122 })
123 }
124 None => serde_json::Value::Null,
125 })
126}
127
128#[derive(Debug, Clone)]
130pub struct MetaClientContext {
131 pub meta_decoder: Decoder,
132}
133
134impl Context for MetaClientContext {
136 const KIND: Option<ModuleKind> = Some(KIND);
137}
138
139#[apply(async_trait_maybe_send!)]
140impl ClientModule for MetaClientModule {
141 type Init = MetaClientInit;
142 type Common = MetaModuleTypes;
143 type Backup = NoModuleBackup;
144 type ModuleStateMachineContext = MetaClientContext;
145 type States = MetaStateMachine;
146
147 fn context(&self) -> Self::ModuleStateMachineContext {
148 MetaClientContext {
149 meta_decoder: self.decoder(),
150 }
151 }
152
153 fn input_fee(
154 &self,
155 _amount: &Amounts,
156 _input: &<Self::Common as ModuleCommon>::Input,
157 ) -> Option<Amounts> {
158 unreachable!()
159 }
160
161 fn output_fee(
162 &self,
163 _amount: &Amounts,
164 _output: &<Self::Common as ModuleCommon>::Output,
165 ) -> Option<Amounts> {
166 unreachable!()
167 }
168
169 async fn handle_rpc(
170 &self,
171 method: String,
172 request: serde_json::Value,
173 ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
174 Box::pin(stream::once(async move {
175 match method.as_str() {
176 "get_consensus_value" => {
177 let req: GetConsensusValueRequest = serde_json::from_value(request)?;
178 let maybe_consensus_value = self.get_consensus_value(req.key).await?;
179 format_rpc_consensus_value_response(maybe_consensus_value)
180 }
181 _ => Err(anyhow::format_err!("Unknown method: {method}")),
182 }
183 }))
184 }
185
186 #[cfg(feature = "cli")]
187 async fn handle_cli_command(
188 &self,
189 args: &[std::ffi::OsString],
190 ) -> anyhow::Result<serde_json::Value> {
191 cli::handle_cli_command(self, args).await
192 }
193}
194
/// Initializer for [`MetaClientModule`].
#[derive(Debug, Clone)]
pub struct MetaClientInit;
197
198impl ModuleInit for MetaClientInit {
200 type Common = MetaCommonInit;
201
202 async fn dump_database(
203 &self,
204 _dbtx: &mut DatabaseTransaction<'_>,
205 prefix_names: Vec<String>,
206 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
207 let items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
208 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
209 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
210 });
211
212 #[allow(clippy::never_loop)]
213 for table in filtered_prefixes {
214 match table {}
215 }
216
217 Box::new(items.into_iter())
218 }
219}
220
221#[apply(async_trait_maybe_send!)]
223impl ClientModuleInit for MetaClientInit {
224 type Module = MetaClientModule;
225
226 fn supported_api_versions(&self) -> MultiApiVersion {
227 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
228 .expect("no version conflicts")
229 }
230
231 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
232 Ok(MetaClientModule {
233 module_api: args.module_api().clone(),
234 admin_auth: args.admin_auth().cloned(),
235 })
236 }
237
238 fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, ClientModuleMigrationFn> {
239 BTreeMap::new()
240 }
241}
242
243#[derive(Clone, Debug, Default)]
246pub struct MetaModuleMetaSourceWithFallback<S = LegacyMetaSource> {
247 legacy: S,
248}
249
250impl<S> MetaModuleMetaSourceWithFallback<S> {
251 pub fn new(legacy: S) -> Self {
252 Self { legacy }
253 }
254}
255
256#[apply(async_trait_maybe_send!)]
257impl<S: MetaSource> MetaSource for MetaModuleMetaSourceWithFallback<S> {
258 async fn wait_for_update(&self) {
259 fedimint_core::runtime::sleep(Duration::from_mins(10)).await;
260 }
261
262 async fn fetch(
263 &self,
264 client_config: &ClientConfig,
265 api: &DynGlobalApi,
266 fetch_kind: fedimint_client_module::meta::FetchKind,
267 last_revision: Option<u64>,
268 ) -> anyhow::Result<fedimint_client_module::meta::MetaValues> {
269 let backoff = match fetch_kind {
270 FetchKind::Initial => backoff_util::aggressive_backoff(),
272 FetchKind::Background => backoff_util::background_backoff(),
273 };
274
275 let maybe_meta_module_meta = get_meta_module_value(client_config, api, backoff)
276 .await
277 .map(|meta| {
278 Result::<_, anyhow::Error>::Ok(MetaValues {
279 values: serde_json::from_slice(meta.value.as_slice())?,
280 revision: meta.revision,
281 })
282 })
283 .transpose()?;
284
285 if let Some(maybe_meta_module_meta) = maybe_meta_module_meta {
288 Ok(maybe_meta_module_meta)
289 } else {
290 self.legacy
291 .fetch(client_config, api, fetch_kind, last_revision)
292 .await
293 }
294 }
295}
296
297async fn get_meta_module_value(
298 client_config: &ClientConfig,
299 api: &DynGlobalApi,
300 backoff: FibonacciBackoff,
301) -> Option<MetaConsensusValue> {
302 match client_config.get_first_module_by_kind_cfg(KIND) {
303 Ok((instance_id, _)) => {
304 let meta_api = api.with_module(instance_id);
305
306 let overrides_res = retry("fetch_meta_values", backoff, || async {
307 Ok(meta_api.get_consensus(DEFAULT_META_KEY).await?)
308 })
309 .await;
310
311 match overrides_res {
312 Ok(Some(consensus)) => Some(consensus),
313 Ok(None) => {
314 debug!(target: LOG_CLIENT_MODULE_META, "Meta module returned no consensus value");
315 None
316 }
317 Err(e) => {
318 warn!(target: LOG_CLIENT_MODULE_META, "Failed to fetch meta module consensus value: {}", e);
319 None
320 }
321 }
322 }
323 _ => None,
324 }
325}
326
#[cfg(test)]
mod tests {
    use fedimint_meta_common::MetaValue;
    use serde_json::json;

    use super::{MetaConsensusValue, format_rpc_consensus_value_response};

    /// A stored JSON object is echoed back next to its revision.
    #[test]
    fn formats_consensus_value_as_json() {
        let consensus = MetaConsensusValue {
            revision: 7,
            value: MetaValue::from(br#"{"welcome_message":"hello"}"#.as_slice()),
        };
        let expected = json!({
            "revision": 7,
            "value": {
                "welcome_message": "hello",
            },
        });

        let response = format_rpc_consensus_value_response(Some(consensus))
            .expect("valid json meta value should format");

        assert_eq!(response, expected);
    }

    /// A missing consensus value is reported as JSON `null`.
    #[test]
    fn formats_missing_consensus_value_as_null() {
        let expected = serde_json::Value::Null;

        let response =
            format_rpc_consensus_value_response(None).expect("null response should format");

        assert_eq!(response, expected);
    }
}