fedimint_server/consensus/engine.rs

use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::config::P2PMessage;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::DynP2PConnections;
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::FmtCompact as _;
use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::watch;
use tracing::{Level, debug, info, instrument, trace, warn};

use crate::LOG_CONSENSUS;
use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::to_node_index;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
use crate::metrics::{
    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
    CONSENSUS_SESSION_COUNT,
};

// The name of the directory where the database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub connections: DynP2PConnections<P2PMessage>,
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub checkpoint_retention: u64,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // we rely on the module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
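        // (With the usual BFT assumption of n = 3f + 1 peers tolerating f faulty
        // ones, four peers is the smallest federation that survives one faulty member.)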
        assert!(self.num_peers().total() >= 4);

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");

            self.run_session(self.connections.clone(), session_index)
                .await?;

            info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number of
        // units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // the end of the aleph bft session would only be reached after a minimum
        // of 10 years. In case of such an attack the broadcast stops ordering any
        // items until the attack subsides, as no items are ordered while the
        // signatures are collected. The maximum RAM consumption of the aleph bft
        // broadcast instance is therefore bounded by:
        //
        // self.keychain.peer_count()
        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
        //      * ALEPH_BFT_UNIT_BYTE_LIMIT
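        //
        // As a rough, illustrative example (assumed numbers, not the actual
        // configuration): with 16 peers, 5_000 rounds per session, the 1_000
        // slowdown rounds below and a 50 KiB unit byte limit, the bound works
        // out to 16 * 6_000 * 50 KiB, i.e. on the order of a few GiB.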

        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

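        // To make the slowdown concrete (illustrative numbers only, assuming a
        // configured round delay of 250 ms): rounds up to `rounds_per_session`
        // keep the ~250 ms base delay (jittered by +/- 50 %); 100 rounds past
        // the session limit the delay is roughly 250 ms * 1.02^100 ~= 1.8 s; by
        // the end of the 1000 extra slowdown rounds a single round takes years.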
        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed the maximum of u16::MAX - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // we can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session are bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                        self.is_recovery().await,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        self.ord_latency_sender.send_replace(None);

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

    async fn is_recovery(&self) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AlephUnitsPrefix)
            .await
            .next()
            .await
            .is_some()
    }

    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<Instant>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        // It is guaranteed that aleph bft will always replay all previously processed
        // items from the current session from index zero
        let mut item_index = 0;

        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        // We build a session outcome out of the ordered batches until either we have
        // processed broadcast_rounds_per_session rounds or a threshold signed
        // session outcome is obtained from our peers
        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            match timestamp_receiver.try_recv() {
                                Ok(timestamp) => {
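                                    // Smooth the observed submission-to-ordering latency with an
                                    // exponential moving average (weight 0.9 on the previous value).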
                                    let latency = match *self.ord_latency_sender.borrow() {
                                        Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
                                        None => timestamp.elapsed()
                                    };

                                    self.ord_latency_sender.send_replace(Some(latency));

                                    CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                }
                                Err(err) => {
                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
                                }
                            }
                        }

                        if let Ok(items) = Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
                            for item in items {
                                if self.process_consensus_item(
                                    session_index,
                                    item_index,
                                    item.clone(),
                                    ordered_unit.creator
                                ).await
                                .is_ok() {
                                    item_index += 1;
                                }
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
                    let pending_accepted_items = self.pending_accepted_items().await;

                    // this panics if we have more accepted items than the signed session outcome
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        let keychain = Keychain::new(&self.cfg);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        #[allow(clippy::disallowed_methods)]
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.num_peers().threshold() {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)) {
                            signatures.insert(ordered_unit.creator, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                signed_session_outcome = &mut request_signed_session_outcome => {
                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    /// Returns the full path where the database checkpoints are stored.
    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    /// Creates the directory within the data directory for storing the database
    /// checkpoints, or, if it already exists, deletes checkpoints older than
    /// `current_session` - `checkpoint_retention`.
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints up to `current_session`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                // Validate that the directory is a session index
                if let Ok(file_name) = checkpoint.file_name().into_string() {
                    if let Ok(session) = file_name.parse::<u64>() {
                        if current_session >= self.checkpoint_retention
                            && session < current_session - self.checkpoint_retention
                        {
                            fs::remove_dir_all(checkpoint.path())?;
                        }
                    }
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    /// Creates a backup of the database in the checkpoint directory. These
    /// checkpoints can be used to restore the database in case the
    /// federation falls out of consensus (recommended for experts only).
    fn checkpoint_database(&self, session_index: u64) {
        // If `checkpoint_retention` has been turned off, don't checkpoint the database
        // at all.
        if self.checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
                }
            }
        }

        {
            // Check if any old checkpoints need to be cleaned up
            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
            if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
            }
        }
    }

    /// Deletes the database checkpoint directory equal to `session_index` -
    /// `checkpoint_retention`.
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);

        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[&peer.to_usize().to_string()])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .unwrap_or_else(|| panic!("No ci status sender for peer {peer}"))
            .send_replace(Some(session_index));

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[
                &self.cfg.local.identity.to_usize().to_string(),
                &peer.to_usize().to_string(),
            ])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

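        // Until the item has been processed successfully, dropping this
        // transaction without committing is the expected way to reject an item,
        // so don't warn about uncommitted writes yet.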
        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash aleph bft will replay the units that
        // were already processed before the crash. We therefore skip all consensus
        // items until we have seen every previously accepted item again.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&peer.to_usize().to_string()])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }

    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // a peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

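        // Only accept a response that carries exactly `threshold` signatures,
        // each of which verifies against the header of the returned session
        // outcome for the peer it claims to come from.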
        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
            {
                Ok(signed_session_outcome)
            } else {
                Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

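// For example (illustrative): if signed outcomes for sessions 0..=41 have been
// stored, the descending scan below yields key 41 first and this returns 42.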
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0.0) + 1)
}