1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13 MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14 MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17 ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
18 TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23 NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28 ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29 InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30 TransactionItemAmounts, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35 MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
36};
37use fedimint_meta_common::endpoint::{
38 GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
39 GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
40 SubmitRequest,
41};
42use fedimint_meta_common::{
43 DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
44 MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
45 MetaOutputError, MetaOutputOutcome, MetaValue,
46};
47use fedimint_server_core::config::PeerHandleOps;
48use fedimint_server_core::migration::ServerModuleDbMigrationFn;
49use fedimint_server_core::{
50 ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
51};
52use futures::StreamExt;
53use rand::{Rng, thread_rng};
54use strum::IntoEnumIterator;
55use tracing::{debug, info, trace};
56
57use crate::db::{
58 DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
59 MetaSubmissionsKeyPrefix,
60};
61
62#[derive(Debug, Clone)]
64pub struct MetaInit;
65
66impl ModuleInit for MetaInit {
68 type Common = MetaCommonInit;
69
70 async fn dump_database(
72 &self,
73 dbtx: &mut DatabaseTransaction<'_>,
74 prefix_names: Vec<String>,
75 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
76 let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
78 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
79 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
80 });
81
82 for table in filtered_prefixes {
83 match table {
84 DbKeyPrefix::Desired => {
85 push_db_pair_items!(
86 dbtx,
87 MetaDesiredKeyPrefix,
88 MetaDesiredKey,
89 MetaDesiredValue,
90 items,
91 "Meta Desired"
92 );
93 }
94 DbKeyPrefix::Consensus => {
95 push_db_pair_items!(
96 dbtx,
97 MetaConsensusKeyPrefix,
98 MetaConsensusKey,
99 MetaConsensusValue,
100 items,
101 "Meta Consensus"
102 );
103 }
104 DbKeyPrefix::Submissions => {
105 push_db_pair_items!(
106 dbtx,
107 MetaSubmissionsKeyPrefix,
108 MetaSubmissionsKey,
109 MetaSubmissionValue,
110 items,
111 "Meta Submissions"
112 );
113 }
114 }
115 }
116
117 Box::new(items.into_iter())
118 }
119}
120
121#[async_trait]
123impl ServerModuleInit for MetaInit {
124 type Module = Meta;
125
126 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128 &[MODULE_CONSENSUS_VERSION]
129 }
130
131 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132 SupportedModuleApiVersions::from_raw(
133 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134 (
135 MODULE_CONSENSUS_VERSION.major,
136 MODULE_CONSENSUS_VERSION.minor,
137 ),
138 &[(0, 0)],
139 )
140 }
141
142 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144 Ok(Meta {
145 cfg: args.cfg().to_typed()?,
146 our_peer_id: args.our_peer_id(),
147 num_peers: args.num_peers(),
148 db: args.db().clone(),
149 })
150 }
151
152 fn trusted_dealer_gen(
154 &self,
155 peers: &[PeerId],
156 _args: &ConfigGenModuleArgs,
157 ) -> BTreeMap<PeerId, ServerModuleConfig> {
158 peers
160 .iter()
161 .map(|&peer| {
162 let config = MetaConfig {
163 private: MetaConfigPrivate,
164 consensus: MetaConfigConsensus {},
165 };
166 (peer, config.to_erased())
167 })
168 .collect()
169 }
170
171 async fn distributed_gen(
173 &self,
174 _peers: &(dyn PeerHandleOps + Send + Sync),
175 _args: &ConfigGenModuleArgs,
176 ) -> anyhow::Result<ServerModuleConfig> {
177 Ok(MetaConfig {
178 private: MetaConfigPrivate,
179 consensus: MetaConfigConsensus {},
180 }
181 .to_erased())
182 }
183
184 fn get_client_config(
186 &self,
187 config: &ServerModuleConsensusConfig,
188 ) -> anyhow::Result<MetaClientConfig> {
189 let _config = MetaConfigConsensus::from_erased(config)?;
190 Ok(MetaClientConfig {})
191 }
192
193 fn validate_config(
194 &self,
195 _identity: &PeerId,
196 _config: ServerModuleConfig,
197 ) -> anyhow::Result<()> {
198 Ok(())
199 }
200
201 fn get_database_migrations(
203 &self,
204 ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
205 BTreeMap::new()
206 }
207}
208
209#[derive(Debug)]
211pub struct Meta {
212 pub cfg: MetaConfig,
213 pub our_peer_id: PeerId,
214 pub num_peers: NumPeers,
215 pub db: Database,
216}
217
218impl Meta {
219 async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
220 dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
221 .await
222 .map(|(k, v)| (k.0, v))
223 .collect()
224 .await
225 }
226
227 async fn get_submission(
228 dbtx: &mut DatabaseTransaction<'_>,
229 key: MetaKey,
230 peer_id: PeerId,
231 ) -> Option<MetaSubmissionValue> {
232 dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
233 }
234
235 async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
236 dbtx.get_value(&MetaConsensusKey(key))
237 .await
238 .map(|consensus_value| consensus_value.value)
239 }
240
241 async fn change_consensus(
242 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
243 key: MetaKey,
244 value: MetaValue,
245 matching_submissions: Vec<PeerId>,
246 ) {
247 let value_len = value.as_slice().len();
248 let revision = dbtx
249 .get_value(&MetaConsensusKey(key))
250 .await
251 .map(|cv| cv.revision);
252 let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
253 dbtx.insert_entry(
254 &MetaConsensusKey(key),
255 &MetaConsensusValue { revision, value },
256 )
257 .await;
258
259 info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
260
261 for peer_id in matching_submissions {
262 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
263 .await;
264 }
265 }
266}
267
268#[async_trait]
270impl ServerModule for Meta {
271 type Common = MetaModuleTypes;
273 type Init = MetaInit;
274
275 async fn consensus_proposal(
280 &self,
281 dbtx: &mut DatabaseTransaction<'_>,
282 ) -> Vec<MetaConsensusItem> {
283 let desired: Vec<_> = Self::get_desired(dbtx).await;
284
285 let mut to_submit = vec![];
286
287 for (
288 key,
289 MetaDesiredValue {
290 value: desired_value,
291 salt,
292 },
293 ) in desired
294 {
295 let consensus_value = &Self::get_consensus(dbtx, key).await;
296 let consensus_submission_value =
297 Self::get_submission(dbtx, key, self.our_peer_id).await;
298 if consensus_submission_value.as_ref()
299 == Some(&MetaSubmissionValue {
300 value: desired_value.clone(),
301 salt,
302 })
303 {
304 } else if consensus_value.as_ref() == Some(&desired_value) {
306 if consensus_submission_value.is_none() {
307 } else {
311 to_submit.push(MetaConsensusItem {
315 key,
316 value: desired_value,
317 salt,
318 });
319 }
320 } else {
321 to_submit.push(MetaConsensusItem {
322 key,
323 value: desired_value,
324 salt,
325 });
326 }
327 }
328
329 trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
330 to_submit
331 }
332
333 async fn process_consensus_item<'a, 'b>(
338 &'a self,
339 dbtx: &mut DatabaseTransaction<'b>,
340 MetaConsensusItem { key, value, salt }: MetaConsensusItem,
341 peer_id: PeerId,
342 ) -> anyhow::Result<()> {
343 trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
344
345 let new_value = MetaSubmissionValue { salt, value };
346 if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await
348 && prev_value != new_value
349 {
350 dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
351 .await;
352 }
353 if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
355 debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
356 return Ok(());
357 }
358
359 dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
361 .await;
362
363 let matching_submissions: Vec<PeerId> = dbtx
365 .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
366 .await
367 .filter(|(_submission_key, submission_value)| {
368 future::ready(new_value.value == submission_value.value)
369 })
370 .map(|(submission_key, _)| submission_key.peer_id)
371 .collect()
372 .await;
373
374 let threshold = self.num_peers.threshold();
375 info!(target: LOG_MODULE_META,
376 %peer_id,
377 %key,
378 value_len = %new_value.value.as_slice().len(),
379 matching = %matching_submissions.len(),
380 %threshold, "Peer submitted a value");
381
382 if threshold <= matching_submissions.len() {
384 Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
385 }
386
387 Ok(())
388 }
389
390 async fn process_input<'a, 'b, 'c>(
391 &'a self,
392 _dbtx: &mut DatabaseTransaction<'c>,
393 _input: &'b MetaInput,
394 _in_point: InPoint,
395 ) -> Result<InputMeta, MetaInputError> {
396 Err(MetaInputError::NotSupported)
397 }
398
399 async fn process_output<'a, 'b>(
400 &'a self,
401 _dbtx: &mut DatabaseTransaction<'b>,
402 _output: &'a MetaOutput,
403 _out_point: OutPoint,
404 ) -> Result<TransactionItemAmounts, MetaOutputError> {
405 Err(MetaOutputError::NotSupported)
406 }
407
408 async fn output_status(
409 &self,
410 _dbtx: &mut DatabaseTransaction<'_>,
411 _out_point: OutPoint,
412 ) -> Option<MetaOutputOutcome> {
413 None
414 }
415
416 async fn audit(
417 &self,
418 _dbtx: &mut DatabaseTransaction<'_>,
419 _audit: &mut Audit,
420 _module_instance_id: ModuleInstanceId,
421 ) {
422 }
423
424 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
425 vec![
426 api_endpoint! {
427 SUBMIT_ENDPOINT,
428 ApiVersion::new(0, 0),
429 async |module: &Meta, context, request: SubmitRequest| -> () {
430
431 match context.request_auth() {
432 None => return Err(ApiError::bad_request("Missing password".to_string())),
433 Some(auth) => {
434 module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
435 }
436 }
437
438 Ok(())
439 }
440 },
441 api_endpoint! {
442 GET_CONSENSUS_ENDPOINT,
443 ApiVersion::new(0, 0),
444 async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
445 module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
446 }
447 },
448 api_endpoint! {
449 GET_CONSENSUS_REV_ENDPOINT,
450 ApiVersion::new(0, 0),
451 async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
452 module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
453 }
454 },
455 api_endpoint! {
456 GET_SUBMISSIONS_ENDPOINT,
457 ApiVersion::new(0, 0),
458 async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
459 match context.request_auth() {
460 None => return Err(ApiError::bad_request("Missing password".to_string())),
461 Some(auth) => {
462 module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
463 }
464 }
465 }
466 },
467 ]
468 }
469}
470
471impl Meta {
472 async fn handle_submit_request(
473 &self,
474 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
475 _auth: &ApiAuth,
476 req: &SubmitRequest,
477 ) -> Result<(), ApiError> {
478 let salt = thread_rng().r#gen();
479
480 info!(target: LOG_MODULE_META,
481 key = %req.key,
482 peer_id = %self.our_peer_id,
483 value_len = %req.value.as_slice().len(),
484 "Our own guardian submitted a value");
485
486 dbtx.insert_entry(
487 &MetaDesiredKey(req.key),
488 &MetaDesiredValue {
489 value: req.value.clone(),
490 salt,
491 },
492 )
493 .await;
494
495 Ok(())
496 }
497
498 async fn handle_get_consensus_request(
499 &self,
500 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
501 req: &GetConsensusRequest,
502 ) -> Result<Option<MetaConsensusValue>, ApiError> {
503 Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
504 }
505
506 async fn handle_get_consensus_revision_request(
507 &self,
508 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
509 req: &GetConsensusRequest,
510 ) -> Result<Option<u64>, ApiError> {
511 Ok(dbtx
512 .get_value(&MetaConsensusKey(req.0))
513 .await
514 .map(|cv| cv.revision))
515 }
516
517 async fn handle_get_submissions_request(
518 &self,
519 dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
520 _auth: &ApiAuth,
521 req: &GetSubmissionsRequest,
522 ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
523 Ok(dbtx
524 .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
525 .await
526 .collect::<Vec<_>>()
527 .await
528 .into_iter()
529 .map(|(k, v)| (k.peer_id, v.value))
530 .collect())
531 }
532}
533
534impl Meta {
537 pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
539 let mut dbtx = self.db.begin_transaction().await;
540
541 self.handle_submit_request(
542 &mut dbtx.to_ref_nc(),
543 &ApiAuth(String::new()),
544 &SubmitRequest {
545 key: DEFAULT_META_KEY,
546 value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
547 },
548 )
549 .await?;
550
551 dbtx.commit_tx_result()
552 .await
553 .map_err(|e| ApiError::server_error(e.to_string()))?;
554
555 Ok(())
556 }
557
558 pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
560 self.handle_get_consensus_request(
561 &mut self.db.begin_transaction_nc().await,
562 &GetConsensusRequest(DEFAULT_META_KEY),
563 )
564 .await?
565 .map(|value| serde_json::from_slice(value.value.as_slice()))
566 .transpose()
567 .map_err(|e| ApiError::server_error(e.to_string()))
568 }
569
570 pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
572 self.handle_get_consensus_revision_request(
573 &mut self.db.begin_transaction_nc().await,
574 &GetConsensusRequest(DEFAULT_META_KEY),
575 )
576 .await
577 .map(|r| r.unwrap_or(0))
578 }
579
580 pub async fn handle_get_submissions_request_ui(
582 &self,
583 ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
584 let mut submissions = BTreeMap::new();
585
586 let mut dbtx = self.db.begin_transaction_nc().await;
587
588 for (peer_id, value) in self
589 .handle_get_submissions_request(
590 &mut dbtx.to_ref_nc(),
591 &ApiAuth(String::new()),
592 &GetSubmissionsRequest(DEFAULT_META_KEY),
593 )
594 .await?
595 {
596 if let Ok(value) = serde_json::from_slice(value.as_slice()) {
597 submissions.insert(peer_id, value);
598 }
599 }
600
601 Ok(submissions)
602 }
603}