use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{Context as _, anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::config::P2PMessage;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::DynP2PConnections;
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::watch;
use tracing::{Level, debug, info, instrument, trace, warn};

use crate::LOG_CONSENSUS;
use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::to_node_index;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
use crate::metrics::{
    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
    CONSENSUS_SESSION_COUNT,
};

const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

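/// Drives the federation's consensus: runs one AlephBFT session after
/// another, processes the ordered consensus items and persists the
/// threshold-signed session outcomes.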
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub connections: DynP2PConnections<P2PMessage>,
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub db_checkpoint_retention: u64,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

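                // Close the session after one minute so even a single
                // guardian advances through sessions at a bounded pace.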
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
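        // Tolerating f Byzantine peers requires 3f + 1 peers in total, so a
        // multi-guardian federation needs at least four of them.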
        assert!(self.num_peers().total() >= 4);

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");

            self.run_session(self.connections.clone(), session_index)
                .await?;

            info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> anyhow::Result<()> {
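        // After the session's regular rounds the unit creation delay grows by
        // a factor of BASE per round for up to EXP_SLOWDOWN_ROUNDS additional
        // rounds, so a session tapers off instead of producing units forever.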
        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

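        // The first unit is created immediately; within the regular rounds
        // the delay is the configured round delay with random jitter, and
        // past rounds_per_session the exponential slowdown kicks in.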
        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed maximum of u16::MAX - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

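        // Spawn the AlephBFT session: the DataProvider feeds our submissions
        // into the broadcast, the FinalizationHandler streams ordered units
        // back to us, and the backup writer/reader persist units in the
        // database so a crashed peer can replay them on restart.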
        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                        self.is_recovery().await,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        self.ord_latency_sender.send_replace(None);

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

    async fn is_recovery(&self) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AlephUnitsPrefix)
            .await
            .next()
            .await
            .is_some()
    }

    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<Instant>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        let mut item_index = 0;

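        // In parallel we ask our peers for this session's threshold-signed
        // outcome; this is how we catch up if they complete the session
        // without us.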
        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

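        // Process the ordered units until the session's rounds are exhausted,
        // racing against a signed session outcome arriving from our peers.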
        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit.with_context(|| format!("AlephBFT task exited prematurely. session_idx: {session_index}, item_idx: {item_index}"));
                    let ordered_unit = match ordered_unit {
                        Ok(o) => o,
                        Err(err) => {
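                            // If the AlephBFT task exited because we are
                            // shutting down, park here until the shutdown
                            // completes so the exit is not reported as a
                            // consensus failure.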
                            while self.task_group.is_shutting_down() {
                                info!(target: LOG_CONSENSUS, "Shutdown detected");
                                sleep(Duration::from_millis(100)).await;
                            }
                            return Err(err);
                        },
                    };

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            match timestamp_receiver.try_recv() {
                                Ok(timestamp) => {
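                                    // Smooth the observed ordering latency
                                    // with an exponential moving average:
                                    // nine parts old value, one part new.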
                                    let latency = match *self.ord_latency_sender.borrow() {
                                        Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
                                        None => timestamp.elapsed()
                                    };

                                    self.ord_latency_sender.send_replace(Some(latency));

                                    CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                }
                                Err(err) => {
                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
                                }
                            }
                        }

                        match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
                            Ok(items) => {
                                for item in items {
                                    if let Ok(()) = self.process_consensus_item(
                                        session_index,
                                        item_index,
                                        item.clone(),
                                        ordered_unit.creator
                                    ).await {
                                        item_index += 1;
                                    }
                                }
                            }
                            Err(err) => {
                                warn!(
                                    target: LOG_CONSENSUS,
                                    %session_index,
                                    peer = %ordered_unit.creator,
                                    err = %err.fmt_compact(),
                                    "Failed to decode consensus items from peer"
                                );
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
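                    // Our peers already completed the session: everything we
                    // processed so far must be a prefix of the signed outcome,
                    // and the remaining items are processed from it directly.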
                    let pending_accepted_items = self.pending_accepted_items().await;

                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

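        // The session's rounds are over: sign the header of our local session
        // outcome and collect signatures until we reach the threshold.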
        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        let keychain = Keychain::new(&self.cfg);

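        // Hand our signature to the DataProvider, which broadcasts it to our
        // peers as unit data.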
        #[allow(clippy::disallowed_methods)]
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

        while signatures.len() < self.num_peers().threshold() {
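            // Signatures arrive as ordered unit data; we also keep racing the
            // request for a signed outcome in case our peers finish first.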
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)) {
                            signatures.insert(ordered_unit.creator, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                signed_session_outcome = &mut request_signed_session_outcome => {
                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
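        // Create the checkpoint directory if it does not exist yet and prune
        // checkpoints that have fallen out of the retention window.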
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints up to `current_session`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                if let Ok(file_name) = checkpoint.file_name().into_string()
                    && let Ok(session) = file_name.parse::<u64>()
                    && current_session >= self.db_checkpoint_retention
                    && session < current_session - self.db_checkpoint_retention
                {
                    fs::remove_dir_all(checkpoint.path())?;
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    fn checkpoint_database(&self, session_index: u64) {
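        // A retention of zero disables database checkpoints entirely.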
        if self.db_checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
                }
            }
        }

        {
            let _timing =
                timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
            if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
            }
        }
    }

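    /// Deletes the checkpoint that falls out of the retention window relative
    /// to `session_index`, if it exists.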
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.db_checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.db_checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);

        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[&peer.to_usize().to_string()])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .expect("No ci status sender for peer")
            .send_replace(Some(session_index));

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[
                &self.cfg.local.identity.to_usize().to_string(),
                &peer.to_usize().to_string(),
            ])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

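        // AlephBFT replays items after a mid-session crash, so an exact
        // duplicate of an item we already accepted is simply acknowledged,
        // while a mismatch means the item was discarded previously.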
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await
            .inspect_err(|err| {
                trace!(
                    target: LOG_CONSENSUS,
                    %peer,
                    item = ?DebugConsensusItem(&item),
                    err = %err.fmt_compact_anyhow(),
                    "Rejected consensus item"
                );
            })?;

        dbtx.warn_uncommitted();

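        // Record the accepted item so the session outcome can be rebuilt
        // from the database.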
        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
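
        // Audit every module after processing the item and assert that the
        // federation's balance sheet has not gone negative.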
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[
                    MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
                    kind.as_str(),
                ])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&peer.to_usize().to_string()])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }

    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
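        // The decoder registry is expected to reject items from unknown
        // modules instead of passing them through undecoded.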
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

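    /// Requests the signed outcome of session `index` from our peers,
    /// accepting only responses that carry exactly a threshold of valid
    /// signatures, and retries with backoff until one is obtained.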
    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
            {
                Ok(signed_session_outcome)
            } else {
                Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        let mut backoff = fedimint_core::util::backoff_util::api_networking_backoff();
        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }

            sleep(backoff.next().expect("infinite retries")).await;
        }
    }

    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

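/// Returns the number of completed sessions, which is also the index of the
/// next session to run: the highest stored session index plus one, or zero.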
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| entry.0.0 + 1)
}