fedimint_meta_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13    MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14    MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
18    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23    NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28    ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30    TransactionItemAmount, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35    MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigLocal, MetaConfigPrivate,
36};
37pub use fedimint_meta_common::config::{MetaGenParams, MetaGenParamsConsensus, MetaGenParamsLocal};
38use fedimint_meta_common::endpoint::{
39    GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
40    GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
41    SubmitRequest,
42};
43use fedimint_meta_common::{
44    DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
45    MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
46    MetaOutputError, MetaOutputOutcome, MetaValue,
47};
48use fedimint_server_core::config::PeerHandleOps;
49use fedimint_server_core::migration::ServerModuleDbMigrationFn;
50use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
51use futures::StreamExt;
52use rand::{Rng, thread_rng};
53use strum::IntoEnumIterator;
54use tracing::{debug, info, trace};
55
56use crate::db::{
57    DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
58    MetaSubmissionsKeyPrefix,
59};
60
61/// Generates the module
62#[derive(Debug, Clone)]
63pub struct MetaInit;
64
65// TODO: Boilerplate-code
66impl ModuleInit for MetaInit {
67    type Common = MetaCommonInit;
68
69    /// Dumps all database items for debugging
70    async fn dump_database(
71        &self,
72        dbtx: &mut DatabaseTransaction<'_>,
73        prefix_names: Vec<String>,
74    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
75        // TODO: Boilerplate-code
76        let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
77        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
78            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
79        });
80
81        for table in filtered_prefixes {
82            match table {
83                DbKeyPrefix::Desired => {
84                    push_db_pair_items!(
85                        dbtx,
86                        MetaDesiredKeyPrefix,
87                        MetaDesiredKey,
88                        MetaDesiredValue,
89                        items,
90                        "Meta Desired"
91                    );
92                }
93                DbKeyPrefix::Consensus => {
94                    push_db_pair_items!(
95                        dbtx,
96                        MetaConsensusKeyPrefix,
97                        MetaConsensusKey,
98                        MetaConsensusValue,
99                        items,
100                        "Meta Consensus"
101                    );
102                }
103                DbKeyPrefix::Submissions => {
104                    push_db_pair_items!(
105                        dbtx,
106                        MetaSubmissionsKeyPrefix,
107                        MetaSubmissionsKey,
108                        MetaSubmissionValue,
109                        items,
110                        "Meta Submissions"
111                    );
112                }
113            }
114        }
115
116        Box::new(items.into_iter())
117    }
118}
119
120/// Implementation of server module non-consensus functions
121#[async_trait]
122impl ServerModuleInit for MetaInit {
123    type Module = Meta;
124    type Params = MetaGenParams;
125
126    /// Returns the version of this module
127    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128        &[MODULE_CONSENSUS_VERSION]
129    }
130
131    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132        SupportedModuleApiVersions::from_raw(
133            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134            (
135                MODULE_CONSENSUS_VERSION.major,
136                MODULE_CONSENSUS_VERSION.minor,
137            ),
138            &[(0, 0)],
139        )
140    }
141
142    /// Initialize the module
143    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144        Ok(Meta {
145            cfg: args.cfg().to_typed()?,
146            our_peer_id: args.our_peer_id(),
147            num_peers: args.num_peers(),
148            db: args.db().clone(),
149        })
150    }
151
152    /// Generates configs for all peers in a trusted manner for testing
153    fn trusted_dealer_gen(
154        &self,
155        peers: &[PeerId],
156        params: &ConfigGenModuleParams,
157    ) -> BTreeMap<PeerId, ServerModuleConfig> {
158        let _params = self.parse_params(params).unwrap();
159        // Generate a config for each peer
160        peers
161            .iter()
162            .map(|&peer| {
163                let config = MetaConfig {
164                    local: MetaConfigLocal {},
165                    private: MetaConfigPrivate,
166                    consensus: MetaConfigConsensus {},
167                };
168                (peer, config.to_erased())
169            })
170            .collect()
171    }
172
173    /// Generates configs for all peers in an untrusted manner
174    async fn distributed_gen(
175        &self,
176        _peers: &(dyn PeerHandleOps + Send + Sync),
177        params: &ConfigGenModuleParams,
178    ) -> anyhow::Result<ServerModuleConfig> {
179        let _params = self.parse_params(params).unwrap();
180
181        Ok(MetaConfig {
182            local: MetaConfigLocal {},
183            private: MetaConfigPrivate,
184            consensus: MetaConfigConsensus {},
185        }
186        .to_erased())
187    }
188
189    /// Converts the consensus config into the client config
190    fn get_client_config(
191        &self,
192        config: &ServerModuleConsensusConfig,
193    ) -> anyhow::Result<MetaClientConfig> {
194        let _config = MetaConfigConsensus::from_erased(config)?;
195        Ok(MetaClientConfig {})
196    }
197
198    fn validate_config(
199        &self,
200        _identity: &PeerId,
201        _config: ServerModuleConfig,
202    ) -> anyhow::Result<()> {
203        Ok(())
204    }
205
206    /// DB migrations to move from old to newer versions
207    fn get_database_migrations(
208        &self,
209    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
210        BTreeMap::new()
211    }
212}
213
214/// Meta module
215#[derive(Debug)]
216pub struct Meta {
217    pub cfg: MetaConfig,
218    pub our_peer_id: PeerId,
219    pub num_peers: NumPeers,
220    pub db: Database,
221}
222
223impl Meta {
224    async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
225        dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
226            .await
227            .map(|(k, v)| (k.0, v))
228            .collect()
229            .await
230    }
231
232    async fn get_submission(
233        dbtx: &mut DatabaseTransaction<'_>,
234        key: MetaKey,
235        peer_id: PeerId,
236    ) -> Option<MetaSubmissionValue> {
237        dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
238    }
239
240    async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
241        dbtx.get_value(&MetaConsensusKey(key))
242            .await
243            .map(|consensus_value| consensus_value.value)
244    }
245
246    async fn change_consensus(
247        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
248        key: MetaKey,
249        value: MetaValue,
250        matching_submissions: Vec<PeerId>,
251    ) {
252        let value_len = value.as_slice().len();
253        let revision = dbtx
254            .get_value(&MetaConsensusKey(key))
255            .await
256            .map(|cv| cv.revision);
257        let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
258        dbtx.insert_entry(
259            &MetaConsensusKey(key),
260            &MetaConsensusValue { revision, value },
261        )
262        .await;
263
264        info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
265
266        for peer_id in matching_submissions {
267            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
268                .await;
269        }
270    }
271}
272
273/// Implementation of consensus for the server module
274#[async_trait]
275impl ServerModule for Meta {
276    /// Define the consensus types
277    type Common = MetaModuleTypes;
278    type Init = MetaInit;
279
280    /// Check the difference between what's desired vs submitted and consensus.
281    ///
282    /// Returns:
283    /// Items to submit as our proposal.
284    async fn consensus_proposal(
285        &self,
286        dbtx: &mut DatabaseTransaction<'_>,
287    ) -> Vec<MetaConsensusItem> {
288        let desired: Vec<_> = Self::get_desired(dbtx).await;
289
290        let mut to_submit = vec![];
291
292        for (
293            key,
294            MetaDesiredValue {
295                value: desired_value,
296                salt,
297            },
298        ) in desired
299        {
300            let consensus_value = &Self::get_consensus(dbtx, key).await;
301            let consensus_submission_value =
302                Self::get_submission(dbtx, key, self.our_peer_id).await;
303            if consensus_submission_value.as_ref()
304                == Some(&MetaSubmissionValue {
305                    value: desired_value.clone(),
306                    salt,
307                })
308            {
309                // our submission is already registered, nothing to do
310            } else if consensus_value.as_ref() == Some(&desired_value) {
311                if consensus_submission_value.is_none() {
312                    // our desired value is equal to consensus and cleared our
313                    // submission (as it is equal the
314                    // consensus) so we don't need to propose it
315                } else {
316                    // we want to submit the same value as the current consensus, usually
317                    // to clear the previous submission that did not became the consensus (we were
318                    // outvoted)
319                    to_submit.push(MetaConsensusItem {
320                        key,
321                        value: desired_value,
322                        salt,
323                    });
324                }
325            } else {
326                to_submit.push(MetaConsensusItem {
327                    key,
328                    value: desired_value,
329                    salt,
330                });
331            }
332        }
333
334        trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
335        to_submit
336    }
337
338    /// BUG: This implementation fails to return an `Err` on redundant consensus
339    /// items. If you are using this code as a template,
340    /// make sure to read the [`ServerModule::process_consensus_item`]
341    /// documentation,
342    async fn process_consensus_item<'a, 'b>(
343        &'a self,
344        dbtx: &mut DatabaseTransaction<'b>,
345        MetaConsensusItem { key, value, salt }: MetaConsensusItem,
346        peer_id: PeerId,
347    ) -> anyhow::Result<()> {
348        trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
349
350        let new_value = MetaSubmissionValue { salt, value };
351        // first of all: any new submission overrides previous submission
352        if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await {
353            if prev_value != new_value {
354                dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
355                    .await;
356            }
357        }
358        // then: if the submission is equal to the current consensus, it's ignored
359        if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
360            debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
361            return Ok(());
362        }
363
364        // otherwise, new submission is recorded
365        dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
366            .await;
367
368        // we check how many peers submitted the same value (including this peer)
369        let matching_submissions: Vec<PeerId> = dbtx
370            .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
371            .await
372            .filter(|(_submission_key, submission_value)| {
373                future::ready(new_value.value == submission_value.value)
374            })
375            .map(|(submission_key, _)| submission_key.peer_id)
376            .collect()
377            .await;
378
379        let threshold = self.num_peers.threshold();
380        info!(target: LOG_MODULE_META,
381             %peer_id,
382             %key,
383            value_len = %new_value.value.as_slice().len(),
384             matching = %matching_submissions.len(),
385            %threshold, "Peer submitted a value");
386
387        // if threshold or more, change the consensus value
388        if threshold <= matching_submissions.len() {
389            Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
390        }
391
392        Ok(())
393    }
394
395    async fn process_input<'a, 'b, 'c>(
396        &'a self,
397        _dbtx: &mut DatabaseTransaction<'c>,
398        _input: &'b MetaInput,
399        _in_point: InPoint,
400    ) -> Result<InputMeta, MetaInputError> {
401        Err(MetaInputError::NotSupported)
402    }
403
404    async fn process_output<'a, 'b>(
405        &'a self,
406        _dbtx: &mut DatabaseTransaction<'b>,
407        _output: &'a MetaOutput,
408        _out_point: OutPoint,
409    ) -> Result<TransactionItemAmount, MetaOutputError> {
410        Err(MetaOutputError::NotSupported)
411    }
412
413    async fn output_status(
414        &self,
415        _dbtx: &mut DatabaseTransaction<'_>,
416        _out_point: OutPoint,
417    ) -> Option<MetaOutputOutcome> {
418        None
419    }
420
421    async fn audit(
422        &self,
423        _dbtx: &mut DatabaseTransaction<'_>,
424        _audit: &mut Audit,
425        _module_instance_id: ModuleInstanceId,
426    ) {
427    }
428
429    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
430        vec![
431            api_endpoint! {
432                SUBMIT_ENDPOINT,
433                ApiVersion::new(0, 0),
434                async |module: &Meta, context, request: SubmitRequest| -> () {
435
436                    match context.request_auth() {
437                        None => return Err(ApiError::bad_request("Missing password".to_string())),
438                        Some(auth) => {
439                            module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
440                        }
441                    }
442
443                    Ok(())
444                }
445            },
446            api_endpoint! {
447                GET_CONSENSUS_ENDPOINT,
448                ApiVersion::new(0, 0),
449                async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
450                    module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
451                }
452            },
453            api_endpoint! {
454                GET_CONSENSUS_REV_ENDPOINT,
455                ApiVersion::new(0, 0),
456                async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
457                    module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
458                }
459            },
460            api_endpoint! {
461                GET_SUBMISSIONS_ENDPOINT,
462                ApiVersion::new(0, 0),
463                async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
464                    match context.request_auth() {
465                        None => return Err(ApiError::bad_request("Missing password".to_string())),
466                        Some(auth) => {
467                            module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
468                        }
469                    }
470                }
471            },
472        ]
473    }
474}
475
476impl Meta {
477    async fn handle_submit_request(
478        &self,
479        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
480        _auth: &ApiAuth,
481        req: &SubmitRequest,
482    ) -> Result<(), ApiError> {
483        let salt = thread_rng().r#gen();
484
485        info!(target: LOG_MODULE_META,
486             key = %req.key,
487             peer_id = %self.our_peer_id,
488             value_len = %req.value.as_slice().len(),
489             "Our own guardian submitted a value");
490
491        dbtx.insert_entry(
492            &MetaDesiredKey(req.key),
493            &MetaDesiredValue {
494                value: req.value.clone(),
495                salt,
496            },
497        )
498        .await;
499
500        Ok(())
501    }
502
503    async fn handle_get_consensus_request(
504        &self,
505        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
506        req: &GetConsensusRequest,
507    ) -> Result<Option<MetaConsensusValue>, ApiError> {
508        Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
509    }
510
511    async fn handle_get_consensus_revision_request(
512        &self,
513        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
514        req: &GetConsensusRequest,
515    ) -> Result<Option<u64>, ApiError> {
516        Ok(dbtx
517            .get_value(&MetaConsensusKey(req.0))
518            .await
519            .map(|cv| cv.revision))
520    }
521
522    async fn handle_get_submissions_request(
523        &self,
524        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
525        _auth: &ApiAuth,
526        req: &GetSubmissionsRequest,
527    ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
528        Ok(dbtx
529            .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
530            .await
531            .collect::<Vec<_>>()
532            .await
533            .into_iter()
534            .map(|(k, v)| (k.peer_id, v.value))
535            .collect())
536    }
537}
538
539// UI Methods for Meta Module
540
541impl Meta {
542    /// UI helper to submit a value change with default auth
543    pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
544        let mut dbtx = self.db.begin_transaction().await;
545
546        self.handle_submit_request(
547            &mut dbtx.to_ref_nc(),
548            &ApiAuth(String::new()),
549            &SubmitRequest {
550                key: DEFAULT_META_KEY,
551                value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
552            },
553        )
554        .await?;
555
556        dbtx.commit_tx_result()
557            .await
558            .map_err(|e| ApiError::server_error(e.to_string()))?;
559
560        Ok(())
561    }
562
563    /// UI helper to get consensus data as a key-value map
564    pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
565        self.handle_get_consensus_request(
566            &mut self.db.begin_transaction_nc().await,
567            &GetConsensusRequest(DEFAULT_META_KEY),
568        )
569        .await?
570        .map(|value| serde_json::from_slice(value.value.as_slice()))
571        .transpose()
572        .map_err(|e| ApiError::server_error(e.to_string()))
573    }
574
575    /// UI helper to get consensus revision
576    pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
577        self.handle_get_consensus_revision_request(
578            &mut self.db.begin_transaction_nc().await,
579            &GetConsensusRequest(DEFAULT_META_KEY),
580        )
581        .await
582        .map(|r| r.unwrap_or(0))
583    }
584
585    /// Get the submissions for UI display,
586    pub async fn handle_get_submissions_request_ui(
587        &self,
588    ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
589        let mut submissions = BTreeMap::new();
590
591        let mut dbtx = self.db.begin_transaction_nc().await;
592
593        for (peer_id, value) in self
594            .handle_get_submissions_request(
595                &mut dbtx.to_ref_nc(),
596                &ApiAuth(String::new()),
597                &GetSubmissionsRequest(DEFAULT_META_KEY),
598            )
599            .await?
600        {
601            if let Ok(value) = serde_json::from_slice(value.as_slice()) {
602                submissions.insert(peer_id, value);
603            }
604        }
605
606        Ok(submissions)
607    }
608}