Skip to main content

fedimint_server/consensus/
engine.rs

1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use aleph_bft::Keychain as KeychainTrait;
8use anyhow::{Context, anyhow, bail};
9use async_channel::Receiver;
10use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, ServerError};
11use fedimint_api_client::query::FilterMap;
12use fedimint_core::config::P2PMessage;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::module::audit::Audit;
19use fedimint_core::module::registry::ModuleDecoderRegistry;
20use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
21use fedimint_core::net::peers::DynP2PConnections;
22use fedimint_core::runtime::spawn;
23use fedimint_core::secp256k1::schnorr;
24use fedimint_core::session_outcome::{AcceptedItem, SessionOutcome, SignedSessionOutcome};
25use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
26use fedimint_core::timing::TimeReporter;
27use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
28use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
29use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
30use futures::StreamExt;
31use rand::Rng;
32use rand::seq::IteratorRandom;
33use tokio::sync::watch;
34use tracing::{Level, debug, error, info, instrument, trace, warn};
35
36use crate::LOG_CONSENSUS;
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::to_node_index;
45use crate::consensus::db::{
46    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
51use crate::metrics::{
52    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
53    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
54    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
55    CONSENSUS_SESSION_COUNT,
56};
57
/// Name of the directory in which database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
61/// Runs the main server consensus loop
62pub struct ConsensusEngine {
63    pub modules: ServerModuleRegistry,
64    pub db: Database,
65    pub federation_api: DynGlobalApi,
66    pub cfg: ServerConfig,
67    pub submission_receiver: Receiver<ConsensusItem>,
68    pub shutdown_receiver: watch::Receiver<Option<u64>>,
69    pub connections: DynP2PConnections<P2PMessage>,
70    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
71    pub ord_latency_sender: watch::Sender<Option<Duration>>,
72    pub task_group: TaskGroup,
73    pub data_dir: PathBuf,
74    pub db_checkpoint_retention: u64,
75    pub session_timeout: Duration,
76}
77
78impl ConsensusEngine {
79    fn num_peers(&self) -> NumPeers {
80        self.cfg.consensus.broadcast_public_keys.to_num_peers()
81    }
82
83    fn identity(&self) -> PeerId {
84        self.cfg.local.identity
85    }
86
87    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
88    pub async fn run(self) -> anyhow::Result<()> {
89        if self.num_peers().total() == 1 {
90            self.run_single_guardian(self.task_group.make_handle())
91                .await
92        } else {
93            self.run_consensus(self.task_group.make_handle()).await
94        }
95    }
96
97    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
98        assert_eq!(self.num_peers(), NumPeers::from(1));
99
100        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
101
102        while !task_handle.is_shutting_down() {
103            let session_index = self.get_finished_session_count().await;
104
105            CONSENSUS_SESSION_COUNT.set(session_index as i64);
106
107            let mut item_index = self.pending_accepted_items().await.len() as u64;
108
109            let session_start_time = std::time::Instant::now();
110
111            while let Ok(item) = self.submission_receiver.recv().await {
112                if self
113                    .process_consensus_item(session_index, item_index, item, self.identity())
114                    .await
115                    .is_ok()
116                {
117                    item_index += 1;
118                }
119
120                // we rely on the module consensus items to notice the timeout
121                if session_start_time.elapsed() > Duration::from_mins(1) {
122                    break;
123                }
124            }
125
126            let session_outcome = SessionOutcome {
127                items: self.pending_accepted_items().await,
128            };
129
130            let header = session_outcome.header(session_index);
131            let signature = Keychain::new(&self.cfg).sign_schnorr(&header);
132            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
133
134            self.complete_session(
135                session_index,
136                SignedSessionOutcome {
137                    session_outcome,
138                    signatures,
139                },
140            )
141            .await;
142
143            self.checkpoint_database(session_index);
144
145            info!(target: LOG_CONSENSUS, "Session {session_index} completed");
146
147            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
148                break;
149            }
150        }
151
152        info!(target: LOG_CONSENSUS, "Consensus task shut down");
153
154        Ok(())
155    }
156
157    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
158        // We need four peers to run the atomic broadcast
159        assert!(self.num_peers().total() >= 4);
160
161        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
162
163        while !task_handle.is_shutting_down() {
164            let session_index = self.get_finished_session_count().await;
165
166            CONSENSUS_SESSION_COUNT.set(session_index as i64);
167
168            let is_recovery = self.is_recovery().await;
169
170            info!(
171                target: LOG_CONSENSUS,
172                session_index,
173                is_recovery,
174                "Starting consensus session"
175            );
176
177            // If a session gets stuck for an extended period of time we exit
178            // the process such that our supervisor restarts fedimintd, which
179            // has been observed to resolve stuck sessions. Since AlephBFT
180            // persists its units to the database this is equivalent to a crash
181            // and therefore safe.
182            let session = tokio::time::timeout(
183                self.session_timeout,
184                self.run_session(self.connections.clone(), session_index),
185            )
186            .await
187            .context("Consensus session timed out, exiting...")?;
188
189            if session.is_none() {
190                return Ok(());
191            }
192
193            info!(target: LOG_CONSENSUS, ?session_index, "Completed consensus session");
194
195            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
196                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
197
198                sleep(Duration::from_mins(1)).await;
199
200                break;
201            }
202        }
203
204        info!(target: LOG_CONSENSUS, "Consensus task shut down");
205
206        Ok(())
207    }
208
209    pub async fn run_session(
210        &self,
211        connections: DynP2PConnections<P2PMessage>,
212        session_index: u64,
213    ) -> Option<()> {
214        // In order to bound a sessions RAM consumption we need to bound its number of
215        // units and therefore its number of rounds. Since we use a session to
216        // create a naive secp256k1 threshold signature for the header of session
217        // outcome we have to guarantee that an attacker cannot exhaust our
218        // memory by preventing the creation of a threshold signature, thereby
219        // keeping the session open indefinitely. Hence, after a certain round
220        // index, we increase the delay between rounds exponentially such that
221        // the end of the aleph bft session would only be reached after a minimum
222        // of 10 years. In case of such an attack the broadcast stops ordering any
223        // items until the attack subsides as no items are ordered while the
224        // signatures are collected. The maximum RAM consumption of the aleph bft
225        // broadcast instance is therefore bound by:
226        //
227        // self.keychain.peer_count()
228        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
229        //      * ALEPH_BFT_UNIT_BYTE_LIMIT
230
231        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
232        const BASE: f64 = 1.02;
233
234        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
235        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
236
237        let mut delay_config = aleph_bft::default_delay_config();
238
239        delay_config.unit_creation_delay = Arc::new(move |round_index| {
240            let delay = if round_index == 0 {
241                0.0
242            } else {
243                round_delay
244                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
245                    * rand::thread_rng().gen_range(0.5..=1.5)
246            };
247
248            Duration::from_millis(delay.round() as u64)
249        });
250
251        let config = aleph_bft::create_config(
252            self.num_peers().total().into(),
253            self.identity().to_usize().into(),
254            session_index,
255            self.cfg
256                .consensus
257                .broadcast_rounds_per_session
258                .checked_add(EXP_SLOWDOWN_ROUNDS)
259                .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
260            delay_config,
261            Duration::from_hours(87600),
262        )
263        .expect("The exponential slowdown exceeds 10 years");
264
265        // we can use an unbounded channel here since the number and size of units
266        // ordered in a single aleph session is bounded as described above
267        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
268        let (signature_sender, signature_receiver) = watch::channel(None);
269        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
270        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
271
272        // Create channels for P2P session sync
273        let (signed_outcomes_sender, signed_outcomes_receiver) = async_channel::unbounded();
274        let (signatures_sender, signatures_receiver) = async_channel::unbounded();
275
276        let aleph_handle = spawn(
277            "aleph run session",
278            aleph_bft::run_session(
279                config,
280                aleph_bft::LocalIO::new(
281                    DataProvider::new(
282                        self.submission_receiver.clone(),
283                        signature_receiver,
284                        timestamp_sender,
285                        self.is_recovery().await,
286                    ),
287                    FinalizationHandler::new(unit_data_sender),
288                    BackupWriter::new(self.db.clone()).await,
289                    BackupReader::new(self.db.clone()),
290                ),
291                Network::new(
292                    connections.clone(),
293                    signed_outcomes_sender,
294                    signatures_sender,
295                    self.db.clone(),
296                ),
297                Keychain::new(&self.cfg),
298                Spawner::new(self.task_group.make_subgroup()),
299                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
300            ),
301        );
302
303        self.ord_latency_sender.send_replace(None);
304
305        let signed_session_outcome = self
306            .complete_signed_session_outcome(
307                session_index,
308                unit_data_receiver,
309                signature_sender,
310                timestamp_receiver,
311                signed_outcomes_receiver,
312                signatures_receiver,
313                connections,
314            )
315            .await?;
316
317        assert!(
318            self.validate_signed_session_outcome(&signed_session_outcome, session_index),
319            "Our created signed session outcome fails validation"
320        );
321
322        info!(target: LOG_CONSENSUS, ?session_index, "Terminating Aleph BFT session");
323
324        // We can terminate the session instead of waiting for other peers to complete
325        // it since they can always download the signed session outcome from us
326        terminator_sender.send(()).ok();
327        aleph_handle.await.ok();
328
329        // This method removes the backup of the current session from the database
330        // and therefore has to be called after we have waited for the session to
331        // shut down, or we risk write-write conflicts with the UnitSaver
332        self.complete_session(session_index, signed_session_outcome)
333            .await;
334
335        self.checkpoint_database(session_index);
336
337        Some(())
338    }
339
340    async fn is_recovery(&self) -> bool {
341        self.db
342            .begin_transaction_nc()
343            .await
344            .find_by_prefix(&AlephUnitsPrefix)
345            .await
346            .next()
347            .await
348            .is_some()
349    }
350
351    #[allow(clippy::too_many_arguments)]
352    pub async fn complete_signed_session_outcome(
353        &self,
354        session_index: u64,
355        ordered_unit_receiver: Receiver<OrderedUnit>,
356        signature_sender: watch::Sender<Option<schnorr::Signature>>,
357        timestamp_receiver: Receiver<Instant>,
358        signed_outcomes_receiver: Receiver<(PeerId, SignedSessionOutcome)>,
359        signatures_receiver: Receiver<(PeerId, schnorr::Signature)>,
360        _connections: DynP2PConnections<P2PMessage>,
361    ) -> Option<SignedSessionOutcome> {
362        // It is guaranteed that aleph bft will always replay all previously processed
363        // items from the current session from index zero
364        let mut item_index = 0;
365
366        // We request the signed session outcome every three seconds from a random peer
367        let mut index_broadcast_interval = tokio::time::interval(Duration::from_secs(3));
368
369        let mut request_signed_session_outcome = Box::pin(async {
370            self.request_signed_session_outcome(&self.federation_api, session_index)
371                .await
372        });
373
374        // We build a session outcome out of the ordered batches until either we have
375        // processed broadcast_rounds_per_session rounds or a threshold signed
376        // session outcome is obtained from our peers
377        loop {
378            tokio::select! {
379                result = ordered_unit_receiver.recv() => {
380                    let ordered_unit = result.ok()?;
381
382                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
383                        info!(
384                            target: LOG_CONSENSUS,
385                            session_index,
386                            "Reached Aleph BFT round limit, stopping item collection"
387                        );
388                        break;
389                    }
390
391                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
392                        if ordered_unit.creator == self.identity() {
393                            match timestamp_receiver.try_recv() {
394                                Ok(timestamp) => {
395                                    let latency = match *self.ord_latency_sender.borrow() {
396                                        Some(latency) => (9 * latency +  timestamp.elapsed()) / 10,
397                                        None => timestamp.elapsed()
398                                    };
399
400                                    self.ord_latency_sender.send_replace(Some(latency));
401
402                                    CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
403                                }
404                                Err(err) => {
405                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
406                                }
407                            }
408                        }
409
410                        match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
411                            Ok(items) => {
412                                for item in items {
413                                    if let Ok(()) = self.process_consensus_item(
414                                        session_index,
415                                        item_index,
416                                        item.clone(),
417                                        ordered_unit.creator
418                                    ).await {
419                                        item_index += 1;
420                                    }
421                                }
422                            }
423                            Err(err) => {
424                                error!(
425                                    target: LOG_CONSENSUS,
426                                    session_index,
427                                    peer = %ordered_unit.creator,
428                                    err = %err.fmt_compact(),
429                                    "Failed to decode consensus items from peer"
430                                );
431                            }
432                        }
433                    }
434                },
435                // TODO: remove this branch in 0.11.0
436                signed_session_outcome = &mut request_signed_session_outcome => {
437                    info!(
438                        target: LOG_CONSENSUS,
439                        ?session_index,
440                        "Recovered signed session outcome from peers while processing consensus items"
441                    );
442
443                    let pending_accepted_items = self.pending_accepted_items().await;
444
445                    // this panics if we have more accepted items than the signed session outcome
446                    let (processed, unprocessed) = signed_session_outcome
447                        .session_outcome
448                        .items
449                        .split_at(pending_accepted_items.len());
450
451                    info!(
452                        target: LOG_CONSENSUS,
453                        session_index,
454                        processed = %processed.len(),
455                        unprocessed = %unprocessed.len(),
456                        "Processing remaining items..."
457                    );
458
459                    assert!(
460                        processed.iter().eq(pending_accepted_items.iter()),
461                        "Consensus Failure: pending accepted items disagree with federation consensus"
462                    );
463
464                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
465                        if let Err(err) = self.process_consensus_item(
466                            session_index,
467                            item_index as u64,
468                            accepted_item.item.clone(),
469                            accepted_item.peer
470                        ).await {
471                            panic!(
472                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
473                                processed.len(),
474                                unprocessed.len(),
475                            );
476                        }
477                    }
478
479                    return Some(signed_session_outcome);
480                },
481                result = signed_outcomes_receiver.recv() => {
482                    let (peer_id, p2p_outcome) = result.ok()?;
483
484                    // Validate signatures
485                    if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
486                        info!(
487                            target: LOG_CONSENSUS,
488                            session_index,
489                            peer_id = %peer_id,
490                            "Received SignedSessionOutcome via P2P while collection signatures"
491                        );
492
493                        let pending_accepted_items = self.pending_accepted_items().await;
494
495                        // this panics if we have more accepted items than the signed session outcome
496                        let (processed, unprocessed) = p2p_outcome
497                            .session_outcome
498                            .items
499                            .split_at(pending_accepted_items.len());
500
501                        info!(
502                            target: LOG_CONSENSUS,
503                            ?session_index,
504                            processed = %processed.len(),
505                            unprocessed = %unprocessed.len(),
506                            "Processing remaining items..."
507                        );
508
509                        assert!(
510                            processed.iter().eq(pending_accepted_items.iter()),
511                            "Consensus Failure: pending accepted items disagree with federation consensus"
512                        );
513
514                        for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
515                            if let Err(err) = self.process_consensus_item(
516                                session_index,
517                                item_index as u64,
518                                accepted_item.item.clone(),
519                                accepted_item.peer
520                            ).await {
521                                panic!(
522                                    "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
523                                    processed.len(),
524                                    unprocessed.len(),
525                                );
526                            }
527                        }
528
529                        info!(
530                            target: LOG_CONSENSUS,
531                            ?session_index,
532                            peer_id = %peer_id,
533                            "Successfully recovered session via P2P"
534                        );
535
536                        return Some(p2p_outcome);
537                    }
538
539                    debug!(
540                        target: LOG_CONSENSUS,
541                        %peer_id,
542                        "Invalid P2P SignedSessionOutcome"
543                    );
544                }
545                _ = index_broadcast_interval.tick() => {
546                    // TODO: start sending the new messages in 0.11.0
547                    // connections.send(
548                    //     Recipient::Peer(self.random_peer()),
549                    //     P2PMessage::SessionIndex(session_index),
550                    // );
551                }
552            }
553        }
554
555        let items = self.pending_accepted_items().await;
556
557        assert_eq!(item_index, items.len() as u64);
558
559        info!(target: LOG_CONSENSUS, ?session_index, ?item_index, "Processed all items for session");
560
561        let session_outcome = SessionOutcome { items };
562
563        let header = session_outcome.header(session_index);
564
565        info!(
566            target: LOG_CONSENSUS,
567            ?session_index,
568            "Signing session header..."
569        );
570
571        let keychain = Keychain::new(&self.cfg);
572
573        let our_signature = keychain.sign_schnorr(&header);
574
575        // Send our own signature to the data provider to be submitted to AlephBFT
576        #[allow(clippy::disallowed_methods)]
577        signature_sender.send(Some(our_signature)).ok()?;
578
579        let mut signatures = BTreeMap::from_iter([(self.identity(), our_signature)]);
580
581        let items_dump = tokio::sync::OnceCell::new();
582
583        // We request the session signature every second to all peers
584        let mut signature_broadcast_interval = tokio::time::interval(Duration::from_secs(1));
585
586        // We collect the ordered signatures until we either obtain a threshold
587        // signature or a signed session outcome arrives from our peers
588        while signatures.len() < self.num_peers().threshold() {
589            tokio::select! {
590                // TODO: remove this branch in 0.11.0
591                result = ordered_unit_receiver.recv() => {
592                    let ordered_unit = result.ok()?;
593
594                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
595                        info!(
596                            target: LOG_CONSENSUS,
597                            ?session_index,
598                            peer = %ordered_unit.creator,
599                            "Collected signature from peer via AlephBFT, verifying..."
600                        );
601
602                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
603                            signatures.insert(ordered_unit.creator, schnorr::Signature::from_slice(&signature).expect("AlephBFT signature is valid"));
604                        } else {
605                            error!(
606                                target: LOG_CONSENSUS,
607                                ?session_index,
608                                peer = %ordered_unit.creator,
609                                "Consensus Failure: invalid header signature from peer"
610                            );
611
612                            items_dump.get_or_init(|| async {
613                                for (idx, item) in session_outcome.items.iter().enumerate() {
614                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
615                                }
616                            }).await;
617                        }
618                    }
619                }
620                result = signatures_receiver.recv() => {
621                    let (peer_id, signature) = result.ok()?;
622
623                    if keychain.verify_schnorr(&header, &signature, peer_id) {
624                        signatures.insert(peer_id, signature);
625
626                        info!(
627                            target: LOG_CONSENSUS,
628                            session_index,
629                            peer_id = %peer_id,
630                            "Collected signature from peer via P2P"
631                        );
632                    }
633
634                    debug!(
635                        target: LOG_CONSENSUS,
636                        session_index,
637                        peer_id = %peer_id,
638                        "Invalid P2P signature from peer"
639                    );
640                }
641                // TODO: remove this branch in 0.11.0
642                signed_session_outcome = &mut request_signed_session_outcome => {
643                    info!(
644                        target: LOG_CONSENSUS,
645                        ?session_index,
646                        "Recovered signed session outcome from peers while collecting signatures"
647                    );
648
649                    assert_eq!(
650                        header,
651                        signed_session_outcome.session_outcome.header(session_index),
652                        "Consensus Failure: header disagrees with federation consensus"
653                    );
654
655                    return Some(signed_session_outcome);
656                },
657                result = signed_outcomes_receiver.recv() => {
658                    let (peer_id, p2p_outcome) = result.ok()?;
659
660                    if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
661                        assert_eq!(
662                            header,
663                            p2p_outcome.session_outcome.header(session_index),
664                            "Consensus Failure: header disagrees with federation consensus"
665                        );
666
667                        info!(
668                            target: LOG_CONSENSUS,
669                            session_index,
670                            %peer_id,
671                            "Recovered session via P2P while collecting signatures"
672                        );
673
674                        return Some(p2p_outcome);
675                    }
676
677                    debug!(
678                        target: LOG_CONSENSUS,
679                        %peer_id,
680                        "Invalid P2P SignedSessionOutcome"
681                    );
682                }
683                _ = signature_broadcast_interval.tick() => {
684                    // TODO: start sending the new messages in 0.11.0
685                    // connections.send(
686                    //     Recipient::Everyone,
687                    //     P2PMessage::SessionSignature(our_signature),
688                    // );
689                }
690                _ = index_broadcast_interval.tick() => {
691                    // TODO: start sending the new messages in 0.11.0
692                    // connections.send(
693                    //     Recipient::Peer(self.random_peer()),
694                    //     P2PMessage::SessionIndex(session_index),
695                    // );
696                }
697            }
698        }
699
700        info!(
701            target: LOG_CONSENSUS,
702            session_index,
703            "Successfully collected threshold of signatures"
704        );
705
706        Some(SignedSessionOutcome {
707            session_outcome,
708            signatures,
709        })
710    }
711
712    /// Returns a random peer ID excluding ourselves
713    #[allow(unused)]
714    fn random_peer(&self) -> PeerId {
715        self.num_peers()
716            .peer_ids()
717            .filter(|p| *p != self.identity())
718            .choose(&mut rand::thread_rng())
719            .expect("We have at least three peers")
720    }
721
722    /// Validate a SignedSessionOutcome received via P2P
723    fn validate_signed_session_outcome(
724        &self,
725        outcome: &SignedSessionOutcome,
726        session_index: u64,
727    ) -> bool {
728        if outcome.signatures.len() != self.num_peers().threshold() {
729            return false;
730        }
731
732        let keychain = Keychain::new(&self.cfg);
733        let header = outcome.session_outcome.header(session_index);
734
735        outcome
736            .signatures
737            .iter()
738            .all(|(signer_id, sig)| keychain.verify_schnorr(&header, sig, *signer_id))
739    }
740
741    fn decoders(&self) -> ModuleDecoderRegistry {
742        self.modules.decoder_registry()
743    }
744
745    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
746        self.db
747            .begin_transaction_nc()
748            .await
749            .find_by_prefix(&AcceptedItemPrefix)
750            .await
751            .map(|entry| entry.1)
752            .collect()
753            .await
754    }
755
756    pub async fn complete_session(
757        &self,
758        session_index: u64,
759        signed_session_outcome: SignedSessionOutcome,
760    ) {
761        let mut dbtx = self.db.begin_transaction().await;
762
763        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
764
765        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
766
767        if dbtx
768            .insert_entry(
769                &SignedSessionOutcomeKey(session_index),
770                &signed_session_outcome,
771            )
772            .await
773            .is_some()
774        {
775            panic!("We tried to overwrite a signed session outcome");
776        }
777
778        dbtx.commit_tx_result()
779            .await
780            .expect("This is the only place where we write to this key");
781    }
782
783    /// Returns the full path where the database checkpoints are stored.
784    fn db_checkpoints_dir(&self) -> PathBuf {
785        self.data_dir.join(DB_CHECKPOINTS_DIR)
786    }
787
788    /// Creates the directory within the data directory for storing the database
789    /// checkpoints or deletes checkpoints before `current_session` -
790    /// `checkpoint_retention`.
791    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
792        let checkpoint_dir = self.db_checkpoints_dir();
793
794        if checkpoint_dir.exists() {
795            debug!(
796                target: LOG_CONSENSUS,
797                ?current_session,
798                "Removing database checkpoints up to `current_session`"
799            );
800
801            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
802                // Validate that the directory is a session index
803                if let Ok(file_name) = checkpoint.file_name().into_string()
804                    && let Ok(session) = file_name.parse::<u64>()
805                    && current_session >= self.db_checkpoint_retention
806                    && session < current_session - self.db_checkpoint_retention
807                {
808                    fs::remove_dir_all(checkpoint.path())?;
809                }
810            }
811        } else {
812            fs::create_dir_all(&checkpoint_dir)?;
813        }
814
815        Ok(())
816    }
817
818    /// Creates a backup of the database in the checkpoint directory. These
819    /// checkpoints can be used to restore the database in case the
820    /// federation falls out of consensus (recommended for experts only).
821    fn checkpoint_database(&self, session_index: u64) {
822        // If `checkpoint_retention` has been turned off, don't checkpoint the database
823        // at all.
824        if self.db_checkpoint_retention == 0 {
825            return;
826        }
827
828        let checkpoint_dir = self.db_checkpoints_dir();
829        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
830
831        {
832            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
833            match self.db.checkpoint(&session_checkpoint_dir) {
834                Ok(()) => {
835                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
836                }
837                Err(err) => {
838                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, err = %err.fmt_compact(), "Could not create db checkpoint");
839                }
840            }
841        }
842
843        {
844            // Check if any old checkpoint need to be cleaned up
845            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
846            if let Err(err) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
847                warn!(target: LOG_CONSENSUS, err = %err.fmt_compact_anyhow(), "Could not delete old checkpoints");
848            }
849        }
850    }
851
852    /// Deletes the database checkpoint directory equal to `session_index` -
853    /// `checkpoint_retention`
854    fn delete_old_database_checkpoint(
855        &self,
856        session_index: u64,
857        checkpoint_dir: &Path,
858    ) -> anyhow::Result<()> {
859        if self.db_checkpoint_retention > session_index {
860            return Ok(());
861        }
862
863        let delete_session_index = session_index - self.db_checkpoint_retention;
864        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
865        if checkpoint_to_delete.exists() {
866            fs::remove_dir_all(checkpoint_to_delete)?;
867        }
868
869        Ok(())
870    }
871
872    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
873    pub async fn process_consensus_item(
874        &self,
875        session_index: u64,
876        item_index: u64,
877        item: ConsensusItem,
878        peer: PeerId,
879    ) -> anyhow::Result<()> {
880        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
881
882        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
883            .with_label_values(&[&peer.to_usize().to_string()])
884            .start_timer();
885
886        trace!(
887            target: LOG_CONSENSUS,
888            %peer,
889            item = ?DebugConsensusItem(&item),
890            "Processing consensus item"
891        );
892
893        self.ci_status_senders
894            .get(&peer)
895            .expect("No ci status sender for peer")
896            .send_replace(Some(session_index));
897
898        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
899            .with_label_values(&[
900                &self.cfg.local.identity.to_usize().to_string(),
901                &peer.to_usize().to_string(),
902            ])
903            .set(session_index as i64);
904
905        let mut dbtx = self.db.begin_transaction().await;
906
907        dbtx.ignore_uncommitted();
908
909        // When we recover from a mid-session crash aleph bft will replay the units that
910        // were already processed before the crash. We therefore skip all consensus
911        // items until we have seen every previously accepted items again.
912        if let Some(existing_item) = dbtx
913            .get_value(&AcceptedItemKey(item_index.to_owned()))
914            .await
915        {
916            if existing_item.item == item && existing_item.peer == peer {
917                return Ok(());
918            }
919
920            bail!(
921                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
922                existing_item.peer
923            );
924        }
925
926        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
927            .await
928            .inspect_err(|err| {
929                // Rejected items are very common, so only trace level
930                trace!(
931                    target: LOG_CONSENSUS,
932                    %peer,
933                    item = ?DebugConsensusItem(&item),
934                    err = %err.fmt_compact_anyhow(),
935                    "Rejected consensus item"
936                );
937            })?;
938
939        // After this point we have to commit the database transaction since the
940        // item has been fully processed without errors
941        dbtx.warn_uncommitted();
942
943        dbtx.insert_entry(
944            &AcceptedItemKey(item_index),
945            &AcceptedItem {
946                item: item.clone(),
947                peer,
948            },
949        )
950        .await;
951
952        debug!(
953            target: LOG_CONSENSUS,
954            %peer,
955            item = ?DebugConsensusItem(&item),
956            "Processed consensus item"
957        );
958        let mut audit = Audit::default();
959
960        for (module_instance_id, kind, module) in self.modules.iter_modules() {
961            let _module_audit_timing =
962                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
963
964            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
965                .with_label_values(&[
966                    MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
967                    kind.as_str(),
968                ])
969                .start_timer();
970
971            module
972                .audit(
973                    &mut dbtx
974                        .to_ref_with_prefix_module_id(module_instance_id)
975                        .0
976                        .into_nc(),
977                    &mut audit,
978                    module_instance_id,
979                )
980                .await;
981
982            timing_prom.observe_duration();
983        }
984
985        assert!(
986            audit
987                .net_assets()
988                .expect("Overflow while checking balance sheet")
989                .milli_sat
990                >= 0,
991            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
992        );
993
994        dbtx.commit_tx_result()
995            .await
996            .expect("Committing consensus epoch failed");
997
998        CONSENSUS_ITEMS_PROCESSED_TOTAL
999            .with_label_values(&[&peer.to_usize().to_string()])
1000            .inc();
1001
1002        timing_prom.observe_duration();
1003
1004        Ok(())
1005    }
1006
1007    async fn process_consensus_item_with_db_transaction(
1008        &self,
1009        dbtx: &mut DatabaseTransaction<'_>,
1010        consensus_item: ConsensusItem,
1011        peer_id: PeerId,
1012    ) -> anyhow::Result<()> {
1013        // We rely on decoding rejecting any unknown module instance ids to avoid
1014        // peer-triggered panic here
1015        self.decoders().assert_reject_mode();
1016
1017        match consensus_item {
1018            ConsensusItem::Module(module_item) => {
1019                let instance_id = module_item.module_instance_id();
1020
1021                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
1022
1023                self.modules
1024                    .get_expect(instance_id)
1025                    .process_consensus_item(module_dbtx, &module_item, peer_id)
1026                    .await
1027            }
1028            ConsensusItem::Transaction(transaction) => {
1029                let txid = transaction.tx_hash();
1030                if dbtx
1031                    .get_value(&AcceptedTransactionKey(txid))
1032                    .await
1033                    .is_some()
1034                {
1035                    debug!(
1036                        target: LOG_CONSENSUS,
1037                        %txid,
1038                        "Transaction already accepted"
1039                    );
1040                    bail!("Transaction is already accepted");
1041                }
1042
1043                let modules_ids = transaction
1044                    .outputs
1045                    .iter()
1046                    .map(DynOutput::module_instance_id)
1047                    .collect::<Vec<_>>();
1048
1049                process_transaction_with_dbtx(
1050                    self.modules.clone(),
1051                    dbtx,
1052                    &transaction,
1053                    self.cfg.consensus.version,
1054                    TxProcessingMode::Consensus,
1055                )
1056                .await
1057                .map_err(|error| anyhow!(error.to_string()))?;
1058
1059                debug!(target: LOG_CONSENSUS, %txid,  "Transaction accepted");
1060                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
1061                    .await;
1062
1063                Ok(())
1064            }
1065            ConsensusItem::Default { variant, .. } => {
1066                warn!(
1067                    target: LOG_CONSENSUS,
1068                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
1069                );
1070
1071                panic!("Unexpected consensus item type: {variant}")
1072            }
1073        }
1074    }
1075
1076    async fn request_signed_session_outcome(
1077        &self,
1078        federation_api: &DynGlobalApi,
1079        index: u64,
1080    ) -> SignedSessionOutcome {
1081        let decoders = self.decoders();
1082        let keychain = Keychain::new(&self.cfg);
1083        let threshold = self.num_peers().threshold();
1084
1085        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
1086            let signed_session_outcome = response
1087                .try_into_inner(&decoders)
1088                .map_err(|x| ServerError::ResponseDeserialization(x.into()))?;
1089            let header = signed_session_outcome.session_outcome.header(index);
1090            if signed_session_outcome.signatures.len() == threshold
1091                && signed_session_outcome
1092                    .signatures
1093                    .iter()
1094                    .all(|(peer_id, sig)| keychain.verify_schnorr(&header, sig, *peer_id))
1095            {
1096                Ok(signed_session_outcome)
1097            } else {
1098                Err(ServerError::InvalidResponse(anyhow!("Invalid signatures")))
1099            }
1100        };
1101
1102        federation_api
1103            .request_with_strategy_retry(
1104                FilterMap::new(filter_map.clone()),
1105                AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
1106                ApiRequestErased::new(index),
1107            )
1108            .await
1109    }
1110
1111    /// Returns the number of sessions already saved in the database. This count
1112    /// **does not** include the currently running session.
1113    async fn get_finished_session_count(&self) -> u64 {
1114        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
1115    }
1116}
1117
1118pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
1119    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
1120        .await
1121        .next()
1122        .await
1123        .map_or(0, |entry| (entry.0.0) + 1)
1124}