fedimint_meta_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13    MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14    MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
18    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23    NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28    ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30    TransactionItemAmount, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35    MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
36};
37pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
38use fedimint_meta_common::endpoint::{
39    GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
40    GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
41    SubmitRequest,
42};
43use fedimint_meta_common::{
44    DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
45    MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
46    MetaOutputError, MetaOutputOutcome, MetaValue,
47};
48use fedimint_server_core::config::PeerHandleOps;
49use fedimint_server_core::migration::ServerModuleDbMigrationFn;
50use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
51use futures::StreamExt;
52use rand::{Rng, thread_rng};
53use strum::IntoEnumIterator;
54use tracing::{debug, info, trace};
55
56use crate::db::{
57    DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
58    MetaSubmissionsKeyPrefix,
59};
60
/// Initializer for the meta server module.
///
/// The type carries no state; it only serves as the implementor of the
/// module-initialization traits for the meta module.
#[derive(Clone, Debug)]
pub struct MetaInit;
64
65// TODO: Boilerplate-code
66impl ModuleInit for MetaInit {
67    type Common = MetaCommonInit;
68
69    /// Dumps all database items for debugging
70    async fn dump_database(
71        &self,
72        dbtx: &mut DatabaseTransaction<'_>,
73        prefix_names: Vec<String>,
74    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
75        // TODO: Boilerplate-code
76        let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
77        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
78            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
79        });
80
81        for table in filtered_prefixes {
82            match table {
83                DbKeyPrefix::Desired => {
84                    push_db_pair_items!(
85                        dbtx,
86                        MetaDesiredKeyPrefix,
87                        MetaDesiredKey,
88                        MetaDesiredValue,
89                        items,
90                        "Meta Desired"
91                    );
92                }
93                DbKeyPrefix::Consensus => {
94                    push_db_pair_items!(
95                        dbtx,
96                        MetaConsensusKeyPrefix,
97                        MetaConsensusKey,
98                        MetaConsensusValue,
99                        items,
100                        "Meta Consensus"
101                    );
102                }
103                DbKeyPrefix::Submissions => {
104                    push_db_pair_items!(
105                        dbtx,
106                        MetaSubmissionsKeyPrefix,
107                        MetaSubmissionsKey,
108                        MetaSubmissionValue,
109                        items,
110                        "Meta Submissions"
111                    );
112                }
113            }
114        }
115
116        Box::new(items.into_iter())
117    }
118}
119
120/// Implementation of server module non-consensus functions
121#[async_trait]
122impl ServerModuleInit for MetaInit {
123    type Module = Meta;
124    type Params = MetaGenParams;
125
126    /// Returns the version of this module
127    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128        &[MODULE_CONSENSUS_VERSION]
129    }
130
131    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132        SupportedModuleApiVersions::from_raw(
133            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134            (
135                MODULE_CONSENSUS_VERSION.major,
136                MODULE_CONSENSUS_VERSION.minor,
137            ),
138            &[(0, 0)],
139        )
140    }
141
142    /// Initialize the module
143    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144        Ok(Meta {
145            cfg: args.cfg().to_typed()?,
146            our_peer_id: args.our_peer_id(),
147            num_peers: args.num_peers(),
148            db: args.db().clone(),
149        })
150    }
151
152    /// Generates configs for all peers in a trusted manner for testing
153    fn trusted_dealer_gen(
154        &self,
155        peers: &[PeerId],
156        params: &ConfigGenModuleParams,
157    ) -> BTreeMap<PeerId, ServerModuleConfig> {
158        let _params = self.parse_params(params).unwrap();
159        // Generate a config for each peer
160        peers
161            .iter()
162            .map(|&peer| {
163                let config = MetaConfig {
164                    private: MetaConfigPrivate,
165                    consensus: MetaConfigConsensus {},
166                };
167                (peer, config.to_erased())
168            })
169            .collect()
170    }
171
172    /// Generates configs for all peers in an untrusted manner
173    async fn distributed_gen(
174        &self,
175        _peers: &(dyn PeerHandleOps + Send + Sync),
176        params: &ConfigGenModuleParams,
177    ) -> anyhow::Result<ServerModuleConfig> {
178        let _params = self.parse_params(params).unwrap();
179
180        Ok(MetaConfig {
181            private: MetaConfigPrivate,
182            consensus: MetaConfigConsensus {},
183        }
184        .to_erased())
185    }
186
187    /// Converts the consensus config into the client config
188    fn get_client_config(
189        &self,
190        config: &ServerModuleConsensusConfig,
191    ) -> anyhow::Result<MetaClientConfig> {
192        let _config = MetaConfigConsensus::from_erased(config)?;
193        Ok(MetaClientConfig {})
194    }
195
196    fn validate_config(
197        &self,
198        _identity: &PeerId,
199        _config: ServerModuleConfig,
200    ) -> anyhow::Result<()> {
201        Ok(())
202    }
203
204    /// DB migrations to move from old to newer versions
205    fn get_database_migrations(
206        &self,
207    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
208        BTreeMap::new()
209    }
210}
211
212/// Meta module
213#[derive(Debug)]
214pub struct Meta {
215    pub cfg: MetaConfig,
216    pub our_peer_id: PeerId,
217    pub num_peers: NumPeers,
218    pub db: Database,
219}
220
221impl Meta {
222    async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
223        dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
224            .await
225            .map(|(k, v)| (k.0, v))
226            .collect()
227            .await
228    }
229
230    async fn get_submission(
231        dbtx: &mut DatabaseTransaction<'_>,
232        key: MetaKey,
233        peer_id: PeerId,
234    ) -> Option<MetaSubmissionValue> {
235        dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
236    }
237
238    async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
239        dbtx.get_value(&MetaConsensusKey(key))
240            .await
241            .map(|consensus_value| consensus_value.value)
242    }
243
244    async fn change_consensus(
245        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
246        key: MetaKey,
247        value: MetaValue,
248        matching_submissions: Vec<PeerId>,
249    ) {
250        let value_len = value.as_slice().len();
251        let revision = dbtx
252            .get_value(&MetaConsensusKey(key))
253            .await
254            .map(|cv| cv.revision);
255        let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
256        dbtx.insert_entry(
257            &MetaConsensusKey(key),
258            &MetaConsensusValue { revision, value },
259        )
260        .await;
261
262        info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
263
264        for peer_id in matching_submissions {
265            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
266                .await;
267        }
268    }
269}
270
271/// Implementation of consensus for the server module
272#[async_trait]
273impl ServerModule for Meta {
274    /// Define the consensus types
275    type Common = MetaModuleTypes;
276    type Init = MetaInit;
277
278    /// Check the difference between what's desired vs submitted and consensus.
279    ///
280    /// Returns:
281    /// Items to submit as our proposal.
282    async fn consensus_proposal(
283        &self,
284        dbtx: &mut DatabaseTransaction<'_>,
285    ) -> Vec<MetaConsensusItem> {
286        let desired: Vec<_> = Self::get_desired(dbtx).await;
287
288        let mut to_submit = vec![];
289
290        for (
291            key,
292            MetaDesiredValue {
293                value: desired_value,
294                salt,
295            },
296        ) in desired
297        {
298            let consensus_value = &Self::get_consensus(dbtx, key).await;
299            let consensus_submission_value =
300                Self::get_submission(dbtx, key, self.our_peer_id).await;
301            if consensus_submission_value.as_ref()
302                == Some(&MetaSubmissionValue {
303                    value: desired_value.clone(),
304                    salt,
305                })
306            {
307                // our submission is already registered, nothing to do
308            } else if consensus_value.as_ref() == Some(&desired_value) {
309                if consensus_submission_value.is_none() {
310                    // our desired value is equal to consensus and cleared our
311                    // submission (as it is equal the
312                    // consensus) so we don't need to propose it
313                } else {
314                    // we want to submit the same value as the current consensus, usually
315                    // to clear the previous submission that did not became the consensus (we were
316                    // outvoted)
317                    to_submit.push(MetaConsensusItem {
318                        key,
319                        value: desired_value,
320                        salt,
321                    });
322                }
323            } else {
324                to_submit.push(MetaConsensusItem {
325                    key,
326                    value: desired_value,
327                    salt,
328                });
329            }
330        }
331
332        trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
333        to_submit
334    }
335
336    /// BUG: This implementation fails to return an `Err` on redundant consensus
337    /// items. If you are using this code as a template,
338    /// make sure to read the [`ServerModule::process_consensus_item`]
339    /// documentation,
340    async fn process_consensus_item<'a, 'b>(
341        &'a self,
342        dbtx: &mut DatabaseTransaction<'b>,
343        MetaConsensusItem { key, value, salt }: MetaConsensusItem,
344        peer_id: PeerId,
345    ) -> anyhow::Result<()> {
346        trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
347
348        let new_value = MetaSubmissionValue { salt, value };
349        // first of all: any new submission overrides previous submission
350        if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
351            if prev_value != new_value {
352                dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
353                    .await;
354            }
355        }
356        // then: if the submission is equal to the current consensus, it's ignored
357        if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
358            debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
359            return Ok(());
360        }
361
362        // otherwise, new submission is recorded
363        dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
364            .await;
365
366        // we check how many peers submitted the same value (including this peer)
367        let matching_submissions: Vec<PeerId> = dbtx
368            .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
369            .await
370            .filter(|(_submission_key, submission_value)| {
371                future::ready(new_value.value == submission_value.value)
372            })
373            .map(|(submission_key, _)| submission_key.peer_id)
374            .collect()
375            .await;
376
377        let threshold = self.num_peers.threshold();
378        info!(target: LOG_MODULE_META,
379             %peer_id,
380             %key,
381            value_len = %new_value.value.as_slice().len(),
382             matching = %matching_submissions.len(),
383            %threshold, "Peer submitted a value");
384
385        // if threshold or more, change the consensus value
386        if threshold <= matching_submissions.len() {
387            Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
388        }
389
390        Ok(())
391    }
392
393    async fn process_input<'a, 'b, 'c>(
394        &'a self,
395        _dbtx: &mut DatabaseTransaction<'c>,
396        _input: &'b MetaInput,
397        _in_point: InPoint,
398    ) -> Result<InputMeta, MetaInputError> {
399        Err(MetaInputError::NotSupported)
400    }
401
402    async fn process_output<'a, 'b>(
403        &'a self,
404        _dbtx: &mut DatabaseTransaction<'b>,
405        _output: &'a MetaOutput,
406        _out_point: OutPoint,
407    ) -> Result<TransactionItemAmount, MetaOutputError> {
408        Err(MetaOutputError::NotSupported)
409    }
410
411    async fn output_status(
412        &self,
413        _dbtx: &mut DatabaseTransaction<'_>,
414        _out_point: OutPoint,
415    ) -> Option<MetaOutputOutcome> {
416        None
417    }
418
419    async fn audit(
420        &self,
421        _dbtx: &mut DatabaseTransaction<'_>,
422        _audit: &mut Audit,
423        _module_instance_id: ModuleInstanceId,
424    ) {
425    }
426
427    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
428        vec![
429            api_endpoint! {
430                SUBMIT_ENDPOINT,
431                ApiVersion::new(0, 0),
432                async |module: &Meta, context, request: SubmitRequest| -> () {
433
434                    match context.request_auth() {
435                        None => return Err(ApiError::bad_request("Missing password".to_string())),
436                        Some(auth) => {
437                            module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
438                        }
439                    }
440
441                    Ok(())
442                }
443            },
444            api_endpoint! {
445                GET_CONSENSUS_ENDPOINT,
446                ApiVersion::new(0, 0),
447                async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
448                    module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
449                }
450            },
451            api_endpoint! {
452                GET_CONSENSUS_REV_ENDPOINT,
453                ApiVersion::new(0, 0),
454                async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
455                    module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
456                }
457            },
458            api_endpoint! {
459                GET_SUBMISSIONS_ENDPOINT,
460                ApiVersion::new(0, 0),
461                async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
462                    match context.request_auth() {
463                        None => return Err(ApiError::bad_request("Missing password".to_string())),
464                        Some(auth) => {
465                            module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
466                        }
467                    }
468                }
469            },
470        ]
471    }
472}
473
474impl Meta {
475    async fn handle_submit_request(
476        &self,
477        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
478        _auth: &ApiAuth,
479        req: &SubmitRequest,
480    ) -> Result<(), ApiError> {
481        let salt = thread_rng().r#gen();
482
483        info!(target: LOG_MODULE_META,
484             key = %req.key,
485             peer_id = %self.our_peer_id,
486             value_len = %req.value.as_slice().len(),
487             "Our own guardian submitted a value");
488
489        dbtx.insert_entry(
490            &MetaDesiredKey(req.key),
491            &MetaDesiredValue {
492                value: req.value.clone(),
493                salt,
494            },
495        )
496        .await;
497
498        Ok(())
499    }
500
501    async fn handle_get_consensus_request(
502        &self,
503        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
504        req: &GetConsensusRequest,
505    ) -> Result<Option<MetaConsensusValue>, ApiError> {
506        Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
507    }
508
509    async fn handle_get_consensus_revision_request(
510        &self,
511        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
512        req: &GetConsensusRequest,
513    ) -> Result<Option<u64>, ApiError> {
514        Ok(dbtx
515            .get_value(&MetaConsensusKey(req.0))
516            .await
517            .map(|cv| cv.revision))
518    }
519
520    async fn handle_get_submissions_request(
521        &self,
522        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
523        _auth: &ApiAuth,
524        req: &GetSubmissionsRequest,
525    ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
526        Ok(dbtx
527            .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
528            .await
529            .collect::<Vec<_>>()
530            .await
531            .into_iter()
532            .map(|(k, v)| (k.peer_id, v.value))
533            .collect())
534    }
535}
536
537// UI Methods for Meta Module
538
539impl Meta {
540    /// UI helper to submit a value change with default auth
541    pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
542        let mut dbtx = self.db.begin_transaction().await;
543
544        self.handle_submit_request(
545            &mut dbtx.to_ref_nc(),
546            &ApiAuth(String::new()),
547            &SubmitRequest {
548                key: DEFAULT_META_KEY,
549                value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
550            },
551        )
552        .await?;
553
554        dbtx.commit_tx_result()
555            .await
556            .map_err(|e| ApiError::server_error(e.to_string()))?;
557
558        Ok(())
559    }
560
561    /// UI helper to get consensus data as a key-value map
562    pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
563        self.handle_get_consensus_request(
564            &mut self.db.begin_transaction_nc().await,
565            &GetConsensusRequest(DEFAULT_META_KEY),
566        )
567        .await?
568        .map(|value| serde_json::from_slice(value.value.as_slice()))
569        .transpose()
570        .map_err(|e| ApiError::server_error(e.to_string()))
571    }
572
573    /// UI helper to get consensus revision
574    pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
575        self.handle_get_consensus_revision_request(
576            &mut self.db.begin_transaction_nc().await,
577            &GetConsensusRequest(DEFAULT_META_KEY),
578        )
579        .await
580        .map(|r| r.unwrap_or(0))
581    }
582
583    /// Get the submissions for UI display,
584    pub async fn handle_get_submissions_request_ui(
585        &self,
586    ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
587        let mut submissions = BTreeMap::new();
588
589        let mut dbtx = self.db.begin_transaction_nc().await;
590
591        for (peer_id, value) in self
592            .handle_get_submissions_request(
593                &mut dbtx.to_ref_nc(),
594                &ApiAuth(String::new()),
595                &GetSubmissionsRequest(DEFAULT_META_KEY),
596            )
597            .await?
598        {
599            if let Ok(value) = serde_json::from_slice(value.as_slice()) {
600                submissions.insert(peer_id, value);
601            }
602        }
603
604        Ok(submissions)
605    }
606}