fedimint_server/consensus/engine.rs

use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, ServerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::config::P2PMessage;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::DynP2PConnections;
use fedimint_core::runtime::spawn;
use fedimint_core::secp256k1::schnorr;
use fedimint_core::session_outcome::{AcceptedItem, SessionOutcome, SignedSessionOutcome};
use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use rand::seq::IteratorRandom;
use tokio::sync::watch;
use tracing::{Level, debug, error, info, instrument, trace, warn};

use crate::LOG_CONSENSUS;
use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::to_node_index;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
use crate::metrics::{
    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
    CONSENSUS_SESSION_COUNT,
};

// The name of the directory where the database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub connections: DynP2PConnections<P2PMessage>,
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub db_checkpoint_retention: u64,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // We rely on regularly submitted module consensus items to notice the
                // timeout, since the check below only runs after an item has been received
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign_schnorr(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.num_peers().total() >= 4);

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let is_recovery = self.is_recovery().await;

            info!(
                target: LOG_CONSENSUS,
                session_index,
                is_recovery,
                "Starting consensus session"
            );

            if self
                .run_session(self.connections.clone(), session_index)
                .await
                .is_none()
            {
                return Ok(());
            }

            info!(target: LOG_CONSENSUS, ?session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> Option<()> {
        // In order to bound a session's RAM consumption we need to bound its number of
        // units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // the end of the aleph bft session would only be reached after a minimum
        // of 10 years. In case of such an attack the broadcast stops ordering any
        // items until the attack subsides, as no items are ordered while the
        // signatures are collected. The maximum RAM consumption of the aleph bft
        // broadcast instance is therefore bounded by:
        //
        // self.keychain.peer_count()
        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
        //      * ALEPH_BFT_UNIT_BYTE_LIMIT
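        //
        // As a rough illustration with hypothetical numbers (not taken from any
        // real federation config): 4 peers, 5_000 rounds per session, 1_000
        // slowdown rounds and a 50 KiB unit limit would bound the buffered
        // units at 4 * 6_000 * 50 KiB ≈ 1.2 GB.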

        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

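        // For rounds below `rounds_per_session` the saturating subtraction below keeps
        // the exponent at zero, so units are created roughly every `round_delay` ms
        // (scaled by a random jitter factor of 0.5..=1.5). From then on, round
        // `rounds_per_session + k` is delayed by roughly `round_delay * 1.02^k`, and
        // since 1.02^1000 ≈ 4e8 the EXP_SLOWDOWN_ROUNDS tail alone spans decades even
        // for a base delay of only a few hundred milliseconds.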
        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // We can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session are bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        // Create channels for P2P session sync
        let (signed_outcomes_sender, signed_outcomes_receiver) = async_channel::unbounded();
        let (signatures_sender, signatures_receiver) = async_channel::unbounded();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                        self.is_recovery().await,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(
                    connections.clone(),
                    signed_outcomes_sender,
                    signatures_sender,
                    self.db.clone(),
                ),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        self.ord_latency_sender.send_replace(None);

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
                signed_outcomes_receiver,
                signatures_receiver,
                connections,
            )
            .await?;

        assert!(
            self.validate_signed_session_outcome(&signed_session_outcome, session_index),
            "Our created signed session outcome fails validation"
        );

        info!(target: LOG_CONSENSUS, ?session_index, "Terminating Aleph BFT session");

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Some(())
    }

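    /// We are in recovery if AlephBFT units from an unfinished session are still
    /// backed up in the database, i.e. the previous run crashed mid-session.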
    async fn is_recovery(&self) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AlephUnitsPrefix)
            .await
            .next()
            .await
            .is_some()
    }

    #[allow(clippy::too_many_arguments)]
    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<schnorr::Signature>>,
        timestamp_receiver: Receiver<Instant>,
        signed_outcomes_receiver: Receiver<(PeerId, SignedSessionOutcome)>,
        signatures_receiver: Receiver<(PeerId, schnorr::Signature)>,
        _connections: DynP2PConnections<P2PMessage>,
    ) -> Option<SignedSessionOutcome> {
        // It is guaranteed that aleph bft will always replay all previously processed
        // items from the current session from index zero
        let mut item_index = 0;

        // Every three seconds we request the signed session outcome from a random peer
        // by announcing our session index (sending is disabled until 0.11.0, see the
        // TODO below)
        let mut index_broadcast_interval = tokio::time::interval(Duration::from_secs(3));

        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        // We build a session outcome out of the ordered batches until either we have
        // processed broadcast_rounds_per_session rounds or a threshold signed
        // session outcome is obtained from our peers
        loop {
            tokio::select! {
                result = ordered_unit_receiver.recv() => {
                    let ordered_unit = result.ok()?;

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        info!(
                            target: LOG_CONSENSUS,
                            session_index,
                            "Reached Aleph BFT round limit, stopping item collection"
                        );
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            match timestamp_receiver.try_recv() {
                                Ok(timestamp) => {
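                                    // Exponentially weighted moving average over roughly
                                    // the last ten observed ordering latencies (alpha = 0.1)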
                                    let latency = match *self.ord_latency_sender.borrow() {
                                        Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
                                        None => timestamp.elapsed()
                                    };

                                    self.ord_latency_sender.send_replace(Some(latency));

                                    CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                }
                                Err(err) => {
                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
                                }
                            }
                        }

                        match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
                            Ok(items) => {
                                for item in items {
                                    if let Ok(()) = self.process_consensus_item(
                                        session_index,
                                        item_index,
                                        item.clone(),
                                        ordered_unit.creator
                                    ).await {
                                        item_index += 1;
                                    }
                                }
                            }
                            Err(err) => {
                                error!(
                                    target: LOG_CONSENSUS,
                                    session_index,
                                    peer = %ordered_unit.creator,
                                    err = %err.fmt_compact(),
                                    "Failed to decode consensus items from peer"
                                );
                            }
                        }
                    }
                },
                // TODO: remove this branch in 0.11.0
                signed_session_outcome = &mut request_signed_session_outcome => {
                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
                        "Recovered signed session outcome from peers while processing consensus items"
                    );

                    let pending_accepted_items = self.pending_accepted_items().await;

                    // this panics if we have more accepted items than the signed session outcome
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    info!(
                        target: LOG_CONSENSUS,
                        session_index,
                        processed = %processed.len(),
                        unprocessed = %unprocessed.len(),
                        "Processing remaining items..."
                    );

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Some(signed_session_outcome);
                },
                result = signed_outcomes_receiver.recv() => {
                    let (peer_id, p2p_outcome) = result.ok()?;

                    // Validate signatures
                    if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
                        info!(
                            target: LOG_CONSENSUS,
                            session_index,
                            peer_id = %peer_id,
                            "Received signed session outcome via P2P while processing consensus items"
                        );

                        let pending_accepted_items = self.pending_accepted_items().await;

                        // this panics if we have more accepted items than the signed session outcome
                        let (processed, unprocessed) = p2p_outcome
                            .session_outcome
                            .items
                            .split_at(pending_accepted_items.len());

                        info!(
                            target: LOG_CONSENSUS,
                            ?session_index,
                            processed = %processed.len(),
                            unprocessed = %unprocessed.len(),
                            "Processing remaining items..."
                        );

                        assert!(
                            processed.iter().eq(pending_accepted_items.iter()),
                            "Consensus Failure: pending accepted items disagree with federation consensus"
                        );

                        for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                            if let Err(err) = self.process_consensus_item(
                                session_index,
                                item_index as u64,
                                accepted_item.item.clone(),
                                accepted_item.peer
                            ).await {
                                panic!(
                                    "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                    processed.len(),
                                    unprocessed.len(),
                                );
                            }
                        }

                        info!(
                            target: LOG_CONSENSUS,
                            ?session_index,
                            peer_id = %peer_id,
                            "Successfully recovered session via P2P"
                        );

                        return Some(p2p_outcome);
                    }

                    debug!(
                        target: LOG_CONSENSUS,
                        %peer_id,
                        "Invalid P2P SignedSessionOutcome"
                    );
                }
                _ = index_broadcast_interval.tick() => {
                    // TODO: start sending the new messages in 0.11.0
                    // connections.send(
                    //     Recipient::Peer(self.random_peer()),
                    //     P2PMessage::SessionIndex(session_index),
                    // );
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        info!(target: LOG_CONSENSUS, ?session_index, ?item_index, "Processed all items for session");

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        info!(
            target: LOG_CONSENSUS,
            ?session_index,
            "Signing session header..."
        );

        let keychain = Keychain::new(&self.cfg);

        let our_signature = keychain.sign_schnorr(&header);

        // Send our own signature to the data provider to be submitted to AlephBFT
        #[allow(clippy::disallowed_methods)]
        signature_sender.send(Some(our_signature)).ok()?;

        let mut signatures = BTreeMap::from_iter([(self.identity(), our_signature)]);

        let items_dump = tokio::sync::OnceCell::new();

        // Every second we broadcast our session signature to all peers (sending is
        // disabled until 0.11.0, see the TODO below)
        let mut signature_broadcast_interval = tokio::time::interval(Duration::from_secs(1));

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.num_peers().threshold() {
            tokio::select! {
                // TODO: remove this branch in 0.11.0
                result = ordered_unit_receiver.recv() => {
                    let ordered_unit = result.ok()?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        info!(
                            target: LOG_CONSENSUS,
                            ?session_index,
                            peer = %ordered_unit.creator,
                            "Collected signature from peer via AlephBFT, verifying..."
                        );

                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)) {
                            signatures.insert(ordered_unit.creator, schnorr::Signature::from_slice(&signature).expect("AlephBFT signature is valid"));
                        } else {
                            error!(
                                target: LOG_CONSENSUS,
                                ?session_index,
                                peer = %ordered_unit.creator,
                                "Consensus Failure: invalid header signature from peer"
                            );

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                result = signatures_receiver.recv() => {
                    let (peer_id, signature) = result.ok()?;

                    if keychain.verify_schnorr(&header, &signature, peer_id) {
                        signatures.insert(peer_id, signature);

                        info!(
                            target: LOG_CONSENSUS,
                            session_index,
                            peer_id = %peer_id,
                            "Collected signature from peer via P2P"
                        );
                    } else {
                        debug!(
                            target: LOG_CONSENSUS,
                            session_index,
                            peer_id = %peer_id,
                            "Invalid P2P signature from peer"
                        );
                    }
                }
                // TODO: remove this branch in 0.11.0
                signed_session_outcome = &mut request_signed_session_outcome => {
                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
                        "Recovered signed session outcome from peers while collecting signatures"
                    );

                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Some(signed_session_outcome);
                },
                result = signed_outcomes_receiver.recv() => {
                    let (peer_id, p2p_outcome) = result.ok()?;

                    if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
                        assert_eq!(
                            header,
                            p2p_outcome.session_outcome.header(session_index),
                            "Consensus Failure: header disagrees with federation consensus"
                        );

                        info!(
                            target: LOG_CONSENSUS,
                            session_index,
                            %peer_id,
                            "Recovered session via P2P while collecting signatures"
                        );

                        return Some(p2p_outcome);
                    }

                    debug!(
                        target: LOG_CONSENSUS,
                        %peer_id,
                        "Invalid P2P SignedSessionOutcome"
                    );
                }
                _ = signature_broadcast_interval.tick() => {
                    // TODO: start sending the new messages in 0.11.0
                    // connections.send(
                    //     Recipient::Everyone,
                    //     P2PMessage::SessionSignature(our_signature),
                    // );
                }
                _ = index_broadcast_interval.tick() => {
                    // TODO: start sending the new messages in 0.11.0
                    // connections.send(
                    //     Recipient::Peer(self.random_peer()),
                    //     P2PMessage::SessionIndex(session_index),
                    // );
                }
            }
        }

        info!(
            target: LOG_CONSENSUS,
            session_index,
            "Successfully collected threshold of signatures"
        );

        Some(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    /// Returns a random peer ID excluding ourselves
    #[allow(unused)]
    fn random_peer(&self) -> PeerId {
        self.num_peers()
            .peer_ids()
            .filter(|p| *p != self.identity())
            .choose(&mut rand::thread_rng())
            .expect("We have at least three peers")
    }

    /// Validate a SignedSessionOutcome received via P2P
    fn validate_signed_session_outcome(
        &self,
        outcome: &SignedSessionOutcome,
        session_index: u64,
    ) -> bool {
        if outcome.signatures.len() != self.num_peers().threshold() {
            return false;
        }

        let keychain = Keychain::new(&self.cfg);
        let header = outcome.session_outcome.header(session_index);

        outcome
            .signatures
            .iter()
            .all(|(signer_id, sig)| keychain.verify_schnorr(&header, sig, *signer_id))
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

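    /// Returns the accepted items of the currently running session in item index
    /// order; these have been processed but not yet finalized by a signed session
    /// outcome.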
    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

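    /// Finalizes a session: drops the AlephBFT unit backup and the pending
    /// accepted items and atomically records the signed session outcome.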
    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    /// Returns the full path where the database checkpoints are stored.
    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    /// Creates the directory within the data directory for storing the database
    /// checkpoints, or, if it already exists, deletes checkpoints older than
    /// `current_session - db_checkpoint_retention`.
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints older than `current_session - db_checkpoint_retention`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                // Validate that the directory is a session index
                if let Ok(file_name) = checkpoint.file_name().into_string()
                    && let Ok(session) = file_name.parse::<u64>()
                    && current_session >= self.db_checkpoint_retention
                    && session < current_session - self.db_checkpoint_retention
                {
                    fs::remove_dir_all(checkpoint.path())?;
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    /// Creates a backup of the database in the checkpoint directory. These
    /// checkpoints can be used to restore the database in case the
    /// federation falls out of consensus (recommended for experts only).
    fn checkpoint_database(&self, session_index: u64) {
        // If `db_checkpoint_retention` has been turned off, don't checkpoint the
        // database at all.
        if self.db_checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(err) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, err = %err.fmt_compact_anyhow(), "Could not create db checkpoint");
                }
            }
        }

        {
            // Check if any old checkpoint needs to be cleaned up
            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
            if let Err(err) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, err = %err.fmt_compact_anyhow(), "Could not delete old checkpoints");
            }
        }
    }

    /// Deletes the database checkpoint directory for session `session_index -
    /// db_checkpoint_retention`, if it exists.
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.db_checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.db_checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);

        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[&peer.to_usize().to_string()])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .expect("No ci status sender for peer")
            .send_replace(Some(session_index));

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[
                &self.cfg.local.identity.to_usize().to_string(),
                &peer.to_usize().to_string(),
            ])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash aleph bft will replay the units that
        // were already processed before the crash. We therefore skip all consensus
        // items until we have seen every previously accepted item again.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await
            .inspect_err(|err| {
                // Rejected items are very common, so only trace level
                trace!(
                    target: LOG_CONSENSUS,
                    %peer,
                    item = ?DebugConsensusItem(&item),
                    err = %err.fmt_compact_anyhow(),
                    "Rejected consensus item"
                );
            })?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[
                    MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
                    kind.as_str(),
                ])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&peer.to_usize().to_string()])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }

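    /// Applies a single consensus item to the supplied database transaction.
    /// Returns an error if the item is rejected, in which case the caller must
    /// discard the transaction instead of committing it.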
    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid a
        // peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

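    /// Requests the signed outcome of session `index` from our peers, retrying
    /// with backoff until a response carrying a valid threshold of signatures is
    /// received.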
    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| ServerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify_schnorr(&header, sig, *peer_id))
            {
                Ok(signed_session_outcome)
            } else {
                Err(ServerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        let mut backoff = fedimint_core::util::backoff_util::api_networking_backoff();
        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }

            sleep(backoff.next().expect("infinite retries")).await;
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

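/// Returns the number of finished sessions recorded in the database. Sessions are
/// zero-indexed, so if the highest stored `SignedSessionOutcomeKey` is `n`, then
/// `n + 1` sessions have finished and `n + 1` is the index of the currently
/// running session.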
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0.0) + 1)
}
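
// A minimal sketch (not part of the upstream module) that sanity-checks the
// "10 years" exponential slowdown argument from `run_session` with hypothetical
// parameters: a 50 ms base round delay and 1_000 slowdown rounds at base 1.02.
// The random jitter factor of 0.5..=1.5 applied by the real closure is ignored.
#[cfg(test)]
mod exp_slowdown_sketch {
    #[test]
    fn slowdown_rounds_span_more_than_ten_years() {
        const BASE: f64 = 1.02;
        const EXP_SLOWDOWN_ROUNDS: u32 = 1_000;
        // Hypothetical value for `broadcast_round_delay_ms`
        let round_delay_ms = 50.0;

        // Total delay accumulated over the slowdown rounds, mirroring the
        // `unit_creation_delay` closure without jitter
        let total_ms: f64 = (0..EXP_SLOWDOWN_ROUNDS)
            .map(|k| round_delay_ms * BASE.powf(f64::from(k)))
            .sum();

        let ten_years_ms = 10.0 * 365.0 * 24.0 * 60.0 * 60.0 * 1000.0;

        assert!(total_ms > ten_years_ms);
    }
}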