fedimint_server/consensus/engine.rs

use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::config::P2PMessage;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::DynP2PConnections;
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::FmtCompact as _;
use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::watch;
use tracing::{Level, debug, info, instrument, trace, warn};

use crate::LOG_CONSENSUS;
use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData, get_citem_bytes_chsum};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::to_node_index;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
use crate::metrics::{
    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
    CONSENSUS_SESSION_COUNT,
};

// The name of the directory where the database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub connections: DynP2PConnections<P2PMessage>,
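    /// Reports, per peer, the latest session index in which that peer
    /// contributed a consensus item.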
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    /// String version of `cfg.local.identity`, cached for performance
    pub self_id_str: String,
    /// String versions of the peer ids, cached for performance
    pub peer_id_str: Vec<String>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub checkpoint_retention: u64,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // We only check the elapsed time after receiving an item, so we rely on
                // a steady stream of module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.num_peers().total() >= 4);

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");

            self.run_session(self.connections.clone(), session_index)
                .await?;

            info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number of
        // units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome, we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // the end of the aleph bft session would only be reached after a minimum
        // of 10 years. In case of such an attack the broadcast stops ordering any
        // items until the attack subsides, as no items are ordered while the
        // signatures are collected. The maximum RAM consumption of the aleph bft
        // broadcast instance is therefore bounded by:
        //
        // self.keychain.peer_count()
        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
        //      * ALEPH_BFT_UNIT_BYTE_LIMIT

        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

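        // Concretely (derived from the closure below): round 0 fires immediately,
        // rounds up to `broadcast_rounds_per_session` wait roughly
        // `broadcast_round_delay_ms` (with +/-50% jitter), and every round past the
        // session limit multiplies the delay by another factor of BASE = 1.02.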
        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed maximum of u16::MAX - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // We can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session are bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

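        // Run the Aleph BFT session on its own task: the `DataProvider` feeds our
        // submitted items (and, later, our header signature) into the broadcast, the
        // `FinalizationHandler` hands ordered units back via `unit_data_sender`, and
        // the backup reader/writer persist units so a restarted peer can replay the
        // session from the database.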
        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the `BackupWriter`
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<(Instant, u64)>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        // It is guaranteed that aleph bft will always replay all previously processed
        // items from the current session from index zero
        let mut item_index = 0;

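        // Concurrently ask the other peers for an already threshold-signed outcome of
        // this session; if one arrives before we finish ourselves, we adopt it below.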
        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        // We build a session outcome out of the ordered batches until either we have
        // processed broadcast_rounds_per_session rounds or a threshold signed
        // session outcome is obtained from our peers
        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
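                            // Drain queued submission timestamps until one matches this
                            // batch's checksum; latency is only reported for the batch
                            // that was actually ordered.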
                            loop {
                                match timestamp_receiver.try_recv() {
                                    Ok((timestamp, chsum)) => {
                                        if get_citem_bytes_chsum(&bytes) == chsum {
                                            CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                            break;
                                        }
                                        warn!(target: LOG_CONSENSUS, "Not reporting ordering latency on possibly out of sync item");
                                    }
                                    Err(err) => {
                                        debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal on start");
                                        break;
                                    }
                                }
                            }
                        }

                        if let Ok(items) = Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
                            for item in items {
                                if self.process_consensus_item(
                                    session_index,
                                    item_index,
                                    item.clone(),
                                    ordered_unit.creator
                                ).await
                                .is_ok() {
                                    item_index += 1;
                                }
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
                    let pending_accepted_items = self.pending_accepted_items().await;

                    // This panics if we have more accepted items than the signed session outcome contains
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        let keychain = Keychain::new(&self.cfg);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.num_peers().threshold() {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)) {
                            signatures.insert(ordered_unit.creator, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                signed_session_outcome = &mut request_signed_session_outcome => {
                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

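    /// Returns the consensus items that have already been accepted in the
    /// current session, ordered by item index.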
    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

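    /// Persists the signed outcome for `session_index` and clears the Aleph BFT
    /// unit backup and the pending accepted items of the now-finished session.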
    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    /// Returns the full path where the database checkpoints are stored.
    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    /// Creates the directory for the database checkpoints within the data
    /// directory, or, if it already exists, deletes checkpoints older than
    /// `current_session - checkpoint_retention`.
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints up to `current_session`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                // Only consider directories whose name parses as a session index
                if let Ok(file_name) = checkpoint.file_name().into_string() {
                    if let Ok(session) = file_name.parse::<u64>() {
                        if current_session >= self.checkpoint_retention
                            && session < current_session - self.checkpoint_retention
                        {
                            fs::remove_dir_all(checkpoint.path())?;
                        }
                    }
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    /// Creates a backup of the database in the checkpoint directory. These
    /// checkpoints can be used to restore the database in case the
    /// federation falls out of consensus (recommended for experts only).
    fn checkpoint_database(&self, session_index: u64) {
        // If `checkpoint_retention` has been turned off, don't checkpoint the database
        // at all.
        if self.checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
                }
            }
        }

        {
            // Check if any old checkpoints need to be cleaned up
            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
            if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
            }
        }
    }

    /// Deletes the database checkpoint directory for session `session_index -
    /// checkpoint_retention`, if it exists.
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

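    /// Processes a single consensus item: items already accepted in a previous
    /// run of this session are skipped on replay, the item is applied within a
    /// database transaction, every module is audited to ensure the federation's
    /// balance sheet stays non-negative, and only then is the transaction
    /// committed.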
    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let peer_id_str = &self.peer_id_str[peer.to_usize()];
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[peer_id_str])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .expect("No ci status sender for peer {peer}")
            .send(Some(session_index))
            .inspect_err(|e| warn!(target: LOG_CONSENSUS, "Failed to update ci status {e}"))
            .ok();

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[&self.self_id_str, peer_id_str])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

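        // Items that are skipped or rejected below leave this transaction unused;
        // silence the uncommitted-transaction warning until we know the item will
        // actually be processed (re-enabled via `warn_uncommitted` below).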
        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash aleph bft will replay the units that
        // were already processed before the crash. We therefore skip all consensus
        // items until we have seen every previously accepted item again.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
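
        // Audit every module against the uncommitted state so that an item which
        // would push the federation's balance sheet negative is caught before the
        // transaction is committed below.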
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;
            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&self.peer_id_str[peer.to_usize()]])
            .inc();
        timing_prom.observe_duration();

        Ok(())
    }

    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // a peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

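    /// Queries the federation for the signed outcome of session `index`,
    /// retrying until a response arrives whose threshold of signatures verifies
    /// against the session header.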
    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
            {
                Ok(signed_session_outcome)
            } else {
                Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

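/// Returns the number of finished sessions recorded in the database: the key of
/// the most recent signed session outcome is the highest finished session
/// index, so the count is that index plus one (or zero if none exist).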
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0.0) + 1)
}