1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use aleph_bft::Keychain as KeychainTrait;
8use anyhow::{anyhow, bail};
9use async_channel::Receiver;
10use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, ServerError};
11use fedimint_api_client::query::FilterMap;
12use fedimint_core::config::P2PMessage;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::module::audit::Audit;
19use fedimint_core::module::registry::ModuleDecoderRegistry;
20use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
21use fedimint_core::net::peers::DynP2PConnections;
22use fedimint_core::runtime::spawn;
23use fedimint_core::secp256k1::schnorr;
24use fedimint_core::session_outcome::{AcceptedItem, SessionOutcome, SignedSessionOutcome};
25use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
26use fedimint_core::timing::TimeReporter;
27use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
28use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
29use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
30use futures::StreamExt;
31use rand::Rng;
32use rand::seq::IteratorRandom;
33use tokio::sync::watch;
34use tracing::{Level, debug, error, info, instrument, trace, warn};
35
36use crate::LOG_CONSENSUS;
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::to_node_index;
45use crate::consensus::db::{
46 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
51use crate::metrics::{
52 CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
53 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
54 CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
55 CONSENSUS_SESSION_COUNT,
56};
57
/// Name of the sub-directory of the engine's `data_dir` that holds the
/// per-session database checkpoints.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
/// Drives the consensus sessions of a guardian: it orders consensus items
/// (directly in single-guardian mode, via AlephBFT otherwise), processes them
/// against the database and persists a signed outcome for every session.
pub struct ConsensusEngine {
    /// Server modules used to decode, process and audit consensus items.
    pub modules: ServerModuleRegistry,
    /// Database holding the accepted items of the running session, the signed
    /// session outcomes and the AlephBFT unit backup.
    pub db: Database,
    /// API client used to request already signed session outcomes.
    pub federation_api: DynGlobalApi,
    /// Server configuration (peer set, local identity, broadcast parameters).
    pub cfg: ServerConfig,
    /// Source of consensus items to be ordered. In single-guardian mode the
    /// items are processed directly, otherwise the receiver is handed to the
    /// AlephBFT data provider.
    pub submission_receiver: Receiver<ConsensusItem>,
    /// When this holds `Some(index)` the engine stops after completing the
    /// session with that index.
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    /// P2P connections handed to the AlephBFT network layer of each session.
    pub connections: DynP2PConnections<P2PMessage>,
    /// Per-peer status channels; set to the current session index whenever an
    /// item contributed by that peer is about to be processed.
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    /// Moving average of the ordering latency of our own batches; reset to
    /// `None` at the start of every AlephBFT session.
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    /// Task group used to create shutdown handles and the AlephBFT spawner.
    pub task_group: TaskGroup,
    /// Base directory under which the database checkpoints are stored.
    pub data_dir: PathBuf,
    /// How many sessions back a database checkpoint is kept before it is
    /// deleted; `0` disables checkpointing entirely.
    pub db_checkpoint_retention: u64,
}
76
77impl ConsensusEngine {
78 fn num_peers(&self) -> NumPeers {
79 self.cfg.consensus.broadcast_public_keys.to_num_peers()
80 }
81
82 fn identity(&self) -> PeerId {
83 self.cfg.local.identity
84 }
85
86 #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
87 pub async fn run(self) -> anyhow::Result<()> {
88 if self.num_peers().total() == 1 {
89 self.run_single_guardian(self.task_group.make_handle())
90 .await
91 } else {
92 self.run_consensus(self.task_group.make_handle()).await
93 }
94 }
95
96 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
97 assert_eq!(self.num_peers(), NumPeers::from(1));
98
99 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
100
101 while !task_handle.is_shutting_down() {
102 let session_index = self.get_finished_session_count().await;
103
104 CONSENSUS_SESSION_COUNT.set(session_index as i64);
105
106 let mut item_index = self.pending_accepted_items().await.len() as u64;
107
108 let session_start_time = std::time::Instant::now();
109
110 while let Ok(item) = self.submission_receiver.recv().await {
111 if self
112 .process_consensus_item(session_index, item_index, item, self.identity())
113 .await
114 .is_ok()
115 {
116 item_index += 1;
117 }
118
119 if session_start_time.elapsed() > Duration::from_secs(60) {
121 break;
122 }
123 }
124
125 let session_outcome = SessionOutcome {
126 items: self.pending_accepted_items().await,
127 };
128
129 let header = session_outcome.header(session_index);
130 let signature = Keychain::new(&self.cfg).sign_schnorr(&header);
131 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
132
133 self.complete_session(
134 session_index,
135 SignedSessionOutcome {
136 session_outcome,
137 signatures,
138 },
139 )
140 .await;
141
142 self.checkpoint_database(session_index);
143
144 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
145
146 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
147 break;
148 }
149 }
150
151 info!(target: LOG_CONSENSUS, "Consensus task shut down");
152
153 Ok(())
154 }
155
156 pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
157 assert!(self.num_peers().total() >= 4);
159
160 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
161
162 while !task_handle.is_shutting_down() {
163 let session_index = self.get_finished_session_count().await;
164
165 CONSENSUS_SESSION_COUNT.set(session_index as i64);
166
167 let is_recovery = self.is_recovery().await;
168
169 info!(
170 target: LOG_CONSENSUS,
171 session_index,
172 is_recovery,
173 "Starting consensus session"
174 );
175
176 if self
177 .run_session(self.connections.clone(), session_index)
178 .await
179 .is_none()
180 {
181 return Ok(());
182 }
183
184 info!(target: LOG_CONSENSUS, ?session_index, "Completed consensus session");
185
186 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
187 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
188
189 sleep(Duration::from_secs(60)).await;
190
191 break;
192 }
193 }
194
195 info!(target: LOG_CONSENSUS, "Consensus task shut down");
196
197 Ok(())
198 }
199
    /// Runs a single AlephBFT session with index `session_index`.
    ///
    /// Spawns the AlephBFT session task, collects a signed session outcome via
    /// [`Self::complete_signed_session_outcome`], terminates the AlephBFT
    /// task, persists the outcome and creates a database checkpoint.
    ///
    /// Returns `None` if no signed session outcome could be obtained (the
    /// collection returned `None`), `Some(())` once the session is persisted.
    ///
    /// # Panics
    ///
    /// Panics if the configured rounds per session plus the slowdown rounds
    /// overflow `u16`, if the AlephBFT config cannot be created, or if the
    /// obtained signed session outcome fails our own validation.
    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> Option<()> {
        // Number of additional rounds the AlephBFT session is configured for
        // on top of `broadcast_rounds_per_session`.
        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        // Per-round growth factor of the unit creation delay once the round
        // index exceeds `broadcast_rounds_per_session`.
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

        // Round zero is created without delay. Every later round waits for
        // the base delay, scaled exponentially by how far the round index is
        // past `rounds_per_session` and jittered by a random factor in
        // [0.5, 1.5].
        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        // NOTE(review): the last argument is ten years expressed in seconds;
        // judging by the expect message below it bounds the total delay
        // accumulated over all rounds — confirm against aleph_bft.
        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // Channels connecting the AlephBFT components with the collection
        // loop in `complete_signed_session_outcome`.
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        // Channels fed by the network layer with data received via P2P.
        let (signed_outcomes_sender, signed_outcomes_receiver) = async_channel::unbounded();
        let (signatures_sender, signatures_receiver) = async_channel::unbounded();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                        self.is_recovery().await,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(
                    connections.clone(),
                    signed_outcomes_sender,
                    signatures_sender,
                    self.db.clone(),
                ),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        // Latency measurements of previous sessions do not carry over.
        self.ord_latency_sender.send_replace(None);

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
                signed_outcomes_receiver,
                signatures_receiver,
                connections,
            )
            .await?;

        assert!(
            self.validate_signed_session_outcome(&signed_session_outcome, session_index),
            "Our created signed session outcome fails validation"
        );

        info!(target: LOG_CONSENSUS, ?session_index, "Terminating Aleph BFT session");

        // Stop the AlephBFT task and wait for it to finish before touching
        // the database below; failures of either step are ignored.
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // `complete_session` removes the AlephBFT unit backup from the
        // database. NOTE(review): presumably it must run only after the
        // session task has stopped so it cannot conflict with the backup
        // writer — confirm.
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Some(())
    }
330
331 async fn is_recovery(&self) -> bool {
332 self.db
333 .begin_transaction_nc()
334 .await
335 .find_by_prefix(&AlephUnitsPrefix)
336 .await
337 .next()
338 .await
339 .is_some()
340 }
341
342 #[allow(clippy::too_many_arguments)]
343 pub async fn complete_signed_session_outcome(
344 &self,
345 session_index: u64,
346 ordered_unit_receiver: Receiver<OrderedUnit>,
347 signature_sender: watch::Sender<Option<schnorr::Signature>>,
348 timestamp_receiver: Receiver<Instant>,
349 signed_outcomes_receiver: Receiver<(PeerId, SignedSessionOutcome)>,
350 signatures_receiver: Receiver<(PeerId, schnorr::Signature)>,
351 _connections: DynP2PConnections<P2PMessage>,
352 ) -> Option<SignedSessionOutcome> {
353 let mut item_index = 0;
356
357 let mut index_broadcast_interval = tokio::time::interval(Duration::from_secs(3));
359
360 let mut request_signed_session_outcome = Box::pin(async {
361 self.request_signed_session_outcome(&self.federation_api, session_index)
362 .await
363 });
364
365 loop {
369 tokio::select! {
370 result = ordered_unit_receiver.recv() => {
371 let ordered_unit = result.ok()?;
372
373 if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
374 info!(
375 target: LOG_CONSENSUS,
376 session_index,
377 "Reached Aleph BFT round limit, stopping item collection"
378 );
379 break;
380 }
381
382 if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
383 if ordered_unit.creator == self.identity() {
384 match timestamp_receiver.try_recv() {
385 Ok(timestamp) => {
386 let latency = match *self.ord_latency_sender.borrow() {
387 Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
388 None => timestamp.elapsed()
389 };
390
391 self.ord_latency_sender.send_replace(Some(latency));
392
393 CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
394 }
395 Err(err) => {
396 debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
397 }
398 }
399 }
400
401 match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
402 Ok(items) => {
403 for item in items {
404 if let Ok(()) = self.process_consensus_item(
405 session_index,
406 item_index,
407 item.clone(),
408 ordered_unit.creator
409 ).await {
410 item_index += 1;
411 }
412 }
413 }
414 Err(err) => {
415 error!(
416 target: LOG_CONSENSUS,
417 session_index,
418 peer = %ordered_unit.creator,
419 err = %err.fmt_compact(),
420 "Failed to decode consensus items from peer"
421 );
422 }
423 }
424 }
425 },
426 signed_session_outcome = &mut request_signed_session_outcome => {
428 info!(
429 target: LOG_CONSENSUS,
430 ?session_index,
431 "Recovered signed session outcome from peers while processing consensus items"
432 );
433
434 let pending_accepted_items = self.pending_accepted_items().await;
435
436 let (processed, unprocessed) = signed_session_outcome
438 .session_outcome
439 .items
440 .split_at(pending_accepted_items.len());
441
442 info!(
443 target: LOG_CONSENSUS,
444 session_index,
445 processed = %processed.len(),
446 unprocessed = %unprocessed.len(),
447 "Processing remaining items..."
448 );
449
450 assert!(
451 processed.iter().eq(pending_accepted_items.iter()),
452 "Consensus Failure: pending accepted items disagree with federation consensus"
453 );
454
455 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
456 if let Err(err) = self.process_consensus_item(
457 session_index,
458 item_index as u64,
459 accepted_item.item.clone(),
460 accepted_item.peer
461 ).await {
462 panic!(
463 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
464 processed.len(),
465 unprocessed.len(),
466 );
467 }
468 }
469
470 return Some(signed_session_outcome);
471 },
472 result = signed_outcomes_receiver.recv() => {
473 let (peer_id, p2p_outcome) = result.ok()?;
474
475 if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
477 info!(
478 target: LOG_CONSENSUS,
479 session_index,
480 peer_id = %peer_id,
481 "Received SignedSessionOutcome via P2P while collection signatures"
482 );
483
484 let pending_accepted_items = self.pending_accepted_items().await;
485
486 let (processed, unprocessed) = p2p_outcome
488 .session_outcome
489 .items
490 .split_at(pending_accepted_items.len());
491
492 info!(
493 target: LOG_CONSENSUS,
494 ?session_index,
495 processed = %processed.len(),
496 unprocessed = %unprocessed.len(),
497 "Processing remaining items..."
498 );
499
500 assert!(
501 processed.iter().eq(pending_accepted_items.iter()),
502 "Consensus Failure: pending accepted items disagree with federation consensus"
503 );
504
505 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
506 if let Err(err) = self.process_consensus_item(
507 session_index,
508 item_index as u64,
509 accepted_item.item.clone(),
510 accepted_item.peer
511 ).await {
512 panic!(
513 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
514 processed.len(),
515 unprocessed.len(),
516 );
517 }
518 }
519
520 info!(
521 target: LOG_CONSENSUS,
522 ?session_index,
523 peer_id = %peer_id,
524 "Successfully recovered session via P2P"
525 );
526
527 return Some(p2p_outcome);
528 }
529
530 debug!(
531 target: LOG_CONSENSUS,
532 %peer_id,
533 "Invalid P2P SignedSessionOutcome"
534 );
535 }
536 _ = index_broadcast_interval.tick() => {
537 }
543 }
544 }
545
546 let items = self.pending_accepted_items().await;
547
548 assert_eq!(item_index, items.len() as u64);
549
550 info!(target: LOG_CONSENSUS, ?session_index, ?item_index, "Processed all items for session");
551
552 let session_outcome = SessionOutcome { items };
553
554 let header = session_outcome.header(session_index);
555
556 info!(
557 target: LOG_CONSENSUS,
558 ?session_index,
559 "Signing session header..."
560 );
561
562 let keychain = Keychain::new(&self.cfg);
563
564 let our_signature = keychain.sign_schnorr(&header);
565
566 #[allow(clippy::disallowed_methods)]
568 signature_sender.send(Some(our_signature)).ok()?;
569
570 let mut signatures = BTreeMap::from_iter([(self.identity(), our_signature)]);
571
572 let items_dump = tokio::sync::OnceCell::new();
573
574 let mut signature_broadcast_interval = tokio::time::interval(Duration::from_secs(1));
576
577 while signatures.len() < self.num_peers().threshold() {
580 tokio::select! {
581 result = ordered_unit_receiver.recv() => {
583 let ordered_unit = result.ok()?;
584
585 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
586 info!(
587 target: LOG_CONSENSUS,
588 ?session_index,
589 peer = %ordered_unit.creator,
590 "Collected signature from peer via AlephBFT, verifying..."
591 );
592
593 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
594 signatures.insert(ordered_unit.creator, schnorr::Signature::from_slice(&signature).expect("AlephBFT signature is valid"));
595 } else {
596 error!(
597 target: LOG_CONSENSUS,
598 ?session_index,
599 peer = %ordered_unit.creator,
600 "Consensus Failure: invalid header signature from peer"
601 );
602
603 items_dump.get_or_init(|| async {
604 for (idx, item) in session_outcome.items.iter().enumerate() {
605 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
606 }
607 }).await;
608 }
609 }
610 }
611 result = signatures_receiver.recv() => {
612 let (peer_id, signature) = result.ok()?;
613
614 if keychain.verify_schnorr(&header, &signature, peer_id) {
615 signatures.insert(peer_id, signature);
616
617 info!(
618 target: LOG_CONSENSUS,
619 session_index,
620 peer_id = %peer_id,
621 "Collected signature from peer via P2P"
622 );
623 }
624
625 debug!(
626 target: LOG_CONSENSUS,
627 session_index,
628 peer_id = %peer_id,
629 "Invalid P2P signature from peer"
630 );
631 }
632 signed_session_outcome = &mut request_signed_session_outcome => {
634 info!(
635 target: LOG_CONSENSUS,
636 ?session_index,
637 "Recovered signed session outcome from peers while collecting signatures"
638 );
639
640 assert_eq!(
641 header,
642 signed_session_outcome.session_outcome.header(session_index),
643 "Consensus Failure: header disagrees with federation consensus"
644 );
645
646 return Some(signed_session_outcome);
647 },
648 result = signed_outcomes_receiver.recv() => {
649 let (peer_id, p2p_outcome) = result.ok()?;
650
651 if self.validate_signed_session_outcome(&p2p_outcome, session_index) {
652 assert_eq!(
653 header,
654 p2p_outcome.session_outcome.header(session_index),
655 "Consensus Failure: header disagrees with federation consensus"
656 );
657
658 info!(
659 target: LOG_CONSENSUS,
660 session_index,
661 %peer_id,
662 "Recovered session via P2P while collecting signatures"
663 );
664
665 return Some(p2p_outcome);
666 }
667
668 debug!(
669 target: LOG_CONSENSUS,
670 %peer_id,
671 "Invalid P2P SignedSessionOutcome"
672 );
673 }
674 _ = signature_broadcast_interval.tick() => {
675 }
681 _ = index_broadcast_interval.tick() => {
682 }
688 }
689 }
690
691 info!(
692 target: LOG_CONSENSUS,
693 session_index,
694 "Successfully collected threshold of signatures"
695 );
696
697 Some(SignedSessionOutcome {
698 session_outcome,
699 signatures,
700 })
701 }
702
703 #[allow(unused)]
705 fn random_peer(&self) -> PeerId {
706 self.num_peers()
707 .peer_ids()
708 .filter(|p| *p != self.identity())
709 .choose(&mut rand::thread_rng())
710 .expect("We have at least three peers")
711 }
712
713 fn validate_signed_session_outcome(
715 &self,
716 outcome: &SignedSessionOutcome,
717 session_index: u64,
718 ) -> bool {
719 if outcome.signatures.len() != self.num_peers().threshold() {
720 return false;
721 }
722
723 let keychain = Keychain::new(&self.cfg);
724 let header = outcome.session_outcome.header(session_index);
725
726 outcome
727 .signatures
728 .iter()
729 .all(|(signer_id, sig)| keychain.verify_schnorr(&header, sig, *signer_id))
730 }
731
732 fn decoders(&self) -> ModuleDecoderRegistry {
733 self.modules.decoder_registry()
734 }
735
736 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
737 self.db
738 .begin_transaction_nc()
739 .await
740 .find_by_prefix(&AcceptedItemPrefix)
741 .await
742 .map(|entry| entry.1)
743 .collect()
744 .await
745 }
746
747 pub async fn complete_session(
748 &self,
749 session_index: u64,
750 signed_session_outcome: SignedSessionOutcome,
751 ) {
752 let mut dbtx = self.db.begin_transaction().await;
753
754 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
755
756 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
757
758 if dbtx
759 .insert_entry(
760 &SignedSessionOutcomeKey(session_index),
761 &signed_session_outcome,
762 )
763 .await
764 .is_some()
765 {
766 panic!("We tried to overwrite a signed session outcome");
767 }
768
769 dbtx.commit_tx_result()
770 .await
771 .expect("This is the only place where we write to this key");
772 }
773
774 fn db_checkpoints_dir(&self) -> PathBuf {
776 self.data_dir.join(DB_CHECKPOINTS_DIR)
777 }
778
779 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
783 let checkpoint_dir = self.db_checkpoints_dir();
784
785 if checkpoint_dir.exists() {
786 debug!(
787 target: LOG_CONSENSUS,
788 ?current_session,
789 "Removing database checkpoints up to `current_session`"
790 );
791
792 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
793 if let Ok(file_name) = checkpoint.file_name().into_string()
795 && let Ok(session) = file_name.parse::<u64>()
796 && current_session >= self.db_checkpoint_retention
797 && session < current_session - self.db_checkpoint_retention
798 {
799 fs::remove_dir_all(checkpoint.path())?;
800 }
801 }
802 } else {
803 fs::create_dir_all(&checkpoint_dir)?;
804 }
805
806 Ok(())
807 }
808
809 fn checkpoint_database(&self, session_index: u64) {
813 if self.db_checkpoint_retention == 0 {
816 return;
817 }
818
819 let checkpoint_dir = self.db_checkpoints_dir();
820 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
821
822 {
823 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
824 match self.db.checkpoint(&session_checkpoint_dir) {
825 Ok(()) => {
826 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
827 }
828 Err(err) => {
829 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, err = %err.fmt_compact_anyhow(), "Could not create db checkpoint");
830 }
831 }
832 }
833
834 {
835 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
837 if let Err(err) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
838 warn!(target: LOG_CONSENSUS, err = %err.fmt_compact_anyhow(), "Could not delete old checkpoints");
839 }
840 }
841 }
842
843 fn delete_old_database_checkpoint(
846 &self,
847 session_index: u64,
848 checkpoint_dir: &Path,
849 ) -> anyhow::Result<()> {
850 if self.db_checkpoint_retention > session_index {
851 return Ok(());
852 }
853
854 let delete_session_index = session_index - self.db_checkpoint_retention;
855 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
856 if checkpoint_to_delete.exists() {
857 fs::remove_dir_all(checkpoint_to_delete)?;
858 }
859
860 Ok(())
861 }
862
    /// Processes a single ordered consensus item contributed by `peer` and,
    /// if it is accepted, persists it as accepted item `item_index` of
    /// session `session_index`.
    ///
    /// All effects of the item, the accepted-item entry and nothing else are
    /// committed in one database transaction after the modules have been
    /// audited.
    ///
    /// # Errors
    ///
    /// Returns an error if the item is rejected by the module or transaction
    /// processing, or if a *different* item is already stored at
    /// `item_index`. If the identical item from the same peer is already
    /// stored at `item_index` the call succeeds without doing anything.
    ///
    /// # Panics
    ///
    /// Panics if there is no status sender for `peer`, if the audit overflows
    /// or reports negative net assets, or if the commit fails.
    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);

        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[&peer.to_usize().to_string()])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        // Record that this peer contributed an item to the current session,
        // regardless of whether the item ends up being accepted.
        self.ci_status_senders
            .get(&peer)
            .expect("No ci status sender for peer")
            .send_replace(Some(session_index));

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[
                &self.cfg.local.identity.to_usize().to_string(),
                &peer.to_usize().to_string(),
            ])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        // Until the item is known to be accepted, returning early without a
        // commit is an expected outcome.
        dbtx.ignore_uncommitted();

        // An entry at this index means the item was already handled, e.g.
        // before a restart: the identical item is skipped, anything else is
        // rejected.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await
            .inspect_err(|err| {
                // Logged at trace level only.
                trace!(
                    target: LOG_CONSENSUS,
                    %peer,
                    item = ?DebugConsensusItem(&item),
                    err = %err.fmt_compact_anyhow(),
                    "Rejected consensus item"
                );
            })?;

        // From here on the item counts as accepted, so the transaction is
        // expected to be committed.
        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
        // Audit every module against the uncommitted state to verify the
        // federation's balance sheet before the item becomes durable.
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[
                    MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
                    kind.as_str(),
                ])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&peer.to_usize().to_string()])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }
997
998 async fn process_consensus_item_with_db_transaction(
999 &self,
1000 dbtx: &mut DatabaseTransaction<'_>,
1001 consensus_item: ConsensusItem,
1002 peer_id: PeerId,
1003 ) -> anyhow::Result<()> {
1004 self.decoders().assert_reject_mode();
1007
1008 match consensus_item {
1009 ConsensusItem::Module(module_item) => {
1010 let instance_id = module_item.module_instance_id();
1011
1012 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
1013
1014 self.modules
1015 .get_expect(instance_id)
1016 .process_consensus_item(module_dbtx, &module_item, peer_id)
1017 .await
1018 }
1019 ConsensusItem::Transaction(transaction) => {
1020 let txid = transaction.tx_hash();
1021 if dbtx
1022 .get_value(&AcceptedTransactionKey(txid))
1023 .await
1024 .is_some()
1025 {
1026 debug!(
1027 target: LOG_CONSENSUS,
1028 %txid,
1029 "Transaction already accepted"
1030 );
1031 bail!("Transaction is already accepted");
1032 }
1033
1034 let modules_ids = transaction
1035 .outputs
1036 .iter()
1037 .map(DynOutput::module_instance_id)
1038 .collect::<Vec<_>>();
1039
1040 process_transaction_with_dbtx(
1041 self.modules.clone(),
1042 dbtx,
1043 &transaction,
1044 self.cfg.consensus.version,
1045 TxProcessingMode::Consensus,
1046 )
1047 .await
1048 .map_err(|error| anyhow!(error.to_string()))?;
1049
1050 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
1051 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
1052 .await;
1053
1054 Ok(())
1055 }
1056 ConsensusItem::Default { variant, .. } => {
1057 warn!(
1058 target: LOG_CONSENSUS,
1059 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
1060 );
1061
1062 panic!("Unexpected consensus item type: {variant}")
1063 }
1064 }
1065 }
1066
1067 async fn request_signed_session_outcome(
1068 &self,
1069 federation_api: &DynGlobalApi,
1070 index: u64,
1071 ) -> SignedSessionOutcome {
1072 let decoders = self.decoders();
1073 let keychain = Keychain::new(&self.cfg);
1074 let threshold = self.num_peers().threshold();
1075
1076 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
1077 let signed_session_outcome = response
1078 .try_into_inner(&decoders)
1079 .map_err(|x| ServerError::ResponseDeserialization(x.into()))?;
1080 let header = signed_session_outcome.session_outcome.header(index);
1081 if signed_session_outcome.signatures.len() == threshold
1082 && signed_session_outcome
1083 .signatures
1084 .iter()
1085 .all(|(peer_id, sig)| keychain.verify_schnorr(&header, sig, *peer_id))
1086 {
1087 Ok(signed_session_outcome)
1088 } else {
1089 Err(ServerError::InvalidResponse(anyhow!("Invalid signatures")))
1090 }
1091 };
1092
1093 let mut backoff = fedimint_core::util::backoff_util::api_networking_backoff();
1094 loop {
1095 let result = federation_api
1096 .request_with_strategy(
1097 FilterMap::new(filter_map.clone()),
1098 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
1099 ApiRequestErased::new(index),
1100 )
1101 .await;
1102
1103 match result {
1104 Ok(signed_session_outcome) => return signed_session_outcome,
1105 Err(error) => {
1106 error.report_if_unusual("Requesting Session Outcome");
1107 }
1108 }
1109
1110 sleep(backoff.next().expect("infinite retries")).await;
1111 }
1112 }
1113
1114 async fn get_finished_session_count(&self) -> u64 {
1117 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
1118 }
1119}
1120
1121pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
1122 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
1123 .await
1124 .next()
1125 .await
1126 .map_or(0, |entry| (entry.0.0) + 1)
1127}