1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14 MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17 ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
18 TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23 NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28 ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29 InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30 TransactionItemAmounts, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
36};
37use fedimint_meta_common::endpoint::{
38 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
39 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
40 SubmitRequest,
41};
42use fedimint_meta_common::{
43 DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
44 MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
45 MetaOutputError, MetaOutputOutcome, MetaValue,
46};
47use fedimint_server_core::config::PeerHandleOps;
48use fedimint_server_core::migration::ServerModuleDbMigrationFn;
49use fedimint_server_core::{
50 ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
51};
52use futures::StreamExt;
53use rand::{Rng, thread_rng};
54use strum::IntoEnumIterator;
55use tracing::{debug, info, trace};
56
57use crate::db::{
58 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
59 MetaSubmissionsKeyPrefix,
60};
61
/// Initializer type for the meta server module.
///
/// Carries no state of its own; it only serves as the type the module-init
/// traits are implemented on.
#[derive(Clone, Debug)]
pub struct MetaInit;
65
66impl ModuleInit for MetaInit {
68 type Common = MetaCommonInit;
69
70 async fn dump_database(
72 &self,
73 dbtx: &mut DatabaseTransaction<'_>,
74 prefix_names: Vec<String>,
75 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
76 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
78 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
79 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
80 });
81
82 for table in filtered_prefixes {
83 match table {
84 DbKeyPrefix::Desired => {
85 push_db_pair_items!(
86 dbtx,
87 MetaDesiredKeyPrefix,
88 MetaDesiredKey,
89 MetaDesiredValue,
90 items,
91 "Meta Desired"
92 );
93 }
94 DbKeyPrefix::Consensus => {
95 push_db_pair_items!(
96 dbtx,
97 MetaConsensusKeyPrefix,
98 MetaConsensusKey,
99 MetaConsensusValue,
100 items,
101 "Meta Consensus"
102 );
103 }
104 DbKeyPrefix::Submissions => {
105 push_db_pair_items!(
106 dbtx,
107 MetaSubmissionsKeyPrefix,
108 MetaSubmissionsKey,
109 MetaSubmissionValue,
110 items,
111 "Meta Submissions"
112 );
113 }
114 }
115 }
116
117 Box::new(items.into_iter())
118 }
119}
120
121#[async_trait]
123impl ServerModuleInit for MetaInit {
124 type Module = Meta;
125
126 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128 &[MODULE_CONSENSUS_VERSION]
129 }
130
131 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132 SupportedModuleApiVersions::from_raw(
133 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134 (
135 MODULE_CONSENSUS_VERSION.major,
136 MODULE_CONSENSUS_VERSION.minor,
137 ),
138 &[(0, 0)],
139 )
140 }
141
142 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144 Ok(Meta {
145 cfg: args.cfg().to_typed()?,
146 our_peer_id: args.our_peer_id(),
147 num_peers: args.num_peers(),
148 db: args.db().clone(),
149 })
150 }
151
152 fn trusted_dealer_gen(
154 &self,
155 peers: &[PeerId],
156 _args: &ConfigGenModuleArgs,
157 ) -> BTreeMap<PeerId, ServerModuleConfig> {
158 peers
160 .iter()
161 .map(|&peer| {
162 let config = MetaConfig {
163 private: MetaConfigPrivate,
164 consensus: MetaConfigConsensus {},
165 };
166 (peer, config.to_erased())
167 })
168 .collect()
169 }
170
171 async fn distributed_gen(
173 &self,
174 _peers: &(dyn PeerHandleOps + Send + Sync),
175 _args: &ConfigGenModuleArgs,
176 ) -> anyhow::Result<ServerModuleConfig> {
177 Ok(MetaConfig {
178 private: MetaConfigPrivate,
179 consensus: MetaConfigConsensus {},
180 }
181 .to_erased())
182 }
183
184 fn get_client_config(
186 &self,
187 config: &ServerModuleConsensusConfig,
188 ) -> anyhow::Result<MetaClientConfig> {
189 let _config = MetaConfigConsensus::from_erased(config)?;
190 Ok(MetaClientConfig {})
191 }
192
193 fn validate_config(
194 &self,
195 _identity: &PeerId,
196 _config: ServerModuleConfig,
197 ) -> anyhow::Result<()> {
198 Ok(())
199 }
200
201 fn get_database_migrations(
203 &self,
204 ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
205 BTreeMap::new()
206 }
207}
208
209#[derive(Debug)]
211pub struct Meta {
212 pub cfg: MetaConfig,
213 pub our_peer_id: PeerId,
214 pub num_peers: NumPeers,
215 pub db: Database,
216}
217
218impl Meta {
219 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
220 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
221 .await
222 .map(|(k, v)| (k.0, v))
223 .collect()
224 .await
225 }
226
227 async fn get_submission(
228 dbtx: &mut DatabaseTransaction<'_>,
229 key: MetaKey,
230 peer_id: PeerId,
231 ) -> Option<MetaSubmissionValue> {
232 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
233 }
234
235 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
236 dbtx.get_value(&MetaConsensusKey(key))
237 .await
238 .map(|consensus_value| consensus_value.value)
239 }
240
241 async fn change_consensus(
242 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
243 key: MetaKey,
244 value: MetaValue,
245 matching_submissions: Vec<PeerId>,
246 ) {
247 let value_len = value.as_slice().len();
248 let revision = dbtx
249 .get_value(&MetaConsensusKey(key))
250 .await
251 .map(|cv| cv.revision);
252 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
253 dbtx.insert_entry(
254 &MetaConsensusKey(key),
255 &MetaConsensusValue { revision, value },
256 )
257 .await;
258
259 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
260
261 for peer_id in matching_submissions {
262 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
263 .await;
264 }
265 }
266}
267
268#[async_trait]
270impl ServerModule for Meta {
271 type Common = MetaModuleTypes;
273 type Init = MetaInit;
274
275 async fn consensus_proposal(
280 &self,
281 dbtx: &mut DatabaseTransaction<'_>,
282 ) -> Vec<MetaConsensusItem> {
283 let desired: Vec<_> = Self::get_desired(dbtx).await;
284
285 let mut to_submit = vec![];
286
287 for (
288 key,
289 MetaDesiredValue {
290 value: desired_value,
291 salt,
292 },
293 ) in desired
294 {
295 let consensus_value = &Self::get_consensus(dbtx, key).await;
296 let consensus_submission_value =
297 Self::get_submission(dbtx, key, self.our_peer_id).await;
298 if consensus_submission_value.as_ref()
299 == Some(&MetaSubmissionValue {
300 value: desired_value.clone(),
301 salt,
302 })
303 {
304 } else if consensus_value.as_ref() == Some(&desired_value) {
306 if consensus_submission_value.is_none() {
307 } else {
311 to_submit.push(MetaConsensusItem {
315 key,
316 value: desired_value,
317 salt,
318 });
319 }
320 } else {
321 to_submit.push(MetaConsensusItem {
322 key,
323 value: desired_value,
324 salt,
325 });
326 }
327 }
328
329 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
330 to_submit
331 }
332
333 async fn process_consensus_item<'a, 'b>(
338 &'a self,
339 dbtx: &mut DatabaseTransaction<'b>,
340 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
341 peer_id: PeerId,
342 ) -> anyhow::Result<()> {
343 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
344
345 let new_value = MetaSubmissionValue { salt, value };
346 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await
348 && prev_value != new_value
349 {
350 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
351 .await;
352 }
353 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
355 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
356 return Ok(());
357 }
358
359 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
361 .await;
362
363 let matching_submissions: Vec<PeerId> = dbtx
365 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
366 .await
367 .filter(|(_submission_key, submission_value)| {
368 future::ready(new_value.value == submission_value.value)
369 })
370 .map(|(submission_key, _)| submission_key.peer_id)
371 .collect()
372 .await;
373
374 let threshold = self.num_peers.threshold();
375 info!(target: LOG_MODULE_META,
376 %peer_id,
377 %key,
378 value_len = %new_value.value.as_slice().len(),
379 matching = %matching_submissions.len(),
380 %threshold, "Peer submitted a value");
381
382 if threshold <= matching_submissions.len() {
384 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
385 }
386
387 Ok(())
388 }
389
390 async fn process_input<'a, 'b, 'c>(
391 &'a self,
392 _dbtx: &mut DatabaseTransaction<'c>,
393 _input: &'b MetaInput,
394 _in_point: InPoint,
395 ) -> Result<InputMeta, MetaInputError> {
396 Err(MetaInputError::NotSupported)
397 }
398
399 async fn process_output<'a, 'b>(
400 &'a self,
401 _dbtx: &mut DatabaseTransaction<'b>,
402 _output: &'a MetaOutput,
403 _out_point: OutPoint,
404 ) -> Result<TransactionItemAmounts, MetaOutputError> {
405 Err(MetaOutputError::NotSupported)
406 }
407
408 async fn output_status(
409 &self,
410 _dbtx: &mut DatabaseTransaction<'_>,
411 _out_point: OutPoint,
412 ) -> Option<MetaOutputOutcome> {
413 None
414 }
415
416 async fn audit(
417 &self,
418 _dbtx: &mut DatabaseTransaction<'_>,
419 _audit: &mut Audit,
420 _module_instance_id: ModuleInstanceId,
421 ) {
422 }
423
424 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
425 vec![
426 api_endpoint! {
427 SUBMIT_ENDPOINT,
428 ApiVersion::new(0, 0),
429 async |module: &Meta, context, request: SubmitRequest| -> () {
430
431 match context.request_auth() {
432 None => return Err(ApiError::bad_request("Missing password".to_string())),
433 Some(auth) => {
434 let db = context.db();
435 let mut dbtx = db.begin_transaction().await;
436 module.handle_submit_request(&mut dbtx.to_ref_nc(), &auth, &request).await?;
437 dbtx.commit_tx_result().await?;
438 }
439 }
440
441 Ok(())
442 }
443 },
444 api_endpoint! {
445 GET_CONSENSUS_ENDPOINT,
446 ApiVersion::new(0, 0),
447 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
448 let db = context.db();
449 let mut dbtx = db.begin_transaction_nc().await;
450 module.handle_get_consensus_request(&mut dbtx, &request).await
451 }
452 },
453 api_endpoint! {
454 GET_CONSENSUS_REV_ENDPOINT,
455 ApiVersion::new(0, 0),
456 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
457 let db = context.db();
458 let mut dbtx = db.begin_transaction_nc().await;
459 module.handle_get_consensus_revision_request(&mut dbtx, &request).await
460 }
461 },
462 api_endpoint! {
463 GET_SUBMISSIONS_ENDPOINT,
464 ApiVersion::new(0, 0),
465 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
466 match context.request_auth() {
467 None => return Err(ApiError::bad_request("Missing password".to_string())),
468 Some(auth) => {
469 let db = context.db();
470 let mut dbtx = db.begin_transaction_nc().await;
471 module.handle_get_submissions_request(&mut dbtx, &auth, &request).await
472 }
473 }
474 }
475 },
476 ]
477 }
478}
479
480impl Meta {
481 async fn handle_submit_request(
482 &self,
483 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
484 _auth: &ApiAuth,
485 req: &SubmitRequest,
486 ) -> Result<(), ApiError> {
487 let salt = thread_rng().r#gen();
488
489 info!(target: LOG_MODULE_META,
490 key = %req.key,
491 peer_id = %self.our_peer_id,
492 value_len = %req.value.as_slice().len(),
493 "Our own guardian submitted a value");
494
495 dbtx.insert_entry(
496 &MetaDesiredKey(req.key),
497 &MetaDesiredValue {
498 value: req.value.clone(),
499 salt,
500 },
501 )
502 .await;
503
504 Ok(())
505 }
506
507 async fn handle_get_consensus_request(
508 &self,
509 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
510 req: &GetConsensusRequest,
511 ) -> Result<Option<MetaConsensusValue>, ApiError> {
512 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
513 }
514
515 async fn handle_get_consensus_revision_request(
516 &self,
517 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
518 req: &GetConsensusRequest,
519 ) -> Result<Option<u64>, ApiError> {
520 Ok(dbtx
521 .get_value(&MetaConsensusKey(req.0))
522 .await
523 .map(|cv| cv.revision))
524 }
525
526 async fn handle_get_submissions_request(
527 &self,
528 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
529 _auth: &ApiAuth,
530 req: &GetSubmissionsRequest,
531 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
532 Ok(dbtx
533 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
534 .await
535 .collect::<Vec<_>>()
536 .await
537 .into_iter()
538 .map(|(k, v)| (k.peer_id, v.value))
539 .collect())
540 }
541}
542
543impl Meta {
546 pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
548 let mut dbtx = self.db.begin_transaction().await;
549
550 self.handle_submit_request(
551 &mut dbtx.to_ref_nc(),
552 &ApiAuth(String::new()),
553 &SubmitRequest {
554 key: DEFAULT_META_KEY,
555 value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
556 },
557 )
558 .await?;
559
560 dbtx.commit_tx_result()
561 .await
562 .map_err(|e| ApiError::server_error(e.to_string()))?;
563
564 Ok(())
565 }
566
567 pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
569 self.handle_get_consensus_request(
570 &mut self.db.begin_transaction_nc().await,
571 &GetConsensusRequest(DEFAULT_META_KEY),
572 )
573 .await?
574 .map(|value| serde_json::from_slice(value.value.as_slice()))
575 .transpose()
576 .map_err(|e| ApiError::server_error(e.to_string()))
577 }
578
579 pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
581 self.handle_get_consensus_revision_request(
582 &mut self.db.begin_transaction_nc().await,
583 &GetConsensusRequest(DEFAULT_META_KEY),
584 )
585 .await
586 .map(|r| r.unwrap_or(0))
587 }
588
589 pub async fn handle_get_submissions_request_ui(
591 &self,
592 ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
593 let mut submissions = BTreeMap::new();
594
595 let mut dbtx = self.db.begin_transaction_nc().await;
596
597 for (peer_id, value) in self
598 .handle_get_submissions_request(
599 &mut dbtx.to_ref_nc(),
600 &ApiAuth(String::new()),
601 &GetSubmissionsRequest(DEFAULT_META_KEY),
602 )
603 .await?
604 {
605 if let Ok(value) = serde_json::from_slice(value.as_slice()) {
606 submissions.insert(peer_id, value);
607 }
608 }
609
610 Ok(submissions)
611 }
612}