1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14 MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
18 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23 NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28 ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29 InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30 TransactionItemAmount, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
36};
37pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
38use fedimint_meta_common::endpoint::{
39 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
40 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
41 SubmitRequest,
42};
43use fedimint_meta_common::{
44 DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
45 MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
46 MetaOutputError, MetaOutputOutcome, MetaValue,
47};
48use fedimint_server_core::config::PeerHandleOps;
49use fedimint_server_core::migration::ServerModuleDbMigrationFn;
50use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
51use futures::StreamExt;
52use rand::{Rng, thread_rng};
53use strum::IntoEnumIterator;
54use tracing::{debug, info, trace};
55
56use crate::db::{
57 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
58 MetaSubmissionsKeyPrefix,
59};
60
61#[derive(Debug, Clone)]
63pub struct MetaInit;
64
65impl ModuleInit for MetaInit {
67 type Common = MetaCommonInit;
68
69 async fn dump_database(
71 &self,
72 dbtx: &mut DatabaseTransaction<'_>,
73 prefix_names: Vec<String>,
74 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
75 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
77 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
78 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
79 });
80
81 for table in filtered_prefixes {
82 match table {
83 DbKeyPrefix::Desired => {
84 push_db_pair_items!(
85 dbtx,
86 MetaDesiredKeyPrefix,
87 MetaDesiredKey,
88 MetaDesiredValue,
89 items,
90 "Meta Desired"
91 );
92 }
93 DbKeyPrefix::Consensus => {
94 push_db_pair_items!(
95 dbtx,
96 MetaConsensusKeyPrefix,
97 MetaConsensusKey,
98 MetaConsensusValue,
99 items,
100 "Meta Consensus"
101 );
102 }
103 DbKeyPrefix::Submissions => {
104 push_db_pair_items!(
105 dbtx,
106 MetaSubmissionsKeyPrefix,
107 MetaSubmissionsKey,
108 MetaSubmissionValue,
109 items,
110 "Meta Submissions"
111 );
112 }
113 }
114 }
115
116 Box::new(items.into_iter())
117 }
118}
119
120#[async_trait]
122impl ServerModuleInit for MetaInit {
123 type Module = Meta;
124 type Params = MetaGenParams;
125
126 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128 &[MODULE_CONSENSUS_VERSION]
129 }
130
131 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132 SupportedModuleApiVersions::from_raw(
133 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134 (
135 MODULE_CONSENSUS_VERSION.major,
136 MODULE_CONSENSUS_VERSION.minor,
137 ),
138 &[(0, 0)],
139 )
140 }
141
142 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144 Ok(Meta {
145 cfg: args.cfg().to_typed()?,
146 our_peer_id: args.our_peer_id(),
147 num_peers: args.num_peers(),
148 db: args.db().clone(),
149 })
150 }
151
152 fn trusted_dealer_gen(
154 &self,
155 peers: &[PeerId],
156 params: &ConfigGenModuleParams,
157 ) -> BTreeMap<PeerId, ServerModuleConfig> {
158 let _params = self.parse_params(params).unwrap();
159 peers
161 .iter()
162 .map(|&peer| {
163 let config = MetaConfig {
164 private: MetaConfigPrivate,
165 consensus: MetaConfigConsensus {},
166 };
167 (peer, config.to_erased())
168 })
169 .collect()
170 }
171
172 async fn distributed_gen(
174 &self,
175 _peers: &(dyn PeerHandleOps + Send + Sync),
176 params: &ConfigGenModuleParams,
177 ) -> anyhow::Result<ServerModuleConfig> {
178 let _params = self.parse_params(params).unwrap();
179
180 Ok(MetaConfig {
181 private: MetaConfigPrivate,
182 consensus: MetaConfigConsensus {},
183 }
184 .to_erased())
185 }
186
187 fn get_client_config(
189 &self,
190 config: &ServerModuleConsensusConfig,
191 ) -> anyhow::Result<MetaClientConfig> {
192 let _config = MetaConfigConsensus::from_erased(config)?;
193 Ok(MetaClientConfig {})
194 }
195
196 fn validate_config(
197 &self,
198 _identity: &PeerId,
199 _config: ServerModuleConfig,
200 ) -> anyhow::Result<()> {
201 Ok(())
202 }
203
204 fn get_database_migrations(
206 &self,
207 ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
208 BTreeMap::new()
209 }
210}
211
212#[derive(Debug)]
214pub struct Meta {
215 pub cfg: MetaConfig,
216 pub our_peer_id: PeerId,
217 pub num_peers: NumPeers,
218 pub db: Database,
219}
220
221impl Meta {
222 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
223 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
224 .await
225 .map(|(k, v)| (k.0, v))
226 .collect()
227 .await
228 }
229
230 async fn get_submission(
231 dbtx: &mut DatabaseTransaction<'_>,
232 key: MetaKey,
233 peer_id: PeerId,
234 ) -> Option<MetaSubmissionValue> {
235 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
236 }
237
238 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
239 dbtx.get_value(&MetaConsensusKey(key))
240 .await
241 .map(|consensus_value| consensus_value.value)
242 }
243
244 async fn change_consensus(
245 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
246 key: MetaKey,
247 value: MetaValue,
248 matching_submissions: Vec<PeerId>,
249 ) {
250 let value_len = value.as_slice().len();
251 let revision = dbtx
252 .get_value(&MetaConsensusKey(key))
253 .await
254 .map(|cv| cv.revision);
255 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
256 dbtx.insert_entry(
257 &MetaConsensusKey(key),
258 &MetaConsensusValue { revision, value },
259 )
260 .await;
261
262 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
263
264 for peer_id in matching_submissions {
265 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
266 .await;
267 }
268 }
269}
270
271#[async_trait]
273impl ServerModule for Meta {
274 type Common = MetaModuleTypes;
276 type Init = MetaInit;
277
278 async fn consensus_proposal(
283 &self,
284 dbtx: &mut DatabaseTransaction<'_>,
285 ) -> Vec<MetaConsensusItem> {
286 let desired: Vec<_> = Self::get_desired(dbtx).await;
287
288 let mut to_submit = vec![];
289
290 for (
291 key,
292 MetaDesiredValue {
293 value: desired_value,
294 salt,
295 },
296 ) in desired
297 {
298 let consensus_value = &Self::get_consensus(dbtx, key).await;
299 let consensus_submission_value =
300 Self::get_submission(dbtx, key, self.our_peer_id).await;
301 if consensus_submission_value.as_ref()
302 == Some(&MetaSubmissionValue {
303 value: desired_value.clone(),
304 salt,
305 })
306 {
307 } else if consensus_value.as_ref() == Some(&desired_value) {
309 if consensus_submission_value.is_none() {
310 } else {
314 to_submit.push(MetaConsensusItem {
318 key,
319 value: desired_value,
320 salt,
321 });
322 }
323 } else {
324 to_submit.push(MetaConsensusItem {
325 key,
326 value: desired_value,
327 salt,
328 });
329 }
330 }
331
332 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
333 to_submit
334 }
335
336 async fn process_consensus_item<'a, 'b>(
341 &'a self,
342 dbtx: &mut DatabaseTransaction<'b>,
343 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
344 peer_id: PeerId,
345 ) -> anyhow::Result<()> {
346 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
347
348 let new_value = MetaSubmissionValue { salt, value };
349 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
351 if prev_value != new_value {
352 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
353 .await;
354 }
355 }
356 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
358 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
359 return Ok(());
360 }
361
362 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
364 .await;
365
366 let matching_submissions: Vec<PeerId> = dbtx
368 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
369 .await
370 .filter(|(_submission_key, submission_value)| {
371 future::ready(new_value.value == submission_value.value)
372 })
373 .map(|(submission_key, _)| submission_key.peer_id)
374 .collect()
375 .await;
376
377 let threshold = self.num_peers.threshold();
378 info!(target: LOG_MODULE_META,
379 %peer_id,
380 %key,
381 value_len = %new_value.value.as_slice().len(),
382 matching = %matching_submissions.len(),
383 %threshold, "Peer submitted a value");
384
385 if threshold <= matching_submissions.len() {
387 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
388 }
389
390 Ok(())
391 }
392
393 async fn process_input<'a, 'b, 'c>(
394 &'a self,
395 _dbtx: &mut DatabaseTransaction<'c>,
396 _input: &'b MetaInput,
397 _in_point: InPoint,
398 ) -> Result<InputMeta, MetaInputError> {
399 Err(MetaInputError::NotSupported)
400 }
401
402 async fn process_output<'a, 'b>(
403 &'a self,
404 _dbtx: &mut DatabaseTransaction<'b>,
405 _output: &'a MetaOutput,
406 _out_point: OutPoint,
407 ) -> Result<TransactionItemAmount, MetaOutputError> {
408 Err(MetaOutputError::NotSupported)
409 }
410
411 async fn output_status(
412 &self,
413 _dbtx: &mut DatabaseTransaction<'_>,
414 _out_point: OutPoint,
415 ) -> Option<MetaOutputOutcome> {
416 None
417 }
418
419 async fn audit(
420 &self,
421 _dbtx: &mut DatabaseTransaction<'_>,
422 _audit: &mut Audit,
423 _module_instance_id: ModuleInstanceId,
424 ) {
425 }
426
427 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
428 vec![
429 api_endpoint! {
430 SUBMIT_ENDPOINT,
431 ApiVersion::new(0, 0),
432 async |module: &Meta, context, request: SubmitRequest| -> () {
433
434 match context.request_auth() {
435 None => return Err(ApiError::bad_request("Missing password".to_string())),
436 Some(auth) => {
437 module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
438 }
439 }
440
441 Ok(())
442 }
443 },
444 api_endpoint! {
445 GET_CONSENSUS_ENDPOINT,
446 ApiVersion::new(0, 0),
447 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
448 module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
449 }
450 },
451 api_endpoint! {
452 GET_CONSENSUS_REV_ENDPOINT,
453 ApiVersion::new(0, 0),
454 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
455 module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
456 }
457 },
458 api_endpoint! {
459 GET_SUBMISSIONS_ENDPOINT,
460 ApiVersion::new(0, 0),
461 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
462 match context.request_auth() {
463 None => return Err(ApiError::bad_request("Missing password".to_string())),
464 Some(auth) => {
465 module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
466 }
467 }
468 }
469 },
470 ]
471 }
472}
473
474impl Meta {
475 async fn handle_submit_request(
476 &self,
477 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
478 _auth: &ApiAuth,
479 req: &SubmitRequest,
480 ) -> Result<(), ApiError> {
481 let salt = thread_rng().r#gen();
482
483 info!(target: LOG_MODULE_META,
484 key = %req.key,
485 peer_id = %self.our_peer_id,
486 value_len = %req.value.as_slice().len(),
487 "Our own guardian submitted a value");
488
489 dbtx.insert_entry(
490 &MetaDesiredKey(req.key),
491 &MetaDesiredValue {
492 value: req.value.clone(),
493 salt,
494 },
495 )
496 .await;
497
498 Ok(())
499 }
500
501 async fn handle_get_consensus_request(
502 &self,
503 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
504 req: &GetConsensusRequest,
505 ) -> Result<Option<MetaConsensusValue>, ApiError> {
506 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
507 }
508
509 async fn handle_get_consensus_revision_request(
510 &self,
511 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
512 req: &GetConsensusRequest,
513 ) -> Result<Option<u64>, ApiError> {
514 Ok(dbtx
515 .get_value(&MetaConsensusKey(req.0))
516 .await
517 .map(|cv| cv.revision))
518 }
519
520 async fn handle_get_submissions_request(
521 &self,
522 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
523 _auth: &ApiAuth,
524 req: &GetSubmissionsRequest,
525 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
526 Ok(dbtx
527 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
528 .await
529 .collect::<Vec<_>>()
530 .await
531 .into_iter()
532 .map(|(k, v)| (k.peer_id, v.value))
533 .collect())
534 }
535}
536
537impl Meta {
540 pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
542 let mut dbtx = self.db.begin_transaction().await;
543
544 self.handle_submit_request(
545 &mut dbtx.to_ref_nc(),
546 &ApiAuth(String::new()),
547 &SubmitRequest {
548 key: DEFAULT_META_KEY,
549 value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
550 },
551 )
552 .await?;
553
554 dbtx.commit_tx_result()
555 .await
556 .map_err(|e| ApiError::server_error(e.to_string()))?;
557
558 Ok(())
559 }
560
561 pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
563 self.handle_get_consensus_request(
564 &mut self.db.begin_transaction_nc().await,
565 &GetConsensusRequest(DEFAULT_META_KEY),
566 )
567 .await?
568 .map(|value| serde_json::from_slice(value.value.as_slice()))
569 .transpose()
570 .map_err(|e| ApiError::server_error(e.to_string()))
571 }
572
573 pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
575 self.handle_get_consensus_revision_request(
576 &mut self.db.begin_transaction_nc().await,
577 &GetConsensusRequest(DEFAULT_META_KEY),
578 )
579 .await
580 .map(|r| r.unwrap_or(0))
581 }
582
583 pub async fn handle_get_submissions_request_ui(
585 &self,
586 ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
587 let mut submissions = BTreeMap::new();
588
589 let mut dbtx = self.db.begin_transaction_nc().await;
590
591 for (peer_id, value) in self
592 .handle_get_submissions_request(
593 &mut dbtx.to_ref_nc(),
594 &ApiAuth(String::new()),
595 &GetSubmissionsRequest(DEFAULT_META_KEY),
596 )
597 .await?
598 {
599 if let Ok(value) = serde_json::from_slice(value.as_slice()) {
600 submissions.insert(peer_id, value);
601 }
602 }
603
604 Ok(submissions)
605 }
606}