fedimint_meta_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub use fedimint_meta_common as common;
7
8pub mod db;
9
10use std::collections::BTreeMap;
11use std::future;
12
13use async_trait::async_trait;
14use db::{
15    MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
16    MetaSubmissionsKey,
17};
18use fedimint_core::config::{
19    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
20    TypedServerModuleConsensusConfig,
21};
22use fedimint_core::core::ModuleInstanceId;
23use fedimint_core::db::{
24    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
25    NonCommittable,
26};
27use fedimint_core::module::audit::Audit;
28use fedimint_core::module::serde_json::Value;
29use fedimint_core::module::{
30    ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
31    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
32    TransactionItemAmounts, api_endpoint, serde_json,
33};
34use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
35use fedimint_logging::LOG_MODULE_META;
36use fedimint_meta_common::config::{
37    MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
38};
39use fedimint_meta_common::endpoint::{
40    GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
41    GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
42    SubmitRequest,
43};
44use fedimint_meta_common::{
45    DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
46    MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
47    MetaOutputError, MetaOutputOutcome, MetaValue,
48};
49use fedimint_server_core::config::PeerHandleOps;
50use fedimint_server_core::migration::ServerModuleDbMigrationFn;
51use fedimint_server_core::{
52    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
53};
54use futures::StreamExt;
55use rand::{Rng, thread_rng};
56use strum::IntoEnumIterator;
57use tracing::{debug, info, trace};
58
59use crate::db::{
60    DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
61    MetaSubmissionsKeyPrefix,
62};
63
64/// Generates the module
65#[derive(Debug, Clone)]
66pub struct MetaInit;
67
68// TODO: Boilerplate-code
69impl ModuleInit for MetaInit {
70    type Common = MetaCommonInit;
71
72    /// Dumps all database items for debugging
73    async fn dump_database(
74        &self,
75        dbtx: &mut DatabaseTransaction<'_>,
76        prefix_names: Vec<String>,
77    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
78        // TODO: Boilerplate-code
79        let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
80        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
81            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
82        });
83
84        for table in filtered_prefixes {
85            match table {
86                DbKeyPrefix::Desired => {
87                    push_db_pair_items!(
88                        dbtx,
89                        MetaDesiredKeyPrefix,
90                        MetaDesiredKey,
91                        MetaDesiredValue,
92                        items,
93                        "Meta Desired"
94                    );
95                }
96                DbKeyPrefix::Consensus => {
97                    push_db_pair_items!(
98                        dbtx,
99                        MetaConsensusKeyPrefix,
100                        MetaConsensusKey,
101                        MetaConsensusValue,
102                        items,
103                        "Meta Consensus"
104                    );
105                }
106                DbKeyPrefix::Submissions => {
107                    push_db_pair_items!(
108                        dbtx,
109                        MetaSubmissionsKeyPrefix,
110                        MetaSubmissionsKey,
111                        MetaSubmissionValue,
112                        items,
113                        "Meta Submissions"
114                    );
115                }
116            }
117        }
118
119        Box::new(items.into_iter())
120    }
121}
122
123/// Implementation of server module non-consensus functions
124#[async_trait]
125impl ServerModuleInit for MetaInit {
126    type Module = Meta;
127
128    /// Returns the version of this module
129    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
130        &[MODULE_CONSENSUS_VERSION]
131    }
132
133    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
134        SupportedModuleApiVersions::from_raw(
135            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
136            (
137                MODULE_CONSENSUS_VERSION.major,
138                MODULE_CONSENSUS_VERSION.minor,
139            ),
140            &[(0, 0)],
141        )
142    }
143
144    /// Initialize the module
145    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
146        Ok(Meta {
147            cfg: args.cfg().to_typed()?,
148            our_peer_id: args.our_peer_id(),
149            num_peers: args.num_peers(),
150            db: args.db().clone(),
151        })
152    }
153
154    /// Generates configs for all peers in a trusted manner for testing
155    fn trusted_dealer_gen(
156        &self,
157        peers: &[PeerId],
158        _args: &ConfigGenModuleArgs,
159    ) -> BTreeMap<PeerId, ServerModuleConfig> {
160        // Generate a config for each peer
161        peers
162            .iter()
163            .map(|&peer| {
164                let config = MetaConfig {
165                    private: MetaConfigPrivate,
166                    consensus: MetaConfigConsensus {},
167                };
168                (peer, config.to_erased())
169            })
170            .collect()
171    }
172
173    /// Generates configs for all peers in an untrusted manner
174    async fn distributed_gen(
175        &self,
176        _peers: &(dyn PeerHandleOps + Send + Sync),
177        _args: &ConfigGenModuleArgs,
178    ) -> anyhow::Result<ServerModuleConfig> {
179        Ok(MetaConfig {
180            private: MetaConfigPrivate,
181            consensus: MetaConfigConsensus {},
182        }
183        .to_erased())
184    }
185
186    /// Converts the consensus config into the client config
187    fn get_client_config(
188        &self,
189        config: &ServerModuleConsensusConfig,
190    ) -> anyhow::Result<MetaClientConfig> {
191        let _config = MetaConfigConsensus::from_erased(config)?;
192        Ok(MetaClientConfig {})
193    }
194
195    fn validate_config(
196        &self,
197        _identity: &PeerId,
198        _config: ServerModuleConfig,
199    ) -> anyhow::Result<()> {
200        Ok(())
201    }
202
203    /// DB migrations to move from old to newer versions
204    fn get_database_migrations(
205        &self,
206    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
207        BTreeMap::new()
208    }
209}
210
211/// Meta module
212#[derive(Debug)]
213pub struct Meta {
214    pub cfg: MetaConfig,
215    pub our_peer_id: PeerId,
216    pub num_peers: NumPeers,
217    pub db: Database,
218}
219
220impl Meta {
221    async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
222        dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
223            .await
224            .map(|(k, v)| (k.0, v))
225            .collect()
226            .await
227    }
228
229    async fn get_submission(
230        dbtx: &mut DatabaseTransaction<'_>,
231        key: MetaKey,
232        peer_id: PeerId,
233    ) -> Option<MetaSubmissionValue> {
234        dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
235    }
236
237    async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
238        dbtx.get_value(&MetaConsensusKey(key))
239            .await
240            .map(|consensus_value| consensus_value.value)
241    }
242
243    async fn change_consensus(
244        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
245        key: MetaKey,
246        value: MetaValue,
247        matching_submissions: Vec<PeerId>,
248    ) {
249        let value_len = value.as_slice().len();
250        let revision = dbtx
251            .get_value(&MetaConsensusKey(key))
252            .await
253            .map(|cv| cv.revision);
254        let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
255        dbtx.insert_entry(
256            &MetaConsensusKey(key),
257            &MetaConsensusValue { revision, value },
258        )
259        .await;
260
261        info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
262
263        for peer_id in matching_submissions {
264            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
265                .await;
266        }
267    }
268}
269
270/// Implementation of consensus for the server module
271#[async_trait]
272impl ServerModule for Meta {
273    /// Define the consensus types
274    type Common = MetaModuleTypes;
275    type Init = MetaInit;
276
277    /// Check the difference between what's desired vs submitted and consensus.
278    ///
279    /// Returns:
280    /// Items to submit as our proposal.
281    async fn consensus_proposal(
282        &self,
283        dbtx: &mut DatabaseTransaction<'_>,
284    ) -> Vec<MetaConsensusItem> {
285        let desired: Vec<_> = Self::get_desired(dbtx).await;
286
287        let mut to_submit = vec![];
288
289        for (
290            key,
291            MetaDesiredValue {
292                value: desired_value,
293                salt,
294            },
295        ) in desired
296        {
297            let consensus_value = &Self::get_consensus(dbtx, key).await;
298            let consensus_submission_value =
299                Self::get_submission(dbtx, key, self.our_peer_id).await;
300            if consensus_submission_value.as_ref()
301                == Some(&MetaSubmissionValue {
302                    value: desired_value.clone(),
303                    salt,
304                })
305            {
306                // our submission is already registered, nothing to do
307            } else if consensus_value.as_ref() == Some(&desired_value) {
308                if consensus_submission_value.is_none() {
309                    // our desired value is equal to consensus and cleared our
310                    // submission (as it is equal the
311                    // consensus) so we don't need to propose it
312                } else {
313                    // we want to submit the same value as the current consensus, usually
314                    // to clear the previous submission that did not became the consensus (we were
315                    // outvoted)
316                    to_submit.push(MetaConsensusItem {
317                        key,
318                        value: desired_value,
319                        salt,
320                    });
321                }
322            } else {
323                to_submit.push(MetaConsensusItem {
324                    key,
325                    value: desired_value,
326                    salt,
327                });
328            }
329        }
330
331        trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
332        to_submit
333    }
334
335    /// BUG: This implementation fails to return an `Err` on redundant consensus
336    /// items. If you are using this code as a template,
337    /// make sure to read the [`ServerModule::process_consensus_item`]
338    /// documentation,
339    async fn process_consensus_item<'a, 'b>(
340        &'a self,
341        dbtx: &mut DatabaseTransaction<'b>,
342        MetaConsensusItem { key, value, salt }: MetaConsensusItem,
343        peer_id: PeerId,
344    ) -> anyhow::Result<()> {
345        trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
346
347        let new_value = MetaSubmissionValue { salt, value };
348        // first of all: any new submission overrides previous submission
349        if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await
350            && prev_value != new_value
351        {
352            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
353                .await;
354        }
355        // then: if the submission is equal to the current consensus, it's ignored
356        if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
357            debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
358            return Ok(());
359        }
360
361        // otherwise, new submission is recorded
362        dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
363            .await;
364
365        // we check how many peers submitted the same value (including this peer)
366        let matching_submissions: Vec<PeerId> = dbtx
367            .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
368            .await
369            .filter(|(_submission_key, submission_value)| {
370                future::ready(new_value.value == submission_value.value)
371            })
372            .map(|(submission_key, _)| submission_key.peer_id)
373            .collect()
374            .await;
375
376        let threshold = self.num_peers.threshold();
377        info!(target: LOG_MODULE_META,
378             %peer_id,
379             %key,
380            value_len = %new_value.value.as_slice().len(),
381             matching = %matching_submissions.len(),
382            %threshold, "Peer submitted a value");
383
384        // if threshold or more, change the consensus value
385        if threshold <= matching_submissions.len() {
386            Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
387        }
388
389        Ok(())
390    }
391
392    async fn process_input<'a, 'b, 'c>(
393        &'a self,
394        _dbtx: &mut DatabaseTransaction<'c>,
395        _input: &'b MetaInput,
396        _in_point: InPoint,
397    ) -> Result<InputMeta, MetaInputError> {
398        Err(MetaInputError::NotSupported)
399    }
400
401    async fn process_output<'a, 'b>(
402        &'a self,
403        _dbtx: &mut DatabaseTransaction<'b>,
404        _output: &'a MetaOutput,
405        _out_point: OutPoint,
406    ) -> Result<TransactionItemAmounts, MetaOutputError> {
407        Err(MetaOutputError::NotSupported)
408    }
409
410    async fn output_status(
411        &self,
412        _dbtx: &mut DatabaseTransaction<'_>,
413        _out_point: OutPoint,
414    ) -> Option<MetaOutputOutcome> {
415        None
416    }
417
418    async fn audit(
419        &self,
420        _dbtx: &mut DatabaseTransaction<'_>,
421        _audit: &mut Audit,
422        _module_instance_id: ModuleInstanceId,
423    ) {
424    }
425
426    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
427        vec![
428            api_endpoint! {
429                SUBMIT_ENDPOINT,
430                ApiVersion::new(0, 0),
431                async |module: &Meta, context, request: SubmitRequest| -> () {
432
433                    match context.request_auth() {
434                        None => return Err(ApiError::bad_request("Missing password".to_string())),
435                        Some(auth) => {
436                            let db = context.db();
437                            let mut dbtx = db.begin_transaction().await;
438                            module.handle_submit_request(&mut dbtx.to_ref_nc(), &auth, &request).await?;
439                            dbtx.commit_tx_result().await?;
440                        }
441                    }
442
443                    Ok(())
444                }
445            },
446            api_endpoint! {
447                GET_CONSENSUS_ENDPOINT,
448                ApiVersion::new(0, 0),
449                async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
450                    let db = context.db();
451                    let mut dbtx = db.begin_transaction_nc().await;
452                    module.handle_get_consensus_request(&mut dbtx, &request).await
453                }
454            },
455            api_endpoint! {
456                GET_CONSENSUS_REV_ENDPOINT,
457                ApiVersion::new(0, 0),
458                async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
459                    let db = context.db();
460                    let mut dbtx = db.begin_transaction_nc().await;
461                    module.handle_get_consensus_revision_request(&mut dbtx, &request).await
462                }
463            },
464            api_endpoint! {
465                GET_SUBMISSIONS_ENDPOINT,
466                ApiVersion::new(0, 0),
467                async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
468                    match context.request_auth() {
469                        None => return Err(ApiError::bad_request("Missing password".to_string())),
470                        Some(auth) => {
471                            let db = context.db();
472                            let mut dbtx = db.begin_transaction_nc().await;
473                            module.handle_get_submissions_request(&mut dbtx, &auth, &request).await
474                        }
475                    }
476                }
477            },
478        ]
479    }
480}
481
482impl Meta {
483    async fn handle_submit_request(
484        &self,
485        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
486        _auth: &ApiAuth,
487        req: &SubmitRequest,
488    ) -> Result<(), ApiError> {
489        let salt = thread_rng().r#gen();
490
491        info!(target: LOG_MODULE_META,
492             key = %req.key,
493             peer_id = %self.our_peer_id,
494             value_len = %req.value.as_slice().len(),
495             "Our own guardian submitted a value");
496
497        dbtx.insert_entry(
498            &MetaDesiredKey(req.key),
499            &MetaDesiredValue {
500                value: req.value.clone(),
501                salt,
502            },
503        )
504        .await;
505
506        Ok(())
507    }
508
509    async fn handle_get_consensus_request(
510        &self,
511        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
512        req: &GetConsensusRequest,
513    ) -> Result<Option<MetaConsensusValue>, ApiError> {
514        Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
515    }
516
517    async fn handle_get_consensus_revision_request(
518        &self,
519        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
520        req: &GetConsensusRequest,
521    ) -> Result<Option<u64>, ApiError> {
522        Ok(dbtx
523            .get_value(&MetaConsensusKey(req.0))
524            .await
525            .map(|cv| cv.revision))
526    }
527
528    async fn handle_get_submissions_request(
529        &self,
530        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
531        _auth: &ApiAuth,
532        req: &GetSubmissionsRequest,
533    ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
534        Ok(dbtx
535            .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
536            .await
537            .collect::<Vec<_>>()
538            .await
539            .into_iter()
540            .map(|(k, v)| (k.peer_id, v.value))
541            .collect())
542    }
543}
544
545// UI Methods for Meta Module
546
547impl Meta {
548    /// UI helper to submit a value change with default auth
549    pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
550        let mut dbtx = self.db.begin_transaction().await;
551
552        self.handle_submit_request(
553            &mut dbtx.to_ref_nc(),
554            &ApiAuth(String::new()),
555            &SubmitRequest {
556                key: DEFAULT_META_KEY,
557                value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
558            },
559        )
560        .await?;
561
562        dbtx.commit_tx_result()
563            .await
564            .map_err(|e| ApiError::server_error(e.to_string()))?;
565
566        Ok(())
567    }
568
569    /// UI helper to get consensus data as a key-value map
570    pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
571        self.handle_get_consensus_request(
572            &mut self.db.begin_transaction_nc().await,
573            &GetConsensusRequest(DEFAULT_META_KEY),
574        )
575        .await?
576        .map(|value| serde_json::from_slice(value.value.as_slice()))
577        .transpose()
578        .map_err(|e| ApiError::server_error(e.to_string()))
579    }
580
581    /// UI helper to get consensus revision
582    pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
583        self.handle_get_consensus_revision_request(
584            &mut self.db.begin_transaction_nc().await,
585            &GetConsensusRequest(DEFAULT_META_KEY),
586        )
587        .await
588        .map(|r| r.unwrap_or(0))
589    }
590
591    /// Get the submissions for UI display,
592    pub async fn handle_get_submissions_request_ui(
593        &self,
594    ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
595        let mut submissions = BTreeMap::new();
596
597        let mut dbtx = self.db.begin_transaction_nc().await;
598
599        for (peer_id, value) in self
600            .handle_get_submissions_request(
601                &mut dbtx.to_ref_nc(),
602                &ApiAuth(String::new()),
603                &GetSubmissionsRequest(DEFAULT_META_KEY),
604            )
605            .await?
606        {
607            if let Ok(value) = serde_json::from_slice(value.as_slice()) {
608                submissions.insert(peer_id, value);
609            }
610        }
611
612        Ok(submissions)
613    }
614}