1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3
4pub mod db;
5
6use std::collections::BTreeMap;
7use std::future;
8
9use async_trait::async_trait;
10use db::{
11 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
12 MetaSubmissionsKey,
13};
14use fedimint_core::config::{
15 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
16 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
17};
18use fedimint_core::core::ModuleInstanceId;
19use fedimint_core::db::{
20 CoreMigrationFn, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
21 NonCommittable,
22};
23use fedimint_core::module::audit::Audit;
24use fedimint_core::module::{
25 ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
26 InputMeta, ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
27 TransactionItemAmount, api_endpoint,
28};
29use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
30use fedimint_logging::LOG_MODULE_META;
31use fedimint_meta_common::config::{
32 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigLocal, MetaConfigPrivate,
33};
34pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
35use fedimint_meta_common::endpoint::{
36 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
37 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
38 SubmitRequest,
39};
40use fedimint_meta_common::{
41 MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem, MetaConsensusValue, MetaInput,
42 MetaInputError, MetaKey, MetaModuleTypes, MetaOutput, MetaOutputError, MetaOutputOutcome,
43 MetaValue,
44};
45use fedimint_server::core::{
46 DynServerModule, ServerModule, ServerModuleInit, ServerModuleInitArgs,
47};
48use futures::StreamExt;
49use rand::{Rng, thread_rng};
50use strum::IntoEnumIterator;
51use tracing::{debug, info, trace};
52
53use crate::db::{
54 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
55 MetaSubmissionsKeyPrefix,
56};
57
58#[derive(Debug, Clone)]
60pub struct MetaInit;
61
62impl ModuleInit for MetaInit {
64 type Common = MetaCommonInit;
65
66 async fn dump_database(
68 &self,
69 dbtx: &mut DatabaseTransaction<'_>,
70 prefix_names: Vec<String>,
71 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
72 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
74 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
75 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
76 });
77
78 for table in filtered_prefixes {
79 match table {
80 DbKeyPrefix::Desired => {
81 push_db_pair_items!(
82 dbtx,
83 MetaDesiredKeyPrefix,
84 MetaDesiredKey,
85 MetaDesiredValue,
86 items,
87 "Meta Desired"
88 );
89 }
90 DbKeyPrefix::Consensus => {
91 push_db_pair_items!(
92 dbtx,
93 MetaConsensusKeyPrefix,
94 MetaConsensusKey,
95 MetaConsensusValue,
96 items,
97 "Meta Consensus"
98 );
99 }
100 DbKeyPrefix::Submissions => {
101 push_db_pair_items!(
102 dbtx,
103 MetaSubmissionsKeyPrefix,
104 MetaSubmissionsKey,
105 MetaSubmissionValue,
106 items,
107 "Meta Submissions"
108 );
109 }
110 }
111 }
112
113 Box::new(items.into_iter())
114 }
115}
116
117#[async_trait]
119impl ServerModuleInit for MetaInit {
120 type Params = MetaGenParams;
121
122 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
124 &[MODULE_CONSENSUS_VERSION]
125 }
126
127 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
128 SupportedModuleApiVersions::from_raw(
129 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
130 (
131 MODULE_CONSENSUS_VERSION.major,
132 MODULE_CONSENSUS_VERSION.minor,
133 ),
134 &[(0, 0)],
135 )
136 }
137
138 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<DynServerModule> {
140 Ok(Meta {
141 cfg: args.cfg().to_typed()?,
142 our_peer_id: args.our_peer_id(),
143 num_peers: args.num_peers(),
144 }
145 .into())
146 }
147
148 fn trusted_dealer_gen(
150 &self,
151 peers: &[PeerId],
152 params: &ConfigGenModuleParams,
153 ) -> BTreeMap<PeerId, ServerModuleConfig> {
154 let _params = self.parse_params(params).unwrap();
155 peers
157 .iter()
158 .map(|&peer| {
159 let config = MetaConfig {
160 local: MetaConfigLocal {},
161 private: MetaConfigPrivate,
162 consensus: MetaConfigConsensus {},
163 };
164 (peer, config.to_erased())
165 })
166 .collect()
167 }
168
169 async fn distributed_gen(
171 &self,
172 _peers: &PeerHandle,
173 params: &ConfigGenModuleParams,
174 ) -> anyhow::Result<ServerModuleConfig> {
175 let _params = self.parse_params(params).unwrap();
176
177 Ok(MetaConfig {
178 local: MetaConfigLocal {},
179 private: MetaConfigPrivate,
180 consensus: MetaConfigConsensus {},
181 }
182 .to_erased())
183 }
184
185 fn get_client_config(
187 &self,
188 config: &ServerModuleConsensusConfig,
189 ) -> anyhow::Result<MetaClientConfig> {
190 let _config = MetaConfigConsensus::from_erased(config)?;
191 Ok(MetaClientConfig {})
192 }
193
194 fn validate_config(
195 &self,
196 _identity: &PeerId,
197 _config: ServerModuleConfig,
198 ) -> anyhow::Result<()> {
199 Ok(())
200 }
201
202 fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
204 BTreeMap::new()
205 }
206}
207
208#[derive(Debug)]
210pub struct Meta {
211 pub cfg: MetaConfig,
212 pub our_peer_id: PeerId,
213 pub num_peers: NumPeers,
214}
215
216impl Meta {
217 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
218 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
219 .await
220 .map(|(k, v)| (k.0, v))
221 .collect()
222 .await
223 }
224
225 async fn get_submission(
226 dbtx: &mut DatabaseTransaction<'_>,
227 key: MetaKey,
228 peer_id: PeerId,
229 ) -> Option<MetaSubmissionValue> {
230 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
231 }
232
233 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
234 dbtx.get_value(&MetaConsensusKey(key))
235 .await
236 .map(|consensus_value| consensus_value.value)
237 }
238
239 async fn change_consensus(
240 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
241 key: MetaKey,
242 value: MetaValue,
243 matching_submissions: Vec<PeerId>,
244 ) {
245 let value_len = value.as_slice().len();
246 let revision = dbtx
247 .get_value(&MetaConsensusKey(key))
248 .await
249 .map(|cv| cv.revision);
250 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
251 dbtx.insert_entry(
252 &MetaConsensusKey(key),
253 &MetaConsensusValue { revision, value },
254 )
255 .await;
256
257 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
258
259 for peer_id in matching_submissions {
260 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
261 .await;
262 }
263 }
264}
265
266#[async_trait]
268impl ServerModule for Meta {
269 type Common = MetaModuleTypes;
271 type Init = MetaInit;
272
273 async fn consensus_proposal(
278 &self,
279 dbtx: &mut DatabaseTransaction<'_>,
280 ) -> Vec<MetaConsensusItem> {
281 let desired: Vec<_> = Self::get_desired(dbtx).await;
282
283 let mut to_submit = vec![];
284
285 for (
286 key,
287 MetaDesiredValue {
288 value: desired_value,
289 salt,
290 },
291 ) in desired
292 {
293 let consensus_value = &Self::get_consensus(dbtx, key).await;
294 let consensus_submission_value =
295 Self::get_submission(dbtx, key, self.our_peer_id).await;
296 if consensus_submission_value.as_ref()
297 == Some(&MetaSubmissionValue {
298 value: desired_value.clone(),
299 salt,
300 })
301 {
302 } else if consensus_value.as_ref() == Some(&desired_value) {
304 if consensus_submission_value.is_none() {
305 } else {
309 to_submit.push(MetaConsensusItem {
313 key,
314 value: desired_value,
315 salt,
316 });
317 }
318 } else {
319 to_submit.push(MetaConsensusItem {
320 key,
321 value: desired_value,
322 salt,
323 });
324 }
325 }
326
327 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
328 to_submit
329 }
330
331 async fn process_consensus_item<'a, 'b>(
336 &'a self,
337 dbtx: &mut DatabaseTransaction<'b>,
338 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
339 peer_id: PeerId,
340 ) -> anyhow::Result<()> {
341 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
342
343 let new_value = MetaSubmissionValue { salt, value };
344 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
346 if prev_value != new_value {
347 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
348 .await;
349 }
350 }
351 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
353 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
354 return Ok(());
355 }
356
357 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
359 .await;
360
361 let matching_submissions: Vec<PeerId> = dbtx
363 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
364 .await
365 .filter(|(_submission_key, submission_value)| {
366 future::ready(new_value.value == submission_value.value)
367 })
368 .map(|(submission_key, _)| submission_key.peer_id)
369 .collect()
370 .await;
371
372 let threshold = self.num_peers.threshold();
373 info!(target: LOG_MODULE_META,
374 %peer_id,
375 %key,
376 value_len = %new_value.value.as_slice().len(),
377 matching = %matching_submissions.len(),
378 %threshold, "Peer submitted a value");
379
380 if threshold <= matching_submissions.len() {
382 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
383 }
384
385 Ok(())
386 }
387
388 async fn process_input<'a, 'b, 'c>(
389 &'a self,
390 _dbtx: &mut DatabaseTransaction<'c>,
391 _input: &'b MetaInput,
392 _in_point: InPoint,
393 ) -> Result<InputMeta, MetaInputError> {
394 Err(MetaInputError::NotSupported)
395 }
396
397 async fn process_output<'a, 'b>(
398 &'a self,
399 _dbtx: &mut DatabaseTransaction<'b>,
400 _output: &'a MetaOutput,
401 _out_point: OutPoint,
402 ) -> Result<TransactionItemAmount, MetaOutputError> {
403 Err(MetaOutputError::NotSupported)
404 }
405
406 async fn output_status(
407 &self,
408 _dbtx: &mut DatabaseTransaction<'_>,
409 _out_point: OutPoint,
410 ) -> Option<MetaOutputOutcome> {
411 None
412 }
413
414 async fn audit(
415 &self,
416 _dbtx: &mut DatabaseTransaction<'_>,
417 _audit: &mut Audit,
418 _module_instance_id: ModuleInstanceId,
419 ) {
420 }
421
422 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
423 vec![
424 api_endpoint! {
425 SUBMIT_ENDPOINT,
426 ApiVersion::new(0, 0),
427 async |module: &Meta, context, request: SubmitRequest| -> () {
428
429 match context.request_auth() {
430 None => return Err(ApiError::bad_request("Missing password".to_string())),
431 Some(auth) => {
432 module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
433 }
434 }
435
436 Ok(())
437 }
438 },
439 api_endpoint! {
440 GET_CONSENSUS_ENDPOINT,
441 ApiVersion::new(0, 0),
442 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
443 module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
444 }
445 },
446 api_endpoint! {
447 GET_CONSENSUS_REV_ENDPOINT,
448 ApiVersion::new(0, 0),
449 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
450 module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
451 }
452 },
453 api_endpoint! {
454 GET_SUBMISSIONS_ENDPOINT,
455 ApiVersion::new(0, 0),
456 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
457 match context.request_auth() {
458 None => return Err(ApiError::bad_request("Missing password".to_string())),
459 Some(auth) => {
460 module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
461 }
462 }
463 }
464 },
465 ]
466 }
467}
468
469impl Meta {
470 async fn handle_submit_request(
471 &self,
472 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
473 _auth: &ApiAuth,
474 req: &SubmitRequest,
475 ) -> Result<(), ApiError> {
476 let salt = thread_rng().r#gen();
477
478 info!(target: LOG_MODULE_META,
479 key = %req.key,
480 peer_id = %self.our_peer_id,
481 value_len = %req.value.as_slice().len(),
482 "Our own guardian submitted a value");
483
484 dbtx.insert_entry(
485 &MetaDesiredKey(req.key),
486 &MetaDesiredValue {
487 value: req.value.clone(),
488 salt,
489 },
490 )
491 .await;
492
493 Ok(())
494 }
495
496 async fn handle_get_consensus_request(
497 &self,
498 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
499 req: &GetConsensusRequest,
500 ) -> Result<Option<MetaConsensusValue>, ApiError> {
501 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
502 }
503
504 async fn handle_get_consensus_revision_request(
505 &self,
506 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
507 req: &GetConsensusRequest,
508 ) -> Result<Option<u64>, ApiError> {
509 Ok(dbtx
510 .get_value(&MetaConsensusKey(req.0))
511 .await
512 .map(|cv| cv.revision))
513 }
514
515 async fn handle_get_submissions_request(
516 &self,
517 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
518 _auth: &ApiAuth,
519 req: &GetSubmissionsRequest,
520 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
521 Ok(dbtx
522 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
523 .await
524 .collect::<Vec<_>>()
525 .await
526 .into_iter()
527 .map(|(k, v)| (k.peer_id, v.value))
528 .collect())
529 }
530}