1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14 MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
18 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23 NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28 ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29 InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30 TransactionItemAmount, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigLocal, MetaConfigPrivate,
36};
37pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
38use fedimint_meta_common::endpoint::{
39 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
40 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
41 SubmitRequest,
42};
43use fedimint_meta_common::{
44 DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
45 MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
46 MetaOutputError, MetaOutputOutcome, MetaValue,
47};
48use fedimint_server_core::config::PeerHandleOps;
49use fedimint_server_core::migration::ServerModuleDbMigrationFn;
50use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
51use futures::StreamExt;
52use rand::{Rng, thread_rng};
53use strum::IntoEnumIterator;
54use tracing::{debug, info, trace};
55
56use crate::db::{
57 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
58 MetaSubmissionsKeyPrefix,
59};
60
61#[derive(Debug, Clone)]
63pub struct MetaInit;
64
65impl ModuleInit for MetaInit {
67 type Common = MetaCommonInit;
68
69 async fn dump_database(
71 &self,
72 dbtx: &mut DatabaseTransaction<'_>,
73 prefix_names: Vec<String>,
74 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
75 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
77 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
78 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
79 });
80
81 for table in filtered_prefixes {
82 match table {
83 DbKeyPrefix::Desired => {
84 push_db_pair_items!(
85 dbtx,
86 MetaDesiredKeyPrefix,
87 MetaDesiredKey,
88 MetaDesiredValue,
89 items,
90 "Meta Desired"
91 );
92 }
93 DbKeyPrefix::Consensus => {
94 push_db_pair_items!(
95 dbtx,
96 MetaConsensusKeyPrefix,
97 MetaConsensusKey,
98 MetaConsensusValue,
99 items,
100 "Meta Consensus"
101 );
102 }
103 DbKeyPrefix::Submissions => {
104 push_db_pair_items!(
105 dbtx,
106 MetaSubmissionsKeyPrefix,
107 MetaSubmissionsKey,
108 MetaSubmissionValue,
109 items,
110 "Meta Submissions"
111 );
112 }
113 }
114 }
115
116 Box::new(items.into_iter())
117 }
118}
119
120#[async_trait]
122impl ServerModuleInit for MetaInit {
123 type Module = Meta;
124 type Params = MetaGenParams;
125
126 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128 &[MODULE_CONSENSUS_VERSION]
129 }
130
131 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132 SupportedModuleApiVersions::from_raw(
133 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134 (
135 MODULE_CONSENSUS_VERSION.major,
136 MODULE_CONSENSUS_VERSION.minor,
137 ),
138 &[(0, 0)],
139 )
140 }
141
142 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144 Ok(Meta {
145 cfg: args.cfg().to_typed()?,
146 our_peer_id: args.our_peer_id(),
147 num_peers: args.num_peers(),
148 db: args.db().clone(),
149 })
150 }
151
152 fn trusted_dealer_gen(
154 &self,
155 peers: &[PeerId],
156 params: &ConfigGenModuleParams,
157 ) -> BTreeMap<PeerId, ServerModuleConfig> {
158 let _params = self.parse_params(params).unwrap();
159 peers
161 .iter()
162 .map(|&peer| {
163 let config = MetaConfig {
164 local: MetaConfigLocal {},
165 private: MetaConfigPrivate,
166 consensus: MetaConfigConsensus {},
167 };
168 (peer, config.to_erased())
169 })
170 .collect()
171 }
172
173 async fn distributed_gen(
175 &self,
176 _peers: &(dyn PeerHandleOps + Send + Sync),
177 params: &ConfigGenModuleParams,
178 ) -> anyhow::Result<ServerModuleConfig> {
179 let _params = self.parse_params(params).unwrap();
180
181 Ok(MetaConfig {
182 local: MetaConfigLocal {},
183 private: MetaConfigPrivate,
184 consensus: MetaConfigConsensus {},
185 }
186 .to_erased())
187 }
188
189 fn get_client_config(
191 &self,
192 config: &ServerModuleConsensusConfig,
193 ) -> anyhow::Result<MetaClientConfig> {
194 let _config = MetaConfigConsensus::from_erased(config)?;
195 Ok(MetaClientConfig {})
196 }
197
198 fn validate_config(
199 &self,
200 _identity: &PeerId,
201 _config: ServerModuleConfig,
202 ) -> anyhow::Result<()> {
203 Ok(())
204 }
205
206 fn get_database_migrations(
208 &self,
209 ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
210 BTreeMap::new()
211 }
212}
213
214#[derive(Debug)]
216pub struct Meta {
217 pub cfg: MetaConfig,
218 pub our_peer_id: PeerId,
219 pub num_peers: NumPeers,
220 pub db: Database,
221}
222
223impl Meta {
224 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
225 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
226 .await
227 .map(|(k, v)| (k.0, v))
228 .collect()
229 .await
230 }
231
232 async fn get_submission(
233 dbtx: &mut DatabaseTransaction<'_>,
234 key: MetaKey,
235 peer_id: PeerId,
236 ) -> Option<MetaSubmissionValue> {
237 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
238 }
239
240 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
241 dbtx.get_value(&MetaConsensusKey(key))
242 .await
243 .map(|consensus_value| consensus_value.value)
244 }
245
246 async fn change_consensus(
247 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
248 key: MetaKey,
249 value: MetaValue,
250 matching_submissions: Vec<PeerId>,
251 ) {
252 let value_len = value.as_slice().len();
253 let revision = dbtx
254 .get_value(&MetaConsensusKey(key))
255 .await
256 .map(|cv| cv.revision);
257 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
258 dbtx.insert_entry(
259 &MetaConsensusKey(key),
260 &MetaConsensusValue { revision, value },
261 )
262 .await;
263
264 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
265
266 for peer_id in matching_submissions {
267 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
268 .await;
269 }
270 }
271}
272
273#[async_trait]
275impl ServerModule for Meta {
276 type Common = MetaModuleTypes;
278 type Init = MetaInit;
279
280 async fn consensus_proposal(
285 &self,
286 dbtx: &mut DatabaseTransaction<'_>,
287 ) -> Vec<MetaConsensusItem> {
288 let desired: Vec<_> = Self::get_desired(dbtx).await;
289
290 let mut to_submit = vec![];
291
292 for (
293 key,
294 MetaDesiredValue {
295 value: desired_value,
296 salt,
297 },
298 ) in desired
299 {
300 let consensus_value = &Self::get_consensus(dbtx, key).await;
301 let consensus_submission_value =
302 Self::get_submission(dbtx, key, self.our_peer_id).await;
303 if consensus_submission_value.as_ref()
304 == Some(&MetaSubmissionValue {
305 value: desired_value.clone(),
306 salt,
307 })
308 {
309 } else if consensus_value.as_ref() == Some(&desired_value) {
311 if consensus_submission_value.is_none() {
312 } else {
316 to_submit.push(MetaConsensusItem {
320 key,
321 value: desired_value,
322 salt,
323 });
324 }
325 } else {
326 to_submit.push(MetaConsensusItem {
327 key,
328 value: desired_value,
329 salt,
330 });
331 }
332 }
333
334 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
335 to_submit
336 }
337
338 async fn process_consensus_item<'a, 'b>(
343 &'a self,
344 dbtx: &mut DatabaseTransaction<'b>,
345 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
346 peer_id: PeerId,
347 ) -> anyhow::Result<()> {
348 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
349
350 let new_value = MetaSubmissionValue { salt, value };
351 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
353 if prev_value != new_value {
354 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
355 .await;
356 }
357 }
358 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
360 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
361 return Ok(());
362 }
363
364 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
366 .await;
367
368 let matching_submissions: Vec<PeerId> = dbtx
370 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
371 .await
372 .filter(|(_submission_key, submission_value)| {
373 future::ready(new_value.value == submission_value.value)
374 })
375 .map(|(submission_key, _)| submission_key.peer_id)
376 .collect()
377 .await;
378
379 let threshold = self.num_peers.threshold();
380 info!(target: LOG_MODULE_META,
381 %peer_id,
382 %key,
383 value_len = %new_value.value.as_slice().len(),
384 matching = %matching_submissions.len(),
385 %threshold, "Peer submitted a value");
386
387 if threshold <= matching_submissions.len() {
389 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
390 }
391
392 Ok(())
393 }
394
395 async fn process_input<'a, 'b, 'c>(
396 &'a self,
397 _dbtx: &mut DatabaseTransaction<'c>,
398 _input: &'b MetaInput,
399 _in_point: InPoint,
400 ) -> Result<InputMeta, MetaInputError> {
401 Err(MetaInputError::NotSupported)
402 }
403
404 async fn process_output<'a, 'b>(
405 &'a self,
406 _dbtx: &mut DatabaseTransaction<'b>,
407 _output: &'a MetaOutput,
408 _out_point: OutPoint,
409 ) -> Result<TransactionItemAmount, MetaOutputError> {
410 Err(MetaOutputError::NotSupported)
411 }
412
413 async fn output_status(
414 &self,
415 _dbtx: &mut DatabaseTransaction<'_>,
416 _out_point: OutPoint,
417 ) -> Option<MetaOutputOutcome> {
418 None
419 }
420
421 async fn audit(
422 &self,
423 _dbtx: &mut DatabaseTransaction<'_>,
424 _audit: &mut Audit,
425 _module_instance_id: ModuleInstanceId,
426 ) {
427 }
428
429 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
430 vec![
431 api_endpoint! {
432 SUBMIT_ENDPOINT,
433 ApiVersion::new(0, 0),
434 async |module: &Meta, context, request: SubmitRequest| -> () {
435
436 match context.request_auth() {
437 None => return Err(ApiError::bad_request("Missing password".to_string())),
438 Some(auth) => {
439 module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
440 }
441 }
442
443 Ok(())
444 }
445 },
446 api_endpoint! {
447 GET_CONSENSUS_ENDPOINT,
448 ApiVersion::new(0, 0),
449 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
450 module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
451 }
452 },
453 api_endpoint! {
454 GET_CONSENSUS_REV_ENDPOINT,
455 ApiVersion::new(0, 0),
456 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
457 module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
458 }
459 },
460 api_endpoint! {
461 GET_SUBMISSIONS_ENDPOINT,
462 ApiVersion::new(0, 0),
463 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
464 match context.request_auth() {
465 None => return Err(ApiError::bad_request("Missing password".to_string())),
466 Some(auth) => {
467 module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
468 }
469 }
470 }
471 },
472 ]
473 }
474}
475
476impl Meta {
477 async fn handle_submit_request(
478 &self,
479 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
480 _auth: &ApiAuth,
481 req: &SubmitRequest,
482 ) -> Result<(), ApiError> {
483 let salt = thread_rng().r#gen();
484
485 info!(target: LOG_MODULE_META,
486 key = %req.key,
487 peer_id = %self.our_peer_id,
488 value_len = %req.value.as_slice().len(),
489 "Our own guardian submitted a value");
490
491 dbtx.insert_entry(
492 &MetaDesiredKey(req.key),
493 &MetaDesiredValue {
494 value: req.value.clone(),
495 salt,
496 },
497 )
498 .await;
499
500 Ok(())
501 }
502
503 async fn handle_get_consensus_request(
504 &self,
505 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
506 req: &GetConsensusRequest,
507 ) -> Result<Option<MetaConsensusValue>, ApiError> {
508 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
509 }
510
511 async fn handle_get_consensus_revision_request(
512 &self,
513 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
514 req: &GetConsensusRequest,
515 ) -> Result<Option<u64>, ApiError> {
516 Ok(dbtx
517 .get_value(&MetaConsensusKey(req.0))
518 .await
519 .map(|cv| cv.revision))
520 }
521
522 async fn handle_get_submissions_request(
523 &self,
524 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
525 _auth: &ApiAuth,
526 req: &GetSubmissionsRequest,
527 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
528 Ok(dbtx
529 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
530 .await
531 .collect::<Vec<_>>()
532 .await
533 .into_iter()
534 .map(|(k, v)| (k.peer_id, v.value))
535 .collect())
536 }
537}
538
539impl Meta {
542 pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
544 let mut dbtx = self.db.begin_transaction().await;
545
546 self.handle_submit_request(
547 &mut dbtx.to_ref_nc(),
548 &ApiAuth(String::new()),
549 &SubmitRequest {
550 key: DEFAULT_META_KEY,
551 value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
552 },
553 )
554 .await?;
555
556 dbtx.commit_tx_result()
557 .await
558 .map_err(|e| ApiError::server_error(e.to_string()))?;
559
560 Ok(())
561 }
562
563 pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
565 self.handle_get_consensus_request(
566 &mut self.db.begin_transaction_nc().await,
567 &GetConsensusRequest(DEFAULT_META_KEY),
568 )
569 .await?
570 .map(|value| serde_json::from_slice(value.value.as_slice()))
571 .transpose()
572 .map_err(|e| ApiError::server_error(e.to_string()))
573 }
574
575 pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
577 self.handle_get_consensus_revision_request(
578 &mut self.db.begin_transaction_nc().await,
579 &GetConsensusRequest(DEFAULT_META_KEY),
580 )
581 .await
582 .map(|r| r.unwrap_or(0))
583 }
584
585 pub async fn handle_get_submissions_request_ui(
587 &self,
588 ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
589 let mut submissions = BTreeMap::new();
590
591 let mut dbtx = self.db.begin_transaction_nc().await;
592
593 for (peer_id, value) in self
594 .handle_get_submissions_request(
595 &mut dbtx.to_ref_nc(),
596 &ApiAuth(String::new()),
597 &GetSubmissionsRequest(DEFAULT_META_KEY),
598 )
599 .await?
600 {
601 if let Ok(value) = serde_json::from_slice(value.as_slice()) {
602 submissions.insert(peer_id, value);
603 }
604 }
605
606 Ok(submissions)
607 }
608}