fedimint_meta_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3
4pub mod db;
5
6use std::collections::BTreeMap;
7use std::future;
8
9use async_trait::async_trait;
10use db::{
11    MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
12    MetaSubmissionsKey,
13};
14use fedimint_core::config::{
15    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
16    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
17};
18use fedimint_core::core::ModuleInstanceId;
19use fedimint_core::db::{
20    CoreMigrationFn, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
21    NonCommittable,
22};
23use fedimint_core::module::audit::Audit;
24use fedimint_core::module::{
25    ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
26    InputMeta, ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
27    TransactionItemAmount, api_endpoint,
28};
29use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
30use fedimint_logging::LOG_MODULE_META;
31use fedimint_meta_common::config::{
32    MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigLocal, MetaConfigPrivate,
33};
34pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
35use fedimint_meta_common::endpoint::{
36    GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
37    GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
38    SubmitRequest,
39};
40use fedimint_meta_common::{
41    MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem, MetaConsensusValue, MetaInput,
42    MetaInputError, MetaKey, MetaModuleTypes, MetaOutput, MetaOutputError, MetaOutputOutcome,
43    MetaValue,
44};
45use fedimint_server::core::{
46    DynServerModule, ServerModule, ServerModuleInit, ServerModuleInitArgs,
47};
48use futures::StreamExt;
49use rand::{Rng, thread_rng};
50use strum::IntoEnumIterator;
51use tracing::{debug, info, trace};
52
53use crate::db::{
54    DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
55    MetaSubmissionsKeyPrefix,
56};
57
/// Generates the module
///
/// A stateless unit struct: it exists only as the type on which the
/// [`ModuleInit`] and [`ServerModuleInit`] traits are implemented below.
#[derive(Debug, Clone)]
pub struct MetaInit;
61
62// TODO: Boilerplate-code
63impl ModuleInit for MetaInit {
64    type Common = MetaCommonInit;
65
66    /// Dumps all database items for debugging
67    async fn dump_database(
68        &self,
69        dbtx: &mut DatabaseTransaction<'_>,
70        prefix_names: Vec<String>,
71    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
72        // TODO: Boilerplate-code
73        let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
74        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
75            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
76        });
77
78        for table in filtered_prefixes {
79            match table {
80                DbKeyPrefix::Desired => {
81                    push_db_pair_items!(
82                        dbtx,
83                        MetaDesiredKeyPrefix,
84                        MetaDesiredKey,
85                        MetaDesiredValue,
86                        items,
87                        "Meta Desired"
88                    );
89                }
90                DbKeyPrefix::Consensus => {
91                    push_db_pair_items!(
92                        dbtx,
93                        MetaConsensusKeyPrefix,
94                        MetaConsensusKey,
95                        MetaConsensusValue,
96                        items,
97                        "Meta Consensus"
98                    );
99                }
100                DbKeyPrefix::Submissions => {
101                    push_db_pair_items!(
102                        dbtx,
103                        MetaSubmissionsKeyPrefix,
104                        MetaSubmissionsKey,
105                        MetaSubmissionValue,
106                        items,
107                        "Meta Submissions"
108                    );
109                }
110            }
111        }
112
113        Box::new(items.into_iter())
114    }
115}
116
117/// Implementation of server module non-consensus functions
118#[async_trait]
119impl ServerModuleInit for MetaInit {
120    type Params = MetaGenParams;
121
122    /// Returns the version of this module
123    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
124        &[MODULE_CONSENSUS_VERSION]
125    }
126
127    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
128        SupportedModuleApiVersions::from_raw(
129            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
130            (
131                MODULE_CONSENSUS_VERSION.major,
132                MODULE_CONSENSUS_VERSION.minor,
133            ),
134            &[(0, 0)],
135        )
136    }
137
138    /// Initialize the module
139    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<DynServerModule> {
140        Ok(Meta {
141            cfg: args.cfg().to_typed()?,
142            our_peer_id: args.our_peer_id(),
143            num_peers: args.num_peers(),
144        }
145        .into())
146    }
147
148    /// Generates configs for all peers in a trusted manner for testing
149    fn trusted_dealer_gen(
150        &self,
151        peers: &[PeerId],
152        params: &ConfigGenModuleParams,
153    ) -> BTreeMap<PeerId, ServerModuleConfig> {
154        let _params = self.parse_params(params).unwrap();
155        // Generate a config for each peer
156        peers
157            .iter()
158            .map(|&peer| {
159                let config = MetaConfig {
160                    local: MetaConfigLocal {},
161                    private: MetaConfigPrivate,
162                    consensus: MetaConfigConsensus {},
163                };
164                (peer, config.to_erased())
165            })
166            .collect()
167    }
168
169    /// Generates configs for all peers in an untrusted manner
170    async fn distributed_gen(
171        &self,
172        _peers: &PeerHandle,
173        params: &ConfigGenModuleParams,
174    ) -> anyhow::Result<ServerModuleConfig> {
175        let _params = self.parse_params(params).unwrap();
176
177        Ok(MetaConfig {
178            local: MetaConfigLocal {},
179            private: MetaConfigPrivate,
180            consensus: MetaConfigConsensus {},
181        }
182        .to_erased())
183    }
184
185    /// Converts the consensus config into the client config
186    fn get_client_config(
187        &self,
188        config: &ServerModuleConsensusConfig,
189    ) -> anyhow::Result<MetaClientConfig> {
190        let _config = MetaConfigConsensus::from_erased(config)?;
191        Ok(MetaClientConfig {})
192    }
193
    /// Validates a peer's module config.
    ///
    /// This implementation performs no checks and accepts any config.
    fn validate_config(
        &self,
        _identity: &PeerId,
        _config: ServerModuleConfig,
    ) -> anyhow::Result<()> {
        Ok(())
    }
201
    /// DB migrations to move from old to newer versions
    ///
    /// Currently returns an empty map: no migrations are registered.
    fn get_database_migrations(&self) -> BTreeMap<DatabaseVersion, CoreMigrationFn> {
        BTreeMap::new()
    }
206}
207
/// Meta module
///
/// Server-side state of the meta module, constructed in [`MetaInit`]'s `init`.
#[derive(Debug)]
pub struct Meta {
    /// Typed module configuration obtained from the init arguments
    pub cfg: MetaConfig,
    /// Id of the peer this server runs as; used to look up our own
    /// submissions and reported in log output
    pub our_peer_id: PeerId,
    /// Number of peers; its `threshold()` decides when a submitted value
    /// becomes the consensus value
    pub num_peers: NumPeers,
}
215
216impl Meta {
217    async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
218        dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
219            .await
220            .map(|(k, v)| (k.0, v))
221            .collect()
222            .await
223    }
224
225    async fn get_submission(
226        dbtx: &mut DatabaseTransaction<'_>,
227        key: MetaKey,
228        peer_id: PeerId,
229    ) -> Option<MetaSubmissionValue> {
230        dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
231    }
232
233    async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
234        dbtx.get_value(&MetaConsensusKey(key))
235            .await
236            .map(|consensus_value| consensus_value.value)
237    }
238
239    async fn change_consensus(
240        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
241        key: MetaKey,
242        value: MetaValue,
243        matching_submissions: Vec<PeerId>,
244    ) {
245        let value_len = value.as_slice().len();
246        let revision = dbtx
247            .get_value(&MetaConsensusKey(key))
248            .await
249            .map(|cv| cv.revision);
250        let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
251        dbtx.insert_entry(
252            &MetaConsensusKey(key),
253            &MetaConsensusValue { revision, value },
254        )
255        .await;
256
257        info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
258
259        for peer_id in matching_submissions {
260            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
261                .await;
262        }
263    }
264}
265
266/// Implementation of consensus for the server module
267#[async_trait]
268impl ServerModule for Meta {
269    /// Define the consensus types
270    type Common = MetaModuleTypes;
271    type Init = MetaInit;
272
273    /// Check the difference between what's desired vs submitted and consensus.
274    ///
275    /// Returns:
276    /// Items to submit as our proposal.
277    async fn consensus_proposal(
278        &self,
279        dbtx: &mut DatabaseTransaction<'_>,
280    ) -> Vec<MetaConsensusItem> {
281        let desired: Vec<_> = Self::get_desired(dbtx).await;
282
283        let mut to_submit = vec![];
284
285        for (
286            key,
287            MetaDesiredValue {
288                value: desired_value,
289                salt,
290            },
291        ) in desired
292        {
293            let consensus_value = &Self::get_consensus(dbtx, key).await;
294            let consensus_submission_value =
295                Self::get_submission(dbtx, key, self.our_peer_id).await;
296            if consensus_submission_value.as_ref()
297                == Some(&MetaSubmissionValue {
298                    value: desired_value.clone(),
299                    salt,
300                })
301            {
302                // our submission is already registered, nothing to do
303            } else if consensus_value.as_ref() == Some(&desired_value) {
304                if consensus_submission_value.is_none() {
305                    // our desired value is equal to consensus and cleared our
306                    // submission (as it is equal the
307                    // consensus) so we don't need to propose it
308                } else {
309                    // we want to submit the same value as the current consensus, usually
310                    // to clear the previous submission that did not became the consensus (we were
311                    // outvoted)
312                    to_submit.push(MetaConsensusItem {
313                        key,
314                        value: desired_value,
315                        salt,
316                    });
317                }
318            } else {
319                to_submit.push(MetaConsensusItem {
320                    key,
321                    value: desired_value,
322                    salt,
323                });
324            }
325        }
326
327        trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
328        to_submit
329    }
330
    /// Records `peer_id`'s submission for `key` and, once at least a
    /// threshold of peers have submitted the same value, makes that value the
    /// new consensus value.
    ///
    /// BUG: This implementation fails to return an `Err` on redundant consensus
    /// items. If you are using this code as a template,
    /// make sure to read the [`ServerModule::process_consensus_item`]
    /// documentation.
    async fn process_consensus_item<'a, 'b>(
        &'a self,
        dbtx: &mut DatabaseTransaction<'b>,
        MetaConsensusItem { key, value, salt }: MetaConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");

        let new_value = MetaSubmissionValue { salt, value };
        // first of all: any new submission overrides previous submission
        // (a previous submission equal in both value and salt is left in place)
        if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
            if prev_value != new_value {
                dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
                    .await;
            }
        }
        // then: if the submission is equal to the current consensus, it's ignored
        // (this is the redundant case that returns `Ok` rather than `Err`)
        if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
            debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
            return Ok(());
        }

        // otherwise, new submission is recorded
        dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
            .await;

        // we check how many peers submitted the same value (including this peer)
        // only the value is compared here; salts may differ between peers
        let matching_submissions: Vec<PeerId> = dbtx
            .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
            .await
            .filter(|(_submission_key, submission_value)| {
                future::ready(new_value.value == submission_value.value)
            })
            .map(|(submission_key, _)| submission_key.peer_id)
            .collect()
            .await;

        let threshold = self.num_peers.threshold();
        info!(target: LOG_MODULE_META,
             %peer_id,
             %key,
            value_len = %new_value.value.as_slice().len(),
             matching = %matching_submissions.len(),
            %threshold, "Peer submitted a value");

        // if threshold or more, change the consensus value
        // (this also removes the matching submissions, see `change_consensus`)
        if threshold <= matching_submissions.len() {
            Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
        }

        Ok(())
    }
387
    /// Transaction inputs are not handled by this module: every input is
    /// rejected with [`MetaInputError::NotSupported`].
    async fn process_input<'a, 'b, 'c>(
        &'a self,
        _dbtx: &mut DatabaseTransaction<'c>,
        _input: &'b MetaInput,
        _in_point: InPoint,
    ) -> Result<InputMeta, MetaInputError> {
        Err(MetaInputError::NotSupported)
    }
396
    /// Transaction outputs are not handled by this module: every output is
    /// rejected with [`MetaOutputError::NotSupported`].
    async fn process_output<'a, 'b>(
        &'a self,
        _dbtx: &mut DatabaseTransaction<'b>,
        _output: &'a MetaOutput,
        _out_point: OutPoint,
    ) -> Result<TransactionItemAmount, MetaOutputError> {
        Err(MetaOutputError::NotSupported)
    }
405
    /// Always returns `None`; `process_output` never accepts an output, so
    /// there is no outcome to report.
    async fn output_status(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _out_point: OutPoint,
    ) -> Option<MetaOutputOutcome> {
        None
    }
413
    /// No-op: this implementation contributes nothing to the audit.
    async fn audit(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _audit: &mut Audit,
        _module_instance_id: ModuleInstanceId,
    ) {
    }
421
422    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
423        vec![
424            api_endpoint! {
425                SUBMIT_ENDPOINT,
426                ApiVersion::new(0, 0),
427                async |module: &Meta, context, request: SubmitRequest| -> () {
428
429                    match context.request_auth() {
430                        None => return Err(ApiError::bad_request("Missing password".to_string())),
431                        Some(auth) => {
432                            module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
433                        }
434                    }
435
436                    Ok(())
437                }
438            },
439            api_endpoint! {
440                GET_CONSENSUS_ENDPOINT,
441                ApiVersion::new(0, 0),
442                async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
443                    module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
444                }
445            },
446            api_endpoint! {
447                GET_CONSENSUS_REV_ENDPOINT,
448                ApiVersion::new(0, 0),
449                async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
450                    module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
451                }
452            },
453            api_endpoint! {
454                GET_SUBMISSIONS_ENDPOINT,
455                ApiVersion::new(0, 0),
456                async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
457                    match context.request_auth() {
458                        None => return Err(ApiError::bad_request("Missing password".to_string())),
459                        Some(auth) => {
460                            module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
461                        }
462                    }
463                }
464            },
465        ]
466    }
467}
468
469impl Meta {
470    async fn handle_submit_request(
471        &self,
472        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
473        _auth: &ApiAuth,
474        req: &SubmitRequest,
475    ) -> Result<(), ApiError> {
476        let salt = thread_rng().r#gen();
477
478        info!(target: LOG_MODULE_META,
479             key = %req.key,
480             peer_id = %self.our_peer_id,
481             value_len = %req.value.as_slice().len(),
482             "Our own guardian submitted a value");
483
484        dbtx.insert_entry(
485            &MetaDesiredKey(req.key),
486            &MetaDesiredValue {
487                value: req.value.clone(),
488                salt,
489            },
490        )
491        .await;
492
493        Ok(())
494    }
495
496    async fn handle_get_consensus_request(
497        &self,
498        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
499        req: &GetConsensusRequest,
500    ) -> Result<Option<MetaConsensusValue>, ApiError> {
501        Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
502    }
503
504    async fn handle_get_consensus_revision_request(
505        &self,
506        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
507        req: &GetConsensusRequest,
508    ) -> Result<Option<u64>, ApiError> {
509        Ok(dbtx
510            .get_value(&MetaConsensusKey(req.0))
511            .await
512            .map(|cv| cv.revision))
513    }
514
515    async fn handle_get_submissions_request(
516        &self,
517        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
518        _auth: &ApiAuth,
519        req: &GetSubmissionsRequest,
520    ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
521        Ok(dbtx
522            .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
523            .await
524            .collect::<Vec<_>>()
525            .await
526            .into_iter()
527            .map(|(k, v)| (k.peer_id, v.value))
528            .collect())
529    }
530}