1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use aleph_bft::Keychain as KeychainTrait;
8use anyhow::{Context, anyhow, bail};
9use async_channel::Receiver;
10use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, ServerError};
11use fedimint_api_client::query::FilterMap;
12use fedimint_core::config::P2PMessage;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::module::audit::Audit;
19use fedimint_core::module::registry::ModuleDecoderRegistry;
20use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
21use fedimint_core::net::peers::DynP2PConnections;
22use fedimint_core::runtime::spawn;
23use fedimint_core::secp256k1::schnorr;
24use fedimint_core::session_outcome::{AcceptedItem, SessionOutcome, SignedSessionOutcome};
25use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
26use fedimint_core::timing::TimeReporter;
27use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
28use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
29use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
30use futures::StreamExt;
31use rand::Rng;
32use rand::seq::IteratorRandom;
33use tokio::sync::watch;
34use tracing::{Level, debug, error, info, instrument, trace, warn};
35
36use crate::LOG_CONSENSUS;
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::to_node_index;
45use crate::consensus::db::{
46 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
51use crate::metrics::{
52 CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
53 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
54 CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
55 CONSENSUS_SESSION_COUNT,
56};
57
58const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
/// Drives the federation's consensus.
///
/// Federations with a single guardian use a simplified local loop
/// ([`ConsensusEngine::run_single_guardian`]); larger federations run one
/// AlephBFT session after another ([`ConsensusEngine::run_consensus`]).
pub struct ConsensusEngine {
    /// Server modules; they process module consensus items and are audited
    /// after every accepted item.
    pub modules: ServerModuleRegistry,
    /// Database holding accepted items, AlephBFT units and signed session outcomes.
    pub db: Database,
    /// API client used to request signed session outcomes from the federation.
    pub federation_api: DynGlobalApi,
    /// Server configuration (our identity, keys and consensus parameters).
    pub cfg: ServerConfig,
    /// Consensus items submitted for inclusion in consensus.
    pub submission_receiver: Receiver<ConsensusItem>,
    /// Once this holds `Some(session_index)`, consensus stops after that
    /// session has been completed.
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    /// P2P connections to the other guardians.
    pub connections: DynP2PConnections<P2PMessage>,
    /// Per peer, the index of the most recent session in which we processed
    /// (accepted or rejected) a consensus item contributed by that peer.
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    /// Smoothed latency between submitting our own batch and seeing it
    /// ordered; reset to `None` at the start of every AlephBFT session.
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    /// Task group used to obtain the shutdown handle and to spawn AlephBFT tasks.
    pub task_group: TaskGroup,
    /// Data directory; database checkpoints live in its `db_checkpoints` subdirectory.
    pub data_dir: PathBuf,
    /// Number of per-session database checkpoints to keep; `0` disables
    /// checkpointing.
    pub db_checkpoint_retention: u64,
    /// Maximum duration of a single AlephBFT session; exceeding it makes the
    /// engine exit with an error.
    pub session_timeout: Duration,
}
77
78impl ConsensusEngine {
79 fn num_peers(&self) -> NumPeers {
80 self.cfg.consensus.broadcast_public_keys.to_num_peers()
81 }
82
83 fn identity(&self) -> PeerId {
84 self.cfg.local.identity
85 }
86
87 #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
88 pub async fn run(self) -> anyhow::Result<()> {
89 if self.num_peers().total() == 1 {
90 self.run_single_guardian(self.task_group.make_handle())
91 .await
92 } else {
93 self.run_consensus(self.task_group.make_handle()).await
94 }
95 }
96
97 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
98 assert_eq!(self.num_peers(), NumPeers::from(1));
99
100 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
101
102 while !task_handle.is_shutting_down() {
103 let session_index = self.get_finished_session_count().await;
104
105 CONSENSUS_SESSION_COUNT.set(session_index as i64);
106
107 let mut item_index = self.pending_accepted_items().await.len() as u64;
108
109 let session_start_time = std::time::Instant::now();
110
111 while let Ok(item) = self.submission_receiver.recv().await {
112 if self
113 .process_consensus_item(session_index, item_index, item, self.identity())
114 .await
115 .is_ok()
116 {
117 item_index += 1;
118 }
119
120 if session_start_time.elapsed() > Duration::from_mins(1) {
122 break;
123 }
124 }
125
126 let session_outcome = SessionOutcome {
127 items: self.pending_accepted_items().await,
128 };
129
130 let header = session_outcome.header(session_index);
131 let signature = Keychain::new(&self.cfg).sign_schnorr(&header);
132 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
133
134 self.complete_session(
135 session_index,
136 SignedSessionOutcome {
137 session_outcome,
138 signatures,
139 },
140 )
141 .await;
142
143 self.checkpoint_database(session_index);
144
145 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
146
147 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
148 break;
149 }
150 }
151
152 info!(target: LOG_CONSENSUS, "Consensus task shut down");
153
154 Ok(())
155 }
156
157 pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
158 assert!(self.num_peers().total() >= 4);
160
161 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
162
163 while !task_handle.is_shutting_down() {
164 let session_index = self.get_finished_session_count().await;
165
166 CONSENSUS_SESSION_COUNT.set(session_index as i64);
167
168 let is_recovery = self.is_recovery().await;
169
170 info!(
171 target: LOG_CONSENSUS,
172 session_index,
173 is_recovery,
174 "Starting consensus session"
175 );
176
177 let session = tokio::time::timeout(
183 self.session_timeout,
184 self.run_session(self.connections.clone(), session_index),
185 )
186 .await
187 .context("Consensus session timed out, exiting...")?;
188
189 if session.is_none() {
190 return Ok(());
191 }
192
193 info!(target: LOG_CONSENSUS, ?session_index, "Completed consensus session");
194
195 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
196 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
197
198 sleep(Duration::from_mins(1)).await;
199
200 break;
201 }
202 }
203
204 info!(target: LOG_CONSENSUS, "Consensus task shut down");
205
206 Ok(())
207 }
208
209 pub async fn run_session(
210 &self,
211 connections: DynP2PConnections<P2PMessage>,
212 session_index: u64,
213 ) -> Option<()> {
214 const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
232 const BASE: f64 = 1.02;
233
234 let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
235 let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
236
237 let mut delay_config = aleph_bft::default_delay_config();
238
239 delay_config.unit_creation_delay = Arc::new(move |round_index| {
240 let delay = if round_index == 0 {
241 0.0
242 } else {
243 round_delay
244 * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
245 * rand::thread_rng().gen_range(0.5..=1.5)
246 };
247
248 Duration::from_millis(delay.round() as u64)
249 });
250
251 let config = aleph_bft::create_config(
252 self.num_peers().total().into(),
253 self.identity().to_usize().into(),
254 session_index,
255 self.cfg
256 .consensus
257 .broadcast_rounds_per_session
258 .checked_add(EXP_SLOWDOWN_ROUNDS)
259 .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
260 delay_config,
261 Duration::from_hours(87600),
262 )
263 .expect("The exponential slowdown exceeds 10 years");
264
265 let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
268 let (signature_sender, signature_receiver) = watch::channel(None);
269 let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
270 let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
271
272 let (signed_outcomes_sender, signed_outcomes_receiver) = async_channel::unbounded();
274 let (signatures_sender, signatures_receiver) = async_channel::unbounded();
275
276 let aleph_handle = spawn(
277 "aleph run session",
278 aleph_bft::run_session(
279 config,
280 aleph_bft::LocalIO::new(
281 DataProvider::new(
282 self.submission_receiver.clone(),
283 signature_receiver,
284 timestamp_sender,
285 self.is_recovery().await,
286 ),
287 FinalizationHandler::new(unit_data_sender),
288 BackupWriter::new(self.db.clone()).await,
289 BackupReader::new(self.db.clone()),
290 ),
291 Network::new(
292 connections.clone(),
293 signed_outcomes_sender,
294 signatures_sender,
295 self.db.clone(),
296 ),
297 Keychain::new(&self.cfg),
298 Spawner::new(self.task_group.make_subgroup()),
299 aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
300 ),
301 );
302
303 self.ord_latency_sender.send_replace(None);
304
305 let signed_session_outcome = self
306 .complete_signed_session_outcome(
307 session_index,
308 unit_data_receiver,
309 signature_sender,
310 timestamp_receiver,
311 signed_outcomes_receiver,
312 signatures_receiver,
313 connections,
314 )
315 .await?;
316
317 assert!(
318 self.validate_signed_session_outcome(&signed_session_outcome, session_index),
319 "Our created signed session outcome fails validation"
320 );
321
322 info!(target: LOG_CONSENSUS, ?session_index, "Terminating Aleph BFT session");
323
324 terminator_sender.send(()).ok();
327 aleph_handle.await.ok();
328
329 self.complete_session(session_index, signed_session_outcome)
333 .await;
334
335 self.checkpoint_database(session_index);
336
337 Some(())
338 }
339
340 async fn is_recovery(&self) -> bool {
341 self.db
342 .begin_transaction_nc()
343 .await
344 .find_by_prefix(&AlephUnitsPrefix)
345 .await
346 .next()
347 .await
348 .is_some()
349 }
350
351 #[allow(clippy::too_many_arguments)]
352 pub async fn complete_signed_session_outcome(
353 &self,
354 session_index: u64,
355 ordered_unit_receiver: Receiver<OrderedUnit>,
356 signature_sender: watch::Sender<Option<schnorr::Signature>>,
357 timestamp_receiver: Receiver<Instant>,
358 signed_outcomes_receiver: Receiver<(PeerId, SignedSessionOutcome)>,
359 signatures_receiver: Receiver<(PeerId, schnorr::Signature)>,
360 _connections: DynP2PConnections<P2PMessage>,
361 ) -> Option<SignedSessionOutcome> {
362 let mut item_index = 0;
365
366 let mut index_broadcast_interval = tokio::time::interval(Duration::from_secs(3));
368
369 let mut request_signed_session_outcome = Box::pin(async {
370 self.request_signed_session_outcome(&self.federation_api, session_index)
371 .await
372 });
373
374 loop {
378 tokio::select! {
379 result = ordered_unit_receiver.recv() => {
380 let ordered_unit = result.ok()?;
381
382 if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
383 info!(
384 target: LOG_CONSENSUS,
385 session_index,
386 "Reached Aleph BFT round limit, stopping item collection"
387 );
388 break;
389 }
390
391 if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
392 if ordered_unit.creator == self.identity() {
393 match timestamp_receiver.try_recv() {
394 Ok(timestamp) => {
395 let latency = match *self.ord_latency_sender.borrow() {
396 Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
397 None => timestamp.elapsed()
398 };
399
400 self.ord_latency_sender.send_replace(Some(latency));
401
402 CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
403 }
404 Err(err) => {
405 debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
406 }
407 }
408 }
409
410 match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
411 Ok(items) => {
412 for item in items {
413 if let Ok(()) = self.process_consensus_item(
414 session_index,
415 item_index,
416 item.clone(),
417 ordered_unit.creator
418 ).await {
419 item_index += 1;
420 }
421 }
422 }
423 Err(err) => {
424 error!(
425 target: LOG_CONSENSUS,
426 session_index,
427 peer = %ordered_unit.creator,
428 err = %err.fmt_compact(),
429 "Failed to decode consensus items from peer"
430 );
431 }
432 }
433 }
434 },
435 signed_session_outcome = &mut request_signed_session_outcome => {
437 info!(
438 target: LOG_CONSENSUS,
439 ?session_index,
440 "Recovered signed session outcome from peers while processing consensus items"
441 );
442
443 let pending_accepted_items = self.pending_accepted_items().await;
444
445 let (processed, unprocessed) = signed_session_outcome
447 .session_outcome
448 .items
449 .split_at(pending_accepted_items.len());
450
451 info!(
452 target: LOG_CONSENSUS,
453 session_index,
454 processed = %processed.len(),
455 unprocessed = %unprocessed.len(),
456 "Processing remaining items..."
457 );
458
459 assert!(
460 processed.iter().eq(pending_accepted_items.iter()),
461 "Consensus Failure: pending accepted items disagree with federation consensus"
462 );
463
464 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
465 if let Err(err) = self.process_consensus_item(
466 session_index,
467 item_index as u64,
468 accepted_item.item.clone(),
469 accepted_item.peer
470 ).await {
471 panic!(
472 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
473 processed.len(),
474 unprocessed.len(),
475 );
476 }
477 }
478
479 return Some(signed_session_outcome);
480 },
481 result = signed_outcomes_receiver.recv() => {
482 let (peer_id, p2p_outcome) = result.ok()?;
483
484 if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
486 info!(
487 target: LOG_CONSENSUS,
488 session_index,
489 peer_id = %peer_id,
490 "Received SignedSessionOutcome via P2P while collection signatures"
491 );
492
493 let pending_accepted_items = self.pending_accepted_items().await;
494
495 let (processed, unprocessed) = p2p_outcome
497 .session_outcome
498 .items
499 .split_at(pending_accepted_items.len());
500
501 info!(
502 target: LOG_CONSENSUS,
503 ?session_index,
504 processed = %processed.len(),
505 unprocessed = %unprocessed.len(),
506 "Processing remaining items..."
507 );
508
509 assert!(
510 processed.iter().eq(pending_accepted_items.iter()),
511 "Consensus Failure: pending accepted items disagree with federation consensus"
512 );
513
514 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
515 if let Err(err) = self.process_consensus_item(
516 session_index,
517 item_index as u64,
518 accepted_item.item.clone(),
519 accepted_item.peer
520 ).await {
521 panic!(
522 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
523 processed.len(),
524 unprocessed.len(),
525 );
526 }
527 }
528
529 info!(
530 target: LOG_CONSENSUS,
531 ?session_index,
532 peer_id = %peer_id,
533 "Successfully recovered session via P2P"
534 );
535
536 return Some(p2p_outcome);
537 }
538
539 debug!(
540 target: LOG_CONSENSUS,
541 %peer_id,
542 "Invalid P2P SignedSessionOutcome"
543 );
544 }
545 _ = index_broadcast_interval.tick() => {
546 }
552 }
553 }
554
555 let items = self.pending_accepted_items().await;
556
557 assert_eq!(item_index, items.len() as u64);
558
559 info!(target: LOG_CONSENSUS, ?session_index, ?item_index, "Processed all items for session");
560
561 let session_outcome = SessionOutcome { items };
562
563 let header = session_outcome.header(session_index);
564
565 info!(
566 target: LOG_CONSENSUS,
567 ?session_index,
568 "Signing session header..."
569 );
570
571 let keychain = Keychain::new(&self.cfg);
572
573 let our_signature = keychain.sign_schnorr(&header);
574
575 #[allow(clippy::disallowed_methods)]
577 signature_sender.send(Some(our_signature)).ok()?;
578
579 let mut signatures = BTreeMap::from_iter([(self.identity(), our_signature)]);
580
581 let items_dump = tokio::sync::OnceCell::new();
582
583 let mut signature_broadcast_interval = tokio::time::interval(Duration::from_secs(1));
585
586 while signatures.len() < self.num_peers().threshold() {
589 tokio::select! {
590 result = ordered_unit_receiver.recv() => {
592 let ordered_unit = result.ok()?;
593
594 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
595 info!(
596 target: LOG_CONSENSUS,
597 ?session_index,
598 peer = %ordered_unit.creator,
599 "Collected signature from peer via AlephBFT, verifying..."
600 );
601
602 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
603 signatures.insert(ordered_unit.creator, schnorr::Signature::from_slice(&signature).expect("AlephBFT signature is valid"));
604 } else {
605 error!(
606 target: LOG_CONSENSUS,
607 ?session_index,
608 peer = %ordered_unit.creator,
609 "Consensus Failure: invalid header signature from peer"
610 );
611
612 items_dump.get_or_init(|| async {
613 for (idx, item) in session_outcome.items.iter().enumerate() {
614 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
615 }
616 }).await;
617 }
618 }
619 }
620 result = signatures_receiver.recv() => {
621 let (peer_id, signature) = result.ok()?;
622
623 if keychain.verify_schnorr(&header, &signature, peer_id) {
624 signatures.insert(peer_id, signature);
625
626 info!(
627 target: LOG_CONSENSUS,
628 session_index,
629 peer_id = %peer_id,
630 "Collected signature from peer via P2P"
631 );
632 }
633
634 debug!(
635 target: LOG_CONSENSUS,
636 session_index,
637 peer_id = %peer_id,
638 "Invalid P2P signature from peer"
639 );
640 }
641 signed_session_outcome = &mut request_signed_session_outcome => {
643 info!(
644 target: LOG_CONSENSUS,
645 ?session_index,
646 "Recovered signed session outcome from peers while collecting signatures"
647 );
648
649 assert_eq!(
650 header,
651 signed_session_outcome.session_outcome.header(session_index),
652 "Consensus Failure: header disagrees with federation consensus"
653 );
654
655 return Some(signed_session_outcome);
656 },
657 result = signed_outcomes_receiver.recv() => {
658 let (peer_id, p2p_outcome) = result.ok()?;
659
660 if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
661 assert_eq!(
662 header,
663 p2p_outcome.session_outcome.header(session_index),
664 "Consensus Failure: header disagrees with federation consensus"
665 );
666
667 info!(
668 target: LOG_CONSENSUS,
669 session_index,
670 %peer_id,
671 "Recovered session via P2P while collecting signatures"
672 );
673
674 return Some(p2p_outcome);
675 }
676
677 debug!(
678 target: LOG_CONSENSUS,
679 %peer_id,
680 "Invalid P2P SignedSessionOutcome"
681 );
682 }
683 _ = signature_broadcast_interval.tick() => {
684 }
690 _ = index_broadcast_interval.tick() => {
691 }
697 }
698 }
699
700 info!(
701 target: LOG_CONSENSUS,
702 session_index,
703 "Successfully collected threshold of signatures"
704 );
705
706 Some(SignedSessionOutcome {
707 session_outcome,
708 signatures,
709 })
710 }
711
712 #[allow(unused)]
714 fn random_peer(&self) -> PeerId {
715 self.num_peers()
716 .peer_ids()
717 .filter(|p| *p != self.identity())
718 .choose(&mut rand::thread_rng())
719 .expect("We have at least three peers")
720 }
721
722 fn validate_signed_session_outcome(
724 &self,
725 outcome: &SignedSessionOutcome,
726 session_index: u64,
727 ) -> bool {
728 if outcome.signatures.len() != self.num_peers().threshold() {
729 return false;
730 }
731
732 let keychain = Keychain::new(&self.cfg);
733 let header = outcome.session_outcome.header(session_index);
734
735 outcome
736 .signatures
737 .iter()
738 .all(|(signer_id, sig)| keychain.verify_schnorr(&header, sig, *signer_id))
739 }
740
741 fn decoders(&self) -> ModuleDecoderRegistry {
742 self.modules.decoder_registry()
743 }
744
745 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
746 self.db
747 .begin_transaction_nc()
748 .await
749 .find_by_prefix(&AcceptedItemPrefix)
750 .await
751 .map(|entry| entry.1)
752 .collect()
753 .await
754 }
755
756 pub async fn complete_session(
757 &self,
758 session_index: u64,
759 signed_session_outcome: SignedSessionOutcome,
760 ) {
761 let mut dbtx = self.db.begin_transaction().await;
762
763 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
764
765 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
766
767 if dbtx
768 .insert_entry(
769 &SignedSessionOutcomeKey(session_index),
770 &signed_session_outcome,
771 )
772 .await
773 .is_some()
774 {
775 panic!("We tried to overwrite a signed session outcome");
776 }
777
778 dbtx.commit_tx_result()
779 .await
780 .expect("This is the only place where we write to this key");
781 }
782
783 fn db_checkpoints_dir(&self) -> PathBuf {
785 self.data_dir.join(DB_CHECKPOINTS_DIR)
786 }
787
788 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
792 let checkpoint_dir = self.db_checkpoints_dir();
793
794 if checkpoint_dir.exists() {
795 debug!(
796 target: LOG_CONSENSUS,
797 ?current_session,
798 "Removing database checkpoints up to `current_session`"
799 );
800
801 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
802 if let Ok(file_name) = checkpoint.file_name().into_string()
804 && let Ok(session) = file_name.parse::<u64>()
805 && current_session >= self.db_checkpoint_retention
806 && session < current_session - self.db_checkpoint_retention
807 {
808 fs::remove_dir_all(checkpoint.path())?;
809 }
810 }
811 } else {
812 fs::create_dir_all(&checkpoint_dir)?;
813 }
814
815 Ok(())
816 }
817
818 fn checkpoint_database(&self, session_index: u64) {
822 if self.db_checkpoint_retention == 0 {
825 return;
826 }
827
828 let checkpoint_dir = self.db_checkpoints_dir();
829 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
830
831 {
832 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
833 match self.db.checkpoint(&session_checkpoint_dir) {
834 Ok(()) => {
835 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
836 }
837 Err(err) => {
838 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, err = %err.fmt_compact(), "Could not create db checkpoint");
839 }
840 }
841 }
842
843 {
844 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
846 if let Err(err) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
847 warn!(target: LOG_CONSENSUS, err = %err.fmt_compact_anyhow(), "Could not delete old checkpoints");
848 }
849 }
850 }
851
852 fn delete_old_database_checkpoint(
855 &self,
856 session_index: u64,
857 checkpoint_dir: &Path,
858 ) -> anyhow::Result<()> {
859 if self.db_checkpoint_retention > session_index {
860 return Ok(());
861 }
862
863 let delete_session_index = session_index - self.db_checkpoint_retention;
864 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
865 if checkpoint_to_delete.exists() {
866 fs::remove_dir_all(checkpoint_to_delete)?;
867 }
868
869 Ok(())
870 }
871
872 #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
873 pub async fn process_consensus_item(
874 &self,
875 session_index: u64,
876 item_index: u64,
877 item: ConsensusItem,
878 peer: PeerId,
879 ) -> anyhow::Result<()> {
880 let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
881
882 let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
883 .with_label_values(&[&peer.to_usize().to_string()])
884 .start_timer();
885
886 trace!(
887 target: LOG_CONSENSUS,
888 %peer,
889 item = ?DebugConsensusItem(&item),
890 "Processing consensus item"
891 );
892
893 self.ci_status_senders
894 .get(&peer)
895 .expect("No ci status sender for peer")
896 .send_replace(Some(session_index));
897
898 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
899 .with_label_values(&[
900 &self.cfg.local.identity.to_usize().to_string(),
901 &peer.to_usize().to_string(),
902 ])
903 .set(session_index as i64);
904
905 let mut dbtx = self.db.begin_transaction().await;
906
907 dbtx.ignore_uncommitted();
908
909 if let Some(existing_item) = dbtx
913 .get_value(&AcceptedItemKey(item_index.to_owned()))
914 .await
915 {
916 if existing_item.item == item && existing_item.peer == peer {
917 return Ok(());
918 }
919
920 bail!(
921 "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
922 existing_item.peer
923 );
924 }
925
926 self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
927 .await
928 .inspect_err(|err| {
929 trace!(
931 target: LOG_CONSENSUS,
932 %peer,
933 item = ?DebugConsensusItem(&item),
934 err = %err.fmt_compact_anyhow(),
935 "Rejected consensus item"
936 );
937 })?;
938
939 dbtx.warn_uncommitted();
942
943 dbtx.insert_entry(
944 &AcceptedItemKey(item_index),
945 &AcceptedItem {
946 item: item.clone(),
947 peer,
948 },
949 )
950 .await;
951
952 debug!(
953 target: LOG_CONSENSUS,
954 %peer,
955 item = ?DebugConsensusItem(&item),
956 "Processed consensus item"
957 );
958 let mut audit = Audit::default();
959
960 for (module_instance_id, kind, module) in self.modules.iter_modules() {
961 let _module_audit_timing =
962 TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
963
964 let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
965 .with_label_values(&[
966 MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
967 kind.as_str(),
968 ])
969 .start_timer();
970
971 module
972 .audit(
973 &mut dbtx
974 .to_ref_with_prefix_module_id(module_instance_id)
975 .0
976 .into_nc(),
977 &mut audit,
978 module_instance_id,
979 )
980 .await;
981
982 timing_prom.observe_duration();
983 }
984
985 assert!(
986 audit
987 .net_assets()
988 .expect("Overflow while checking balance sheet")
989 .milli_sat
990 >= 0,
991 "Balance sheet of the fed has gone negative, this should never happen! {audit}"
992 );
993
994 dbtx.commit_tx_result()
995 .await
996 .expect("Committing consensus epoch failed");
997
998 CONSENSUS_ITEMS_PROCESSED_TOTAL
999 .with_label_values(&[&peer.to_usize().to_string()])
1000 .inc();
1001
1002 timing_prom.observe_duration();
1003
1004 Ok(())
1005 }
1006
1007 async fn process_consensus_item_with_db_transaction(
1008 &self,
1009 dbtx: &mut DatabaseTransaction<'_>,
1010 consensus_item: ConsensusItem,
1011 peer_id: PeerId,
1012 ) -> anyhow::Result<()> {
1013 self.decoders().assert_reject_mode();
1016
1017 match consensus_item {
1018 ConsensusItem::Module(module_item) => {
1019 let instance_id = module_item.module_instance_id();
1020
1021 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
1022
1023 self.modules
1024 .get_expect(instance_id)
1025 .process_consensus_item(module_dbtx, &module_item, peer_id)
1026 .await
1027 }
1028 ConsensusItem::Transaction(transaction) => {
1029 let txid = transaction.tx_hash();
1030 if dbtx
1031 .get_value(&AcceptedTransactionKey(txid))
1032 .await
1033 .is_some()
1034 {
1035 debug!(
1036 target: LOG_CONSENSUS,
1037 %txid,
1038 "Transaction already accepted"
1039 );
1040 bail!("Transaction is already accepted");
1041 }
1042
1043 let modules_ids = transaction
1044 .outputs
1045 .iter()
1046 .map(DynOutput::module_instance_id)
1047 .collect::<Vec<_>>();
1048
1049 process_transaction_with_dbtx(
1050 self.modules.clone(),
1051 dbtx,
1052 &transaction,
1053 self.cfg.consensus.version,
1054 TxProcessingMode::Consensus,
1055 )
1056 .await
1057 .map_err(|error| anyhow!(error.to_string()))?;
1058
1059 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
1060 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
1061 .await;
1062
1063 Ok(())
1064 }
1065 ConsensusItem::Default { variant, .. } => {
1066 warn!(
1067 target: LOG_CONSENSUS,
1068 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
1069 );
1070
1071 panic!("Unexpected consensus item type: {variant}")
1072 }
1073 }
1074 }
1075
1076 async fn request_signed_session_outcome(
1077 &self,
1078 federation_api: &DynGlobalApi,
1079 index: u64,
1080 ) -> SignedSessionOutcome {
1081 let decoders = self.decoders();
1082 let keychain = Keychain::new(&self.cfg);
1083 let threshold = self.num_peers().threshold();
1084
1085 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
1086 let signed_session_outcome = response
1087 .try_into_inner(&decoders)
1088 .map_err(|x| ServerError::ResponseDeserialization(x.into()))?;
1089 let header = signed_session_outcome.session_outcome.header(index);
1090 if signed_session_outcome.signatures.len() == threshold
1091 && signed_session_outcome
1092 .signatures
1093 .iter()
1094 .all(|(peer_id, sig)| keychain.verify_schnorr(&header, sig, *peer_id))
1095 {
1096 Ok(signed_session_outcome)
1097 } else {
1098 Err(ServerError::InvalidResponse(anyhow!("Invalid signatures")))
1099 }
1100 };
1101
1102 federation_api
1103 .request_with_strategy_retry(
1104 FilterMap::new(filter_map.clone()),
1105 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
1106 ApiRequestErased::new(index),
1107 )
1108 .await
1109 }
1110
1111 async fn get_finished_session_count(&self) -> u64 {
1114 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
1115 }
1116}
1117
1118pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
1119 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
1120 .await
1121 .next()
1122 .await
1123 .map_or(0, |entry| (entry.0.0) + 1)
1124}