fedimint_meta_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::module_name_repetitions)]
3#![allow(clippy::missing_panics_doc)]
4#![allow(clippy::missing_errors_doc)]
5
6pub mod db;
7
8use std::collections::BTreeMap;
9use std::future;
10
11use async_trait::async_trait;
12use db::{
13    MetaConsensusKey, MetaDesiredKey, MetaDesiredValue, MetaSubmissionsByKeyPrefix,
14    MetaSubmissionsKey,
15};
16use fedimint_core::config::{
17    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
18    TypedServerModuleConsensusConfig,
19};
20use fedimint_core::core::ModuleInstanceId;
21use fedimint_core::db::{
22    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
23    NonCommittable,
24};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::serde_json::Value;
27use fedimint_core::module::{
28    ApiAuth, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
29    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
30    TransactionItemAmounts, api_endpoint, serde_json,
31};
32use fedimint_core::{InPoint, NumPeers, OutPoint, PeerId, push_db_pair_items};
33use fedimint_logging::LOG_MODULE_META;
34use fedimint_meta_common::config::{
35    MetaClientConfig, MetaConfig, MetaConfigConsensus, MetaConfigPrivate,
36};
37use fedimint_meta_common::endpoint::{
38    GET_CONSENSUS_ENDPOINT, GET_CONSENSUS_REV_ENDPOINT, GET_SUBMISSIONS_ENDPOINT,
39    GetConsensusRequest, GetSubmissionResponse, GetSubmissionsRequest, SUBMIT_ENDPOINT,
40    SubmitRequest,
41};
42use fedimint_meta_common::{
43    DEFAULT_META_KEY, MODULE_CONSENSUS_VERSION, MetaCommonInit, MetaConsensusItem,
44    MetaConsensusValue, MetaInput, MetaInputError, MetaKey, MetaModuleTypes, MetaOutput,
45    MetaOutputError, MetaOutputOutcome, MetaValue,
46};
47use fedimint_server_core::config::PeerHandleOps;
48use fedimint_server_core::migration::ServerModuleDbMigrationFn;
49use fedimint_server_core::{
50    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
51};
52use futures::StreamExt;
53use rand::{Rng, thread_rng};
54use strum::IntoEnumIterator;
55use tracing::{debug, info, trace};
56
57use crate::db::{
58    DbKeyPrefix, MetaConsensusKeyPrefix, MetaDesiredKeyPrefix, MetaSubmissionValue,
59    MetaSubmissionsKeyPrefix,
60};
61
62/// Generates the module
63#[derive(Debug, Clone)]
64pub struct MetaInit;
65
66// TODO: Boilerplate-code
67impl ModuleInit for MetaInit {
68    type Common = MetaCommonInit;
69
70    /// Dumps all database items for debugging
71    async fn dump_database(
72        &self,
73        dbtx: &mut DatabaseTransaction<'_>,
74        prefix_names: Vec<String>,
75    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
76        // TODO: Boilerplate-code
77        let mut items: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
78        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
79            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
80        });
81
82        for table in filtered_prefixes {
83            match table {
84                DbKeyPrefix::Desired => {
85                    push_db_pair_items!(
86                        dbtx,
87                        MetaDesiredKeyPrefix,
88                        MetaDesiredKey,
89                        MetaDesiredValue,
90                        items,
91                        "Meta Desired"
92                    );
93                }
94                DbKeyPrefix::Consensus => {
95                    push_db_pair_items!(
96                        dbtx,
97                        MetaConsensusKeyPrefix,
98                        MetaConsensusKey,
99                        MetaConsensusValue,
100                        items,
101                        "Meta Consensus"
102                    );
103                }
104                DbKeyPrefix::Submissions => {
105                    push_db_pair_items!(
106                        dbtx,
107                        MetaSubmissionsKeyPrefix,
108                        MetaSubmissionsKey,
109                        MetaSubmissionValue,
110                        items,
111                        "Meta Submissions"
112                    );
113                }
114            }
115        }
116
117        Box::new(items.into_iter())
118    }
119}
120
121/// Implementation of server module non-consensus functions
122#[async_trait]
123impl ServerModuleInit for MetaInit {
124    type Module = Meta;
125
126    /// Returns the version of this module
127    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
128        &[MODULE_CONSENSUS_VERSION]
129    }
130
131    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
132        SupportedModuleApiVersions::from_raw(
133            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
134            (
135                MODULE_CONSENSUS_VERSION.major,
136                MODULE_CONSENSUS_VERSION.minor,
137            ),
138            &[(0, 0)],
139        )
140    }
141
142    /// Initialize the module
143    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
144        Ok(Meta {
145            cfg: args.cfg().to_typed()?,
146            our_peer_id: args.our_peer_id(),
147            num_peers: args.num_peers(),
148            db: args.db().clone(),
149        })
150    }
151
152    /// Generates configs for all peers in a trusted manner for testing
153    fn trusted_dealer_gen(
154        &self,
155        peers: &[PeerId],
156        _args: &ConfigGenModuleArgs,
157    ) -> BTreeMap<PeerId, ServerModuleConfig> {
158        // Generate a config for each peer
159        peers
160            .iter()
161            .map(|&peer| {
162                let config = MetaConfig {
163                    private: MetaConfigPrivate,
164                    consensus: MetaConfigConsensus {},
165                };
166                (peer, config.to_erased())
167            })
168            .collect()
169    }
170
171    /// Generates configs for all peers in an untrusted manner
172    async fn distributed_gen(
173        &self,
174        _peers: &(dyn PeerHandleOps + Send + Sync),
175        _args: &ConfigGenModuleArgs,
176    ) -> anyhow::Result<ServerModuleConfig> {
177        Ok(MetaConfig {
178            private: MetaConfigPrivate,
179            consensus: MetaConfigConsensus {},
180        }
181        .to_erased())
182    }
183
184    /// Converts the consensus config into the client config
185    fn get_client_config(
186        &self,
187        config: &ServerModuleConsensusConfig,
188    ) -> anyhow::Result<MetaClientConfig> {
189        let _config = MetaConfigConsensus::from_erased(config)?;
190        Ok(MetaClientConfig {})
191    }
192
193    fn validate_config(
194        &self,
195        _identity: &PeerId,
196        _config: ServerModuleConfig,
197    ) -> anyhow::Result<()> {
198        Ok(())
199    }
200
201    /// DB migrations to move from old to newer versions
202    fn get_database_migrations(
203        &self,
204    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Meta>> {
205        BTreeMap::new()
206    }
207}
208
209/// Meta module
210#[derive(Debug)]
211pub struct Meta {
212    pub cfg: MetaConfig,
213    pub our_peer_id: PeerId,
214    pub num_peers: NumPeers,
215    pub db: Database,
216}
217
218impl Meta {
219    async fn get_desired(dbtx: &mut DatabaseTransaction<'_>) -> Vec<(MetaKey, MetaDesiredValue)> {
220        dbtx.find_by_prefix(&MetaDesiredKeyPrefix)
221            .await
222            .map(|(k, v)| (k.0, v))
223            .collect()
224            .await
225    }
226
227    async fn get_submission(
228        dbtx: &mut DatabaseTransaction<'_>,
229        key: MetaKey,
230        peer_id: PeerId,
231    ) -> Option<MetaSubmissionValue> {
232        dbtx.get_value(&MetaSubmissionsKey { key, peer_id }).await
233    }
234
235    async fn get_consensus(dbtx: &mut DatabaseTransaction<'_>, key: MetaKey) -> Option<MetaValue> {
236        dbtx.get_value(&MetaConsensusKey(key))
237            .await
238            .map(|consensus_value| consensus_value.value)
239    }
240
241    async fn change_consensus(
242        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
243        key: MetaKey,
244        value: MetaValue,
245        matching_submissions: Vec<PeerId>,
246    ) {
247        let value_len = value.as_slice().len();
248        let revision = dbtx
249            .get_value(&MetaConsensusKey(key))
250            .await
251            .map(|cv| cv.revision);
252        let revision = revision.map(|r| r.wrapping_add(1)).unwrap_or_default();
253        dbtx.insert_entry(
254            &MetaConsensusKey(key),
255            &MetaConsensusValue { revision, value },
256        )
257        .await;
258
259        info!(target: LOG_MODULE_META, %key, rev = %revision, len = %value_len, "New consensus value");
260
261        for peer_id in matching_submissions {
262            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
263                .await;
264        }
265    }
266}
267
268/// Implementation of consensus for the server module
269#[async_trait]
270impl ServerModule for Meta {
271    /// Define the consensus types
272    type Common = MetaModuleTypes;
273    type Init = MetaInit;
274
275    /// Check the difference between what's desired vs submitted and consensus.
276    ///
277    /// Returns:
278    /// Items to submit as our proposal.
279    async fn consensus_proposal(
280        &self,
281        dbtx: &mut DatabaseTransaction<'_>,
282    ) -> Vec<MetaConsensusItem> {
283        let desired: Vec<_> = Self::get_desired(dbtx).await;
284
285        let mut to_submit = vec![];
286
287        for (
288            key,
289            MetaDesiredValue {
290                value: desired_value,
291                salt,
292            },
293        ) in desired
294        {
295            let consensus_value = &Self::get_consensus(dbtx, key).await;
296            let consensus_submission_value =
297                Self::get_submission(dbtx, key, self.our_peer_id).await;
298            if consensus_submission_value.as_ref()
299                == Some(&MetaSubmissionValue {
300                    value: desired_value.clone(),
301                    salt,
302                })
303            {
304                // our submission is already registered, nothing to do
305            } else if consensus_value.as_ref() == Some(&desired_value) {
306                if consensus_submission_value.is_none() {
307                    // our desired value is equal to consensus and cleared our
308                    // submission (as it is equal the
309                    // consensus) so we don't need to propose it
310                } else {
311                    // we want to submit the same value as the current consensus, usually
312                    // to clear the previous submission that did not became the consensus (we were
313                    // outvoted)
314                    to_submit.push(MetaConsensusItem {
315                        key,
316                        value: desired_value,
317                        salt,
318                    });
319                }
320            } else {
321                to_submit.push(MetaConsensusItem {
322                    key,
323                    value: desired_value,
324                    salt,
325                });
326            }
327        }
328
329        trace!(target: LOG_MODULE_META, ?to_submit, "Desired actions");
330        to_submit
331    }
332
333    /// BUG: This implementation fails to return an `Err` on redundant consensus
334    /// items. If you are using this code as a template,
335    /// make sure to read the [`ServerModule::process_consensus_item`]
336    /// documentation,
337    async fn process_consensus_item<'a, 'b>(
338        &'a self,
339        dbtx: &mut DatabaseTransaction<'b>,
340        MetaConsensusItem { key, value, salt }: MetaConsensusItem,
341        peer_id: PeerId,
342    ) -> anyhow::Result<()> {
343        trace!(target: LOG_MODULE_META, %key, %value, %salt, "Processing consensus item proposal");
344
345        let new_value = MetaSubmissionValue { salt, value };
346        // first of all: any new submission overrides previous submission
347        if let Some(prev_value) = Self::get_submission(dbtx, key, peer_id).await
348            && prev_value != new_value
349        {
350            dbtx.remove_entry(&MetaSubmissionsKey { key, peer_id })
351                .await;
352        }
353        // then: if the submission is equal to the current consensus, it's ignored
354        if Some(&new_value.value) == Self::get_consensus(dbtx, key).await.as_ref() {
355            debug!(target: LOG_MODULE_META, %peer_id, %key, "Peer submitted a redundant value");
356            return Ok(());
357        }
358
359        // otherwise, new submission is recorded
360        dbtx.insert_entry(&MetaSubmissionsKey { key, peer_id }, &new_value)
361            .await;
362
363        // we check how many peers submitted the same value (including this peer)
364        let matching_submissions: Vec<PeerId> = dbtx
365            .find_by_prefix(&MetaSubmissionsByKeyPrefix(key))
366            .await
367            .filter(|(_submission_key, submission_value)| {
368                future::ready(new_value.value == submission_value.value)
369            })
370            .map(|(submission_key, _)| submission_key.peer_id)
371            .collect()
372            .await;
373
374        let threshold = self.num_peers.threshold();
375        info!(target: LOG_MODULE_META,
376             %peer_id,
377             %key,
378            value_len = %new_value.value.as_slice().len(),
379             matching = %matching_submissions.len(),
380            %threshold, "Peer submitted a value");
381
382        // if threshold or more, change the consensus value
383        if threshold <= matching_submissions.len() {
384            Self::change_consensus(dbtx, key, new_value.value, matching_submissions).await;
385        }
386
387        Ok(())
388    }
389
390    async fn process_input<'a, 'b, 'c>(
391        &'a self,
392        _dbtx: &mut DatabaseTransaction<'c>,
393        _input: &'b MetaInput,
394        _in_point: InPoint,
395    ) -> Result<InputMeta, MetaInputError> {
396        Err(MetaInputError::NotSupported)
397    }
398
399    async fn process_output<'a, 'b>(
400        &'a self,
401        _dbtx: &mut DatabaseTransaction<'b>,
402        _output: &'a MetaOutput,
403        _out_point: OutPoint,
404    ) -> Result<TransactionItemAmounts, MetaOutputError> {
405        Err(MetaOutputError::NotSupported)
406    }
407
408    async fn output_status(
409        &self,
410        _dbtx: &mut DatabaseTransaction<'_>,
411        _out_point: OutPoint,
412    ) -> Option<MetaOutputOutcome> {
413        None
414    }
415
416    async fn audit(
417        &self,
418        _dbtx: &mut DatabaseTransaction<'_>,
419        _audit: &mut Audit,
420        _module_instance_id: ModuleInstanceId,
421    ) {
422    }
423
424    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
425        vec![
426            api_endpoint! {
427                SUBMIT_ENDPOINT,
428                ApiVersion::new(0, 0),
429                async |module: &Meta, context, request: SubmitRequest| -> () {
430
431                    match context.request_auth() {
432                        None => return Err(ApiError::bad_request("Missing password".to_string())),
433                        Some(auth) => {
434                            module.handle_submit_request(&mut context.dbtx(), &auth, &request).await?;
435                        }
436                    }
437
438                    Ok(())
439                }
440            },
441            api_endpoint! {
442                GET_CONSENSUS_ENDPOINT,
443                ApiVersion::new(0, 0),
444                async |module: &Meta, context, request: GetConsensusRequest| -> Option<MetaConsensusValue> {
445                    module.handle_get_consensus_request(&mut context.dbtx().into_nc(), &request).await
446                }
447            },
448            api_endpoint! {
449                GET_CONSENSUS_REV_ENDPOINT,
450                ApiVersion::new(0, 0),
451                async |module: &Meta, context, request: GetConsensusRequest| -> Option<u64> {
452                    module.handle_get_consensus_revision_request(&mut context.dbtx().into_nc(), &request).await
453                }
454            },
455            api_endpoint! {
456                GET_SUBMISSIONS_ENDPOINT,
457                ApiVersion::new(0, 0),
458                async |module: &Meta, context, request: GetSubmissionsRequest| -> GetSubmissionResponse {
459                    match context.request_auth() {
460                        None => return Err(ApiError::bad_request("Missing password".to_string())),
461                        Some(auth) => {
462                            module.handle_get_submissions_request(&mut context.dbtx().into_nc(),&auth, &request).await
463                        }
464                    }
465                }
466            },
467        ]
468    }
469}
470
471impl Meta {
472    async fn handle_submit_request(
473        &self,
474        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
475        _auth: &ApiAuth,
476        req: &SubmitRequest,
477    ) -> Result<(), ApiError> {
478        let salt = thread_rng().r#gen();
479
480        info!(target: LOG_MODULE_META,
481             key = %req.key,
482             peer_id = %self.our_peer_id,
483             value_len = %req.value.as_slice().len(),
484             "Our own guardian submitted a value");
485
486        dbtx.insert_entry(
487            &MetaDesiredKey(req.key),
488            &MetaDesiredValue {
489                value: req.value.clone(),
490                salt,
491            },
492        )
493        .await;
494
495        Ok(())
496    }
497
498    async fn handle_get_consensus_request(
499        &self,
500        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
501        req: &GetConsensusRequest,
502    ) -> Result<Option<MetaConsensusValue>, ApiError> {
503        Ok(dbtx.get_value(&MetaConsensusKey(req.0)).await)
504    }
505
506    async fn handle_get_consensus_revision_request(
507        &self,
508        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
509        req: &GetConsensusRequest,
510    ) -> Result<Option<u64>, ApiError> {
511        Ok(dbtx
512            .get_value(&MetaConsensusKey(req.0))
513            .await
514            .map(|cv| cv.revision))
515    }
516
517    async fn handle_get_submissions_request(
518        &self,
519        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
520        _auth: &ApiAuth,
521        req: &GetSubmissionsRequest,
522    ) -> Result<BTreeMap<PeerId, MetaValue>, ApiError> {
523        Ok(dbtx
524            .find_by_prefix(&MetaSubmissionsByKeyPrefix(req.0))
525            .await
526            .collect::<Vec<_>>()
527            .await
528            .into_iter()
529            .map(|(k, v)| (k.peer_id, v.value))
530            .collect())
531    }
532}
533
534// UI Methods for Meta Module
535
536impl Meta {
537    /// UI helper to submit a value change with default auth
538    pub async fn handle_submit_request_ui(&self, value: Value) -> Result<(), ApiError> {
539        let mut dbtx = self.db.begin_transaction().await;
540
541        self.handle_submit_request(
542            &mut dbtx.to_ref_nc(),
543            &ApiAuth(String::new()),
544            &SubmitRequest {
545                key: DEFAULT_META_KEY,
546                value: MetaValue::from(serde_json::to_vec(&value).unwrap().as_slice()),
547            },
548        )
549        .await?;
550
551        dbtx.commit_tx_result()
552            .await
553            .map_err(|e| ApiError::server_error(e.to_string()))?;
554
555        Ok(())
556    }
557
558    /// UI helper to get consensus data as a key-value map
559    pub async fn handle_get_consensus_request_ui(&self) -> Result<Option<Value>, ApiError> {
560        self.handle_get_consensus_request(
561            &mut self.db.begin_transaction_nc().await,
562            &GetConsensusRequest(DEFAULT_META_KEY),
563        )
564        .await?
565        .map(|value| serde_json::from_slice(value.value.as_slice()))
566        .transpose()
567        .map_err(|e| ApiError::server_error(e.to_string()))
568    }
569
570    /// UI helper to get consensus revision
571    pub async fn handle_get_consensus_revision_request_ui(&self) -> Result<u64, ApiError> {
572        self.handle_get_consensus_revision_request(
573            &mut self.db.begin_transaction_nc().await,
574            &GetConsensusRequest(DEFAULT_META_KEY),
575        )
576        .await
577        .map(|r| r.unwrap_or(0))
578    }
579
580    /// Get the submissions for UI display,
581    pub async fn handle_get_submissions_request_ui(
582        &self,
583    ) -> Result<BTreeMap<PeerId, Value>, ApiError> {
584        let mut submissions = BTreeMap::new();
585
586        let mut dbtx = self.db.begin_transaction_nc().await;
587
588        for (peer_id, value) in self
589            .handle_get_submissions_request(
590                &mut dbtx.to_ref_nc(),
591                &ApiAuth(String::new()),
592                &GetSubmissionsRequest(DEFAULT_META_KEY),
593            )
594            .await?
595        {
596            if let Ok(value) = serde_json::from_slice(value.as_slice()) {
597                submissions.insert(peer_id, value);
598            }
599        }
600
601        Ok(submissions)
602    }
603}