1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use aleph_bft::Keychain as KeychainTrait;
8use anyhow::{Context as _, anyhow, bail};
9use async_channel::Receiver;
10use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
11use fedimint_api_client::query::FilterMap;
12use fedimint_core::config::P2PMessage;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::module::audit::Audit;
19use fedimint_core::module::registry::ModuleDecoderRegistry;
20use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
21use fedimint_core::net::peers::DynP2PConnections;
22use fedimint_core::runtime::spawn;
23use fedimint_core::session_outcome::{
24 AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
25};
26use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
27use fedimint_core::timing::TimeReporter;
28use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
29use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
30use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
31use futures::StreamExt;
32use rand::Rng;
33use tokio::sync::watch;
34use tracing::{Level, debug, info, instrument, trace, warn};
35
36use crate::LOG_CONSENSUS;
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::to_node_index;
45use crate::consensus::db::{
46 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
51use crate::metrics::{
52 CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
53 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
54 CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
55 CONSENSUS_SESSION_COUNT,
56};
57
/// Name of the directory, joined onto the engine's `data_dir`, under which
/// one database checkpoint sub-directory per session index is created.
58const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
/// State needed to run the consensus loop of a guardian, either as a single
/// guardian or as a participant in the atomic broadcast with its peers.
61pub struct ConsensusEngine {
 /// Registry of the server modules; used to obtain decoders, to process
 /// module consensus items and to audit every module after each item.
 63 pub modules: ServerModuleRegistry,
 /// Database holding accepted items, signed session outcomes and the
 /// backed-up aleph units.
 64 pub db: Database,
 /// API client used to request an already signed session outcome.
 65 pub federation_api: DynGlobalApi,
 /// Server configuration (local identity, consensus parameters, ...).
 66 pub cfg: ServerConfig,
 /// Source of locally submitted consensus items.
 67 pub submission_receiver: Receiver<ConsensusItem>,
 /// When this holds `Some(i)`, the engine stops after completing session `i`.
 68 pub shutdown_receiver: watch::Receiver<Option<u64>>,
 /// P2P connections handed to the broadcast network layer of each session.
 69 pub connections: DynP2PConnections<P2PMessage>,
 /// Per-peer channel that is set to the current session index every time a
 /// consensus item of that peer is processed.
 70 pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
 /// Smoothed ordering latency of our own batches; reset to `None` at the
 /// start of each broadcast session.
 71 pub ord_latency_sender: watch::Sender<Option<Duration>>,
 /// Task group used for shutdown detection and for spawning sub-tasks.
 72 pub task_group: TaskGroup,
 /// Base directory under which the database checkpoint directory lives.
 73 pub data_dir: PathBuf,
 /// Number of session checkpoints to retain; `0` disables checkpointing.
 74 pub db_checkpoint_retention: u64,
 75}
76
77impl ConsensusEngine {
78 fn num_peers(&self) -> NumPeers {
79 self.cfg.consensus.broadcast_public_keys.to_num_peers()
80 }
81
82 fn identity(&self) -> PeerId {
83 self.cfg.local.identity
84 }
85
86 #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
87 pub async fn run(self) -> anyhow::Result<()> {
88 if self.num_peers().total() == 1 {
89 self.run_single_guardian(self.task_group.make_handle())
90 .await
91 } else {
92 self.run_consensus(self.task_group.make_handle()).await
93 }
94 }
95
96 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
97 assert_eq!(self.num_peers(), NumPeers::from(1));
98
99 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
100
101 while !task_handle.is_shutting_down() {
102 let session_index = self.get_finished_session_count().await;
103
104 CONSENSUS_SESSION_COUNT.set(session_index as i64);
105
106 let mut item_index = self.pending_accepted_items().await.len() as u64;
107
108 let session_start_time = std::time::Instant::now();
109
110 while let Ok(item) = self.submission_receiver.recv().await {
111 if self
112 .process_consensus_item(session_index, item_index, item, self.identity())
113 .await
114 .is_ok()
115 {
116 item_index += 1;
117 }
118
119 if session_start_time.elapsed() > Duration::from_secs(60) {
121 break;
122 }
123 }
124
125 let session_outcome = SessionOutcome {
126 items: self.pending_accepted_items().await,
127 };
128
129 let header = session_outcome.header(session_index);
130 let signature = Keychain::new(&self.cfg).sign(&header);
131 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
132
133 self.complete_session(
134 session_index,
135 SignedSessionOutcome {
136 session_outcome,
137 signatures,
138 },
139 )
140 .await;
141
142 self.checkpoint_database(session_index);
143
144 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
145
146 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
147 break;
148 }
149 }
150
151 info!(target: LOG_CONSENSUS, "Consensus task shut down");
152
153 Ok(())
154 }
155
156 pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
157 assert!(self.num_peers().total() >= 4);
159
160 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
161
162 while !task_handle.is_shutting_down() {
163 let session_index = self.get_finished_session_count().await;
164
165 CONSENSUS_SESSION_COUNT.set(session_index as i64);
166
167 info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");
168
169 self.run_session(self.connections.clone(), session_index)
170 .await?;
171
172 info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");
173
174 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
175 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
176
177 sleep(Duration::from_secs(60)).await;
178
179 break;
180 }
181 }
182
183 info!(target: LOG_CONSENSUS, "Consensus task shut down");
184
185 Ok(())
186 }
187
188 pub async fn run_session(
189 &self,
190 connections: DynP2PConnections<P2PMessage>,
191 session_index: u64,
192 ) -> anyhow::Result<()> {
193 const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
211 const BASE: f64 = 1.02;
212
213 let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
214 let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
215
216 let mut delay_config = aleph_bft::default_delay_config();
217
218 delay_config.unit_creation_delay = Arc::new(move |round_index| {
219 let delay = if round_index == 0 {
220 0.0
221 } else {
222 round_delay
223 * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
224 * rand::thread_rng().gen_range(0.5..=1.5)
225 };
226
227 Duration::from_millis(delay.round() as u64)
228 });
229
230 let config = aleph_bft::create_config(
231 self.num_peers().total().into(),
232 self.identity().to_usize().into(),
233 session_index,
234 self.cfg
235 .consensus
236 .broadcast_rounds_per_session
237 .checked_add(EXP_SLOWDOWN_ROUNDS)
238 .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
239 delay_config,
240 Duration::from_secs(10 * 365 * 24 * 60 * 60),
241 )
242 .expect("The exponential slowdown exceeds 10 years");
243
244 let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
247 let (signature_sender, signature_receiver) = watch::channel(None);
248 let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
249 let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
250
251 let aleph_handle = spawn(
252 "aleph run session",
253 aleph_bft::run_session(
254 config,
255 aleph_bft::LocalIO::new(
256 DataProvider::new(
257 self.submission_receiver.clone(),
258 signature_receiver,
259 timestamp_sender,
260 self.is_recovery().await,
261 ),
262 FinalizationHandler::new(unit_data_sender),
263 BackupWriter::new(self.db.clone()).await,
264 BackupReader::new(self.db.clone()),
265 ),
266 Network::new(connections),
267 Keychain::new(&self.cfg),
268 Spawner::new(self.task_group.make_subgroup()),
269 aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
270 ),
271 );
272
273 self.ord_latency_sender.send_replace(None);
274
275 let signed_session_outcome = self
276 .complete_signed_session_outcome(
277 session_index,
278 unit_data_receiver,
279 signature_sender,
280 timestamp_receiver,
281 )
282 .await?;
283
284 terminator_sender.send(()).ok();
287 aleph_handle.await.ok();
288
289 self.complete_session(session_index, signed_session_outcome)
293 .await;
294
295 self.checkpoint_database(session_index);
296
297 Ok(())
298 }
299
300 async fn is_recovery(&self) -> bool {
301 self.db
302 .begin_transaction_nc()
303 .await
304 .find_by_prefix(&AlephUnitsPrefix)
305 .await
306 .next()
307 .await
308 .is_some()
309 }
310
/// Drives a running broadcast session until a `SignedSessionOutcome` is
/// available and returns it.
///
/// The function works in two phases, and in both of them it concurrently
/// polls a request for the already signed outcome of `session_index` via
/// `request_signed_session_outcome`:
///
/// 1. Ordered units are received and the consensus items of their batches
///    are processed until a unit with a round of at least
///    `broadcast_rounds_per_session` is received.
/// 2. The header of the resulting session outcome is signed, our signature is
///    published through `signature_sender`, and valid signatures of ordered
///    units are collected until the threshold is reached.
///
/// # Errors
///
/// Returns an error if `ordered_unit_receiver` is closed before the outcome
/// is complete or if the signature cannot be sent on `signature_sender`.
///
/// # Panics
///
/// Panics on consensus failures: if our pending accepted items disagree with
/// a signed outcome obtained from the request, if an item of such an outcome
/// is rejected locally, or if our header differs from the header of that
/// outcome.
311 pub async fn complete_signed_session_outcome(
 312 &self,
 313 session_index: u64,
 314 ordered_unit_receiver: Receiver<OrderedUnit>,
 315 signature_sender: watch::Sender<Option<SchnorrSignature>>,
 316 timestamp_receiver: Receiver<Instant>,
 317 ) -> anyhow::Result<SignedSessionOutcome> {
 // Index assigned to the next item that is accepted in this call.
 318 let mut item_index = 0;
 321
 // Pinned once so that the same request future can be polled by mutable
 // reference in both select loops below.
 322 let mut request_signed_session_outcome = Box::pin(async {
 323 self.request_signed_session_outcome(&self.federation_api, session_index)
 324 .await
 325 });
 326
 // Phase 1: process ordered batches until the session's last round.
 327 loop {
 331 tokio::select! {
 332 ordered_unit = ordered_unit_receiver.recv() => {
 333 let ordered_unit = ordered_unit.with_context(|| format!("Alepbft task exited prematurely. session_idx: {session_index}, item_idx: {item_index}")) ;
 334 let ordered_unit = match ordered_unit {
 335 Ok(o) => o,
 336 Err(err) => {
 // The channel is closed. As long as the task group reports a
 // shutdown we keep logging and sleeping instead of returning.
 // NOTE(review): this loop only ends when `is_shutting_down()`
 // turns false; presumably the task is expected to be cancelled
 // while it sleeps here - confirm.
 337 while self.task_group.is_shutting_down() {
 344 info!(target: LOG_CONSENSUS, "Shutdown detected");
 345 sleep(Duration::from_millis(100)).await;
 346 }
 347 return Err(err);
 349 },
 350 };
 351
 // A unit of this round or later ends the item processing phase.
 352 if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
 353 break;
 354 }
 355
 356 if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
 // For our own batches, update the ordering latency: the value
 // published on `ord_latency_sender` is weighted 9:1 between the
 // previous value and the new sample, the metric observes the raw
 // sample.
 357 if ordered_unit.creator == self.identity() {
 358 match timestamp_receiver.try_recv() {
 359 Ok(timestamp) => {
 360 let latency = match *self.ord_latency_sender.borrow() {
 361 Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
 362 None => timestamp.elapsed()
 363 };
 364
 365 self.ord_latency_sender.send_replace(Some(latency));
 366
 367 CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
 368 }
 369 Err(err) => {
 370 debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
 371 }
 372 }
 373 }
 374
 // A batch that fails to decode is skipped with a warning. The
 // item index only advances for items that were processed
 // successfully.
 375 match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
 376 Ok(items) => {
 377 for item in items {
 378 if let Ok(()) = self.process_consensus_item(
 379 session_index,
 380 item_index,
 381 item.clone(),
 382 ordered_unit.creator
 383 ).await {
 384 item_index += 1;
 385 }
 386 }
 387 }
 388 Err(err) => {
 389 warn!(
 390 target: LOG_CONSENSUS,
 391 %session_index,
 392 peer = %ordered_unit.creator,
 393 err = %err.fmt_compact(),
 394 "Failed to decode consensus items from peer"
 395 );
 396 }
 397 }
 398 }
 399 },
 // The request returned a signed outcome first: catch up by
 // processing the items we have not accepted yet.
 400 signed_session_outcome = &mut request_signed_session_outcome => {
 401 let pending_accepted_items = self.pending_accepted_items().await;
 402
 // `split_at` panics if we hold more pending accepted items than
 // the signed session outcome contains.
 403 let (processed, unprocessed) = signed_session_outcome
 405 .session_outcome
 406 .items
 407 .split_at(pending_accepted_items.len());
 408
 409 assert!(
 410 processed.iter().eq(pending_accepted_items.iter()),
 411 "Consensus Failure: pending accepted items disagree with federation consensus"
 412 );
 413
 414 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
 415 if let Err(err) = self.process_consensus_item(
 416 session_index,
 417 item_index as u64,
 418 accepted_item.item.clone(),
 419 accepted_item.peer
 420 ).await {
 421 panic!(
 422 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
 423 processed.len(),
 424 unprocessed.len(),
 425 );
 426 }
 427 }
 428
 429 return Ok(signed_session_outcome);
 430 }
 431 }
 432 }
 433
 // Phase 2: build the session outcome from the accepted items and collect
 // signatures for its header.
 434 let items = self.pending_accepted_items().await;
 435
 436 assert_eq!(item_index, items.len() as u64);
 437
 438 let session_outcome = SessionOutcome { items };
 439
 440 let header = session_outcome.header(session_index);
 441
 442 let keychain = Keychain::new(&self.cfg);
 443
 // Publish our own signature of the header on the watch channel.
 444 #[allow(clippy::disallowed_methods)]
 447 signature_sender.send(Some(keychain.sign(&header)))?;
 448
 449 let mut signatures = BTreeMap::new();
 450
 // Ensures the accepted items are dumped to the log at most once.
 451 let items_dump = tokio::sync::OnceCell::new();
 452
 453 while signatures.len() < self.num_peers().threshold() {
 456 tokio::select! {
 457 ordered_unit = ordered_unit_receiver.recv() => {
 458 let ordered_unit = ordered_unit?;
 459
 // Only signature units are considered here; a signature is kept
 // if it verifies against our header for the unit's creator.
 460 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
 461 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
 462 signatures.insert(ordered_unit.creator, signature);
 463 } else {
 464 warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);
 465
 466 items_dump.get_or_init(|| async {
 467 for (idx, item) in session_outcome.items.iter().enumerate() {
 468 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
 469 }
 470 }).await;
 471 }
 472 }
 473 }
 // The request returned a signed outcome first; it must describe
 // the same header as the one we computed locally.
 474 signed_session_outcome = &mut request_signed_session_outcome => {
 475 assert_eq!(
 476 header,
 477 signed_session_outcome.session_outcome.header(session_index),
 478 "Consensus Failure: header disagrees with federation consensus"
 479 );
 480
 481 return Ok(signed_session_outcome);
 482 }
 483 }
 484 }
 485
 486 Ok(SignedSessionOutcome {
 487 session_outcome,
 488 signatures,
 489 })
 490 }
491
492 fn decoders(&self) -> ModuleDecoderRegistry {
493 self.modules.decoder_registry()
494 }
495
496 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
497 self.db
498 .begin_transaction_nc()
499 .await
500 .find_by_prefix(&AcceptedItemPrefix)
501 .await
502 .map(|entry| entry.1)
503 .collect()
504 .await
505 }
506
507 pub async fn complete_session(
508 &self,
509 session_index: u64,
510 signed_session_outcome: SignedSessionOutcome,
511 ) {
512 let mut dbtx = self.db.begin_transaction().await;
513
514 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
515
516 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
517
518 if dbtx
519 .insert_entry(
520 &SignedSessionOutcomeKey(session_index),
521 &signed_session_outcome,
522 )
523 .await
524 .is_some()
525 {
526 panic!("We tried to overwrite a signed session outcome");
527 }
528
529 dbtx.commit_tx_result()
530 .await
531 .expect("This is the only place where we write to this key");
532 }
533
534 fn db_checkpoints_dir(&self) -> PathBuf {
536 self.data_dir.join(DB_CHECKPOINTS_DIR)
537 }
538
539 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
543 let checkpoint_dir = self.db_checkpoints_dir();
544
545 if checkpoint_dir.exists() {
546 debug!(
547 target: LOG_CONSENSUS,
548 ?current_session,
549 "Removing database checkpoints up to `current_session`"
550 );
551
552 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
553 if let Ok(file_name) = checkpoint.file_name().into_string() {
555 if let Ok(session) = file_name.parse::<u64>() {
556 if current_session >= self.db_checkpoint_retention
557 && session < current_session - self.db_checkpoint_retention
558 {
559 fs::remove_dir_all(checkpoint.path())?;
560 }
561 }
562 }
563 }
564 } else {
565 fs::create_dir_all(&checkpoint_dir)?;
566 }
567
568 Ok(())
569 }
570
571 fn checkpoint_database(&self, session_index: u64) {
575 if self.db_checkpoint_retention == 0 {
578 return;
579 }
580
581 let checkpoint_dir = self.db_checkpoints_dir();
582 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
583
584 {
585 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
586 match self.db.checkpoint(&session_checkpoint_dir) {
587 Ok(()) => {
588 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
589 }
590 Err(e) => {
591 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
592 }
593 }
594 }
595
596 {
597 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
599 if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
600 warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
601 }
602 }
603 }
604
605 fn delete_old_database_checkpoint(
608 &self,
609 session_index: u64,
610 checkpoint_dir: &Path,
611 ) -> anyhow::Result<()> {
612 if self.db_checkpoint_retention > session_index {
613 return Ok(());
614 }
615
616 let delete_session_index = session_index - self.db_checkpoint_retention;
617 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
618 if checkpoint_to_delete.exists() {
619 fs::remove_dir_all(checkpoint_to_delete)?;
620 }
621
622 Ok(())
623 }
624
625 #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
626 pub async fn process_consensus_item(
627 &self,
628 session_index: u64,
629 item_index: u64,
630 item: ConsensusItem,
631 peer: PeerId,
632 ) -> anyhow::Result<()> {
633 let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
634
635 let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
636 .with_label_values(&[&peer.to_usize().to_string()])
637 .start_timer();
638
639 trace!(
640 target: LOG_CONSENSUS,
641 %peer,
642 item = ?DebugConsensusItem(&item),
643 "Processing consensus item"
644 );
645
646 self.ci_status_senders
647 .get(&peer)
648 .expect("No ci status sender for peer")
649 .send_replace(Some(session_index));
650
651 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
652 .with_label_values(&[
653 &self.cfg.local.identity.to_usize().to_string(),
654 &peer.to_usize().to_string(),
655 ])
656 .set(session_index as i64);
657
658 let mut dbtx = self.db.begin_transaction().await;
659
660 dbtx.ignore_uncommitted();
661
662 if let Some(existing_item) = dbtx
666 .get_value(&AcceptedItemKey(item_index.to_owned()))
667 .await
668 {
669 if existing_item.item == item && existing_item.peer == peer {
670 return Ok(());
671 }
672
673 bail!(
674 "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
675 existing_item.peer
676 );
677 }
678
679 self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
680 .await
681 .inspect_err(|err| {
682 trace!(
684 target: LOG_CONSENSUS,
685 %peer,
686 item = ?DebugConsensusItem(&item),
687 err = %err.fmt_compact_anyhow(),
688 "Rejected consensus item"
689 );
690 })?;
691
692 dbtx.warn_uncommitted();
695
696 dbtx.insert_entry(
697 &AcceptedItemKey(item_index),
698 &AcceptedItem {
699 item: item.clone(),
700 peer,
701 },
702 )
703 .await;
704
705 debug!(
706 target: LOG_CONSENSUS,
707 %peer,
708 item = ?DebugConsensusItem(&item),
709 "Processed consensus item"
710 );
711 let mut audit = Audit::default();
712
713 for (module_instance_id, kind, module) in self.modules.iter_modules() {
714 let _module_audit_timing =
715 TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
716
717 let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
718 .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
719 .start_timer();
720
721 module
722 .audit(
723 &mut dbtx
724 .to_ref_with_prefix_module_id(module_instance_id)
725 .0
726 .into_nc(),
727 &mut audit,
728 module_instance_id,
729 )
730 .await;
731
732 timing_prom.observe_duration();
733 }
734
735 assert!(
736 audit
737 .net_assets()
738 .expect("Overflow while checking balance sheet")
739 .milli_sat
740 >= 0,
741 "Balance sheet of the fed has gone negative, this should never happen! {audit}"
742 );
743
744 dbtx.commit_tx_result()
745 .await
746 .expect("Committing consensus epoch failed");
747
748 CONSENSUS_ITEMS_PROCESSED_TOTAL
749 .with_label_values(&[&peer.to_usize().to_string()])
750 .inc();
751
752 timing_prom.observe_duration();
753
754 Ok(())
755 }
756
757 async fn process_consensus_item_with_db_transaction(
758 &self,
759 dbtx: &mut DatabaseTransaction<'_>,
760 consensus_item: ConsensusItem,
761 peer_id: PeerId,
762 ) -> anyhow::Result<()> {
763 self.decoders().assert_reject_mode();
766
767 match consensus_item {
768 ConsensusItem::Module(module_item) => {
769 let instance_id = module_item.module_instance_id();
770
771 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
772
773 self.modules
774 .get_expect(instance_id)
775 .process_consensus_item(module_dbtx, &module_item, peer_id)
776 .await
777 }
778 ConsensusItem::Transaction(transaction) => {
779 let txid = transaction.tx_hash();
780 if dbtx
781 .get_value(&AcceptedTransactionKey(txid))
782 .await
783 .is_some()
784 {
785 debug!(
786 target: LOG_CONSENSUS,
787 %txid,
788 "Transaction already accepted"
789 );
790 bail!("Transaction is already accepted");
791 }
792
793 let modules_ids = transaction
794 .outputs
795 .iter()
796 .map(DynOutput::module_instance_id)
797 .collect::<Vec<_>>();
798
799 process_transaction_with_dbtx(
800 self.modules.clone(),
801 dbtx,
802 &transaction,
803 self.cfg.consensus.version,
804 TxProcessingMode::Consensus,
805 )
806 .await
807 .map_err(|error| anyhow!(error.to_string()))?;
808
809 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
810 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
811 .await;
812
813 Ok(())
814 }
815 ConsensusItem::Default { variant, .. } => {
816 warn!(
817 target: LOG_CONSENSUS,
818 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
819 );
820
821 panic!("Unexpected consensus item type: {variant}")
822 }
823 }
824 }
825
826 async fn request_signed_session_outcome(
827 &self,
828 federation_api: &DynGlobalApi,
829 index: u64,
830 ) -> SignedSessionOutcome {
831 let decoders = self.decoders();
832 let keychain = Keychain::new(&self.cfg);
833 let threshold = self.num_peers().threshold();
834
835 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
836 let signed_session_outcome = response
837 .try_into_inner(&decoders)
838 .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
839 let header = signed_session_outcome.session_outcome.header(index);
840 if signed_session_outcome.signatures.len() == threshold
841 && signed_session_outcome
842 .signatures
843 .iter()
844 .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
845 {
846 Ok(signed_session_outcome)
847 } else {
848 Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
849 }
850 };
851
852 let mut backoff = fedimint_core::util::backoff_util::api_networking_backoff();
853 loop {
854 let result = federation_api
855 .request_with_strategy(
856 FilterMap::new(filter_map.clone()),
857 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
858 ApiRequestErased::new(index),
859 )
860 .await;
861
862 match result {
863 Ok(signed_session_outcome) => return signed_session_outcome,
864 Err(error) => {
865 error.report_if_unusual("Requesting Session Outcome");
866 }
867 }
868
869 sleep(backoff.next().expect("infinite retries")).await;
870 }
871 }
872
873 async fn get_finished_session_count(&self) -> u64 {
876 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
877 }
878}
879
880pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
881 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
882 .await
883 .next()
884 .await
885 .map_or(0, |entry| (entry.0.0) + 1)
886}