1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub use fedimint_meta_common as common;
7
8pub mod db;
9
10use std::collections::BTreeMap;
11use std::future;
12
13use async_trait::async_trait;
14use db::{
15 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
16 MetaSubmissionsKey,
17};
18use fedimint_core::config::{
19 ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
20 TypedServerModuleConsensusConfig,
21};
22use fedimint_core::core::ModuleInstanceId;
23use fedimint_core::db::{
24 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
25 NonCommittable,
26};
27use fedimint_core::module::audit::Audit;
28use fedimint_core::module::serde_json::Value;
29use fedimint_core::module::{
30 ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
31 InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
32 TransactionItemAmounts, api_endpoint, serde_json,
33};
34use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
35use fedimint_logging::LOG_MODULE_META;
36use fedimint_meta_common::config::{
37 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
38};
39use fedimint_meta_common::endpoint::{
40 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
41 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
42 SubmitRequest,
43};
44use fedimint_meta_common::{
45 DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
46 MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
47 MetaOutputError, MetaOutputOutcome, MetaValue,
48};
49use fedimint_server_core::config::PeerHandleOps;
50use fedimint_server_core::migration::ServerModuleDbMigrationFn;
51use fedimint_server_core::{
52 ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
53};
54use futures::StreamExt;
55use rand::{Rng, thread_rng};
56use strum::IntoEnumIterator;
57use tracing::{debug, info, trace};
58
59use crate::db::{
60 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
61 MetaSubmissionsKeyPrefix,
62};
63
/// Stateless initializer of the server-side meta module.
///
/// It carries no data of its own; it exists to implement [`ModuleInit`] and
/// [`ServerModuleInit`], the latter producing [`Meta`] instances.
#[derive(Clone, Debug)]
pub struct MetaInit;
67
68impl ModuleInit for MetaInit {
70 type Common = MetaCommonInit;
71
72 async fn dump_database(
74 &self,
75 dbtx: &mut DatabaseTransaction<'_>,
76 prefix_names: Vec<String>,
77 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
78 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
80 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
81 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
82 });
83
84 for table in filtered_prefixes {
85 match table {
86 DbKeyPrefix::Desired => {
87 push_db_pair_items!(
88 dbtx,
89 MetaDesiredKeyPrefix,
90 MetaDesiredKey,
91 MetaDesiredValue,
92 items,
93 "Meta Desired"
94 );
95 }
96 DbKeyPrefix::Consensus => {
97 push_db_pair_items!(
98 dbtx,
99 MetaConsensusKeyPrefix,
100 MetaConsensusKey,
101 MetaConsensusValue,
102 items,
103 "Meta Consensus"
104 );
105 }
106 DbKeyPrefix::Submissions => {
107 push_db_pair_items!(
108 dbtx,
109 MetaSubmissionsKeyPrefix,
110 MetaSubmissionsKey,
111 MetaSubmissionValue,
112 items,
113 "Meta Submissions"
114 );
115 }
116 }
117 }
118
119 Box::new(items.into_iter())
120 }
121}
122
123#[async_trait]
125impl ServerModuleInit for MetaInit {
126 type Module = Meta;
127
128 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
130 &[MODULE_CONSENSUS_VERSION]
131 }
132
133 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
134 SupportedModuleApiVersions::from_raw(
135 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
136 (
137 MODULE_CONSENSUS_VERSION.major,
138 MODULE_CONSENSUS_VERSION.minor,
139 ),
140 &[(0, 0)],
141 )
142 }
143
144 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
146 Ok(Meta {
147 cfg: args.cfg().to_typed()?,
148 our_peer_id: args.our_peer_id(),
149 num_peers: args.num_peers(),
150 db: args.db().clone(),
151 })
152 }
153
154 fn trusted_dealer_gen(
156 &self,
157 peers: &[PeerId],
158 _args: &ConfigGenModuleArgs,
159 ) -> BTreeMap<PeerId, ServerModuleConfig> {
160 peers
162 .iter()
163 .map(|&peer| {
164 let config = MetaConfig {
165 private: MetaConfigPrivate,
166 consensus: MetaConfigConsensus {},
167 };
168 (peer, config.to_erased())
169 })
170 .collect()
171 }
172
173 async fn distributed_gen(
175 &self,
176 _peers: &(dyn PeerHandleOps + Send + Sync),
177 _args: &ConfigGenModuleArgs,
178 ) -> anyhow::Result<ServerModuleConfig> {
179 Ok(MetaConfig {
180 private: MetaConfigPrivate,
181 consensus: MetaConfigConsensus {},
182 }
183 .to_erased())
184 }
185
186 fn get_client_config(
188 &self,
189 config: &ServerModuleConsensusConfig,
190 ) -> anyhow::Result<MetaClientConfig> {
191 let _config = MetaConfigConsensus::from_erased(config)?;
192 Ok(MetaClientConfig {})
193 }
194
195 fn validate_config(
196 &self,
197 _identity: &PeerId,
198 _config: ServerModuleConfig,
199 ) -> anyhow::Result<()> {
200 Ok(())
201 }
202
203 fn get_database_migrations(
205 &self,
206 ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
207 BTreeMap::new()
208 }
209}
210
211#[derive(Debug)]
213pub struct Meta {
214 pub cfg: MetaConfig,
215 pub our_peer_id: PeerId,
216 pub num_peers: NumPeers,
217 pub db: Database,
218}
219
220impl Meta {
221 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
222 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
223 .await
224 .map(|(k, v)| (k.0, v))
225 .collect()
226 .await
227 }
228
229 async fn get_submission(
230 dbtx: &mut DatabaseTransaction<'_>,
231 key: MetaKey,
232 peer_id: PeerId,
233 ) -> Option<MetaSubmissionValue> {
234 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
235 }
236
237 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
238 dbtx.get_value(&MetaConsensusKey(key))
239 .await
240 .map(|consensus_value| consensus_value.value)
241 }
242
243 async fn change_consensus(
244 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
245 key: MetaKey,
246 value: MetaValue,
247 matching_submissions: Vec<PeerId>,
248 ) {
249 let value_len = value.as_slice().len();
250 let revision = dbtx
251 .get_value(&MetaConsensusKey(key))
252 .await
253 .map(|cv| cv.revision);
254 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
255 dbtx.insert_entry(
256 &MetaConsensusKey(key),
257 &MetaConsensusValue { revision, value },
258 )
259 .await;
260
261 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
262
263 for peer_id in matching_submissions {
264 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
265 .await;
266 }
267 }
268}
269
270#[async_trait]
272impl ServerModule for Meta {
273 type Common = MetaModuleTypes;
275 type Init = MetaInit;
276
277 async fn consensus_proposal(
282 &self,
283 dbtx: &mut DatabaseTransaction<'_>,
284 ) -> Vec<MetaConsensusItem> {
285 let desired: Vec<_> = Self::get_desired(dbtx).await;
286
287 let mut to_submit = vec![];
288
289 for (
290 key,
291 MetaDesiredValue {
292 value: desired_value,
293 salt,
294 },
295 ) in desired
296 {
297 let consensus_value = &Self::get_consensus(dbtx, key).await;
298 let consensus_submission_value =
299 Self::get_submission(dbtx, key, self.our_peer_id).await;
300 if consensus_submission_value.as_ref()
301 == Some(&MetaSubmissionValue {
302 value: desired_value.clone(),
303 salt,
304 })
305 {
306 } else if consensus_value.as_ref() == Some(&desired_value) {
308 if consensus_submission_value.is_none() {
309 } else {
313 to_submit.push(MetaConsensusItem {
317 key,
318 value: desired_value,
319 salt,
320 });
321 }
322 } else {
323 to_submit.push(MetaConsensusItem {
324 key,
325 value: desired_value,
326 salt,
327 });
328 }
329 }
330
331 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
332 to_submit
333 }
334
335 async fn process_consensus_item<'a, 'b>(
340 &'a self,
341 dbtx: &mut DatabaseTransaction<'b>,
342 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
343 peer_id: PeerId,
344 ) -> anyhow::Result<()> {
345 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
346
347 let new_value = MetaSubmissionValue { salt, value };
348 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await
350 && prev_value != new_value
351 {
352 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
353 .await;
354 }
355 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
357 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
358 return Ok(());
359 }
360
361 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
363 .await;
364
365 let matching_submissions: Vec<PeerId> = dbtx
367 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
368 .await
369 .filter(|(_submission_key, submission_value)| {
370 future::ready(new_value.value == submission_value.value)
371 })
372 .map(|(submission_key, _)| submission_key.peer_id)
373 .collect()
374 .await;
375
376 let threshold = self.num_peers.threshold();
377 info!(target: LOG_MODULE_META,
378 %peer_id,
379 %key,
380 value_len = %new_value.value.as_slice().len(),
381 matching = %matching_submissions.len(),
382 %threshold, "Peer submitted a value");
383
384 if threshold <= matching_submissions.len() {
386 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
387 }
388
389 Ok(())
390 }
391
392 async fn process_input<'a, 'b, 'c>(
393 &'a self,
394 _dbtx: &mut DatabaseTransaction<'c>,
395 _input: &'b MetaInput,
396 _in_point: InPoint,
397 ) -> Result<InputMeta, MetaInputError> {
398 Err(MetaInputError::NotSupported)
399 }
400
401 async fn process_output<'a, 'b>(
402 &'a self,
403 _dbtx: &mut DatabaseTransaction<'b>,
404 _output: &'a MetaOutput,
405 _out_point: OutPoint,
406 ) -> Result<TransactionItemAmounts, MetaOutputError> {
407 Err(MetaOutputError::NotSupported)
408 }
409
410 async fn output_status(
411 &self,
412 _dbtx: &mut DatabaseTransaction<'_>,
413 _out_point: OutPoint,
414 ) -> Option<MetaOutputOutcome> {
415 None
416 }
417
418 async fn audit(
419 &self,
420 _dbtx: &mut DatabaseTransaction<'_>,
421 _audit: &mut Audit,
422 _module_instance_id: ModuleInstanceId,
423 ) {
424 }
425
426 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
427 vec![
428 api_endpoint! {
429 SUBMIT_ENDPOINT,
430 ApiVersion::new(0, 0),
431 async |module: &Meta, context, request: SubmitRequest| -> () {
432
433 match context.request_auth() {
434 None => return Err(ApiError::bad_request("Missing password".to_string())),
435 Some(auth) => {
436 let db = context.db();
437 let mut dbtx = db.begin_transaction().await;
438 module.handle_submit_request(&mut dbtx.to_ref_nc(), &auth, &request).await?;
439 dbtx.commit_tx_result().await?;
440 }
441 }
442
443 Ok(())
444 }
445 },
446 api_endpoint! {
447 GET_CONSENSUS_ENDPOINT,
448 ApiVersion::new(0, 0),
449 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
450 let db = context.db();
451 let mut dbtx = db.begin_transaction_nc().await;
452 module.handle_get_consensus_request(&mut dbtx, &request).await
453 }
454 },
455 api_endpoint! {
456 GET_CONSENSUS_REV_ENDPOINT,
457 ApiVersion::new(0, 0),
458 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
459 let db = context.db();
460 let mut dbtx = db.begin_transaction_nc().await;
461 module.handle_get_consensus_revision_request(&mut dbtx, &request).await
462 }
463 },
464 api_endpoint! {
465 GET_SUBMISSIONS_ENDPOINT,
466 ApiVersion::new(0, 0),
467 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
468 match context.request_auth() {
469 None => return Err(ApiError::bad_request("Missing password".to_string())),
470 Some(auth) => {
471 let db = context.db();
472 let mut dbtx = db.begin_transaction_nc().await;
473 module.handle_get_submissions_request(&mut dbtx, &auth, &request).await
474 }
475 }
476 }
477 },
478 ]
479 }
480}
481
482impl Meta {
483 async fn handle_submit_request(
484 &self,
485 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
486 _auth: &ApiAuth,
487 req: &SubmitRequest,
488 ) -> Result<(), ApiError> {
489 let salt = thread_rng().r#gen();
490
491 info!(target: LOG_MODULE_META,
492 key = %req.key,
493 peer_id = %self.our_peer_id,
494 value_len = %req.value.as_slice().len(),
495 "Our own guardian submitted a value");
496
497 dbtx.insert_entry(
498 &MetaDesiredKey(req.key),
499 &MetaDesiredValue {
500 value: req.value.clone(),
501 salt,
502 },
503 )
504 .await;
505
506 Ok(())
507 }
508
509 async fn handle_get_consensus_request(
510 &self,
511 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
512 req: &GetConsensusRequest,
513 ) -> Result<Option<MetaConsensusValue>, ApiError> {
514 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
515 }
516
517 async fn handle_get_consensus_revision_request(
518 &self,
519 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
520 req: &GetConsensusRequest,
521 ) -> Result<Option<u64>, ApiError> {
522 Ok(dbtx
523 .get_value(&MetaConsensusKey(req.0))
524 .await
525 .map(|cv| cv.revision))
526 }
527
528 async fn handle_get_submissions_request(
529 &self,
530 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
531 _auth: &ApiAuth,
532 req: &GetSubmissionsRequest,
533 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
534 Ok(dbtx
535 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
536 .await
537 .collect::<Vec<_>>()
538 .await
539 .into_iter()
540 .map(|(k, v)| (k.peer_id, v.value))
541 .collect())
542 }
543}
544
545impl Meta {
548 pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
550 let mut dbtx = self.db.begin_transaction().await;
551
552 self.handle_submit_request(
553 &mut dbtx.to_ref_nc(),
554 &ApiAuth(String::new()),
555 &SubmitRequest {
556 key: DEFAULT_META_KEY,
557 value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
558 },
559 )
560 .await?;
561
562 dbtx.commit_tx_result()
563 .await
564 .map_err(|e| ApiError::server_error(e.to_string()))?;
565
566 Ok(())
567 }
568
569 pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
571 self.handle_get_consensus_request(
572 &mut self.db.begin_transaction_nc().await,
573 &GetConsensusRequest(DEFAULT_META_KEY),
574 )
575 .await?
576 .map(|value| serde_json::from_slice(value.value.as_slice()))
577 .transpose()
578 .map_err(|e| ApiError::server_error(e.to_string()))
579 }
580
581 pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
583 self.handle_get_consensus_revision_request(
584 &mut self.db.begin_transaction_nc().await,
585 &GetConsensusRequest(DEFAULT_META_KEY),
586 )
587 .await
588 .map(|r| r.unwrap_or(0))
589 }
590
591 pub async fn handle_get_submissions_request_ui(
593 &self,
594 ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
595 let mut submissions = BTreeMap::new();
596
597 let mut dbtx = self.db.begin_transaction_nc().await;
598
599 for (peer_id, value) in self
600 .handle_get_submissions_request(
601 &mut dbtx.to_ref_nc(),
602 &ApiAuth(String::new()),
603 &GetSubmissionsRequest(DEFAULT_META_KEY),
604 )
605 .await?
606 {
607 if let Ok(value) = serde_json::from_slice(value.as_slice()) {
608 submissions.insert(peer_id, value);
609 }
610 }
611
612 Ok(submissions)
613 }
614}