1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use aleph_bft::Keychain as KeychainTrait;
8use anyhow::{anyhow, bail};
9use async_channel::Receiver;
10use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
11use fedimint_api_client::query::FilterMap;
12use fedimint_core::config::P2PMessage;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::module::audit::Audit;
19use fedimint_core::module::registry::ModuleDecoderRegistry;
20use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
21use fedimint_core::net::peers::DynP2PConnections;
22use fedimint_core::runtime::spawn;
23use fedimint_core::session_outcome::{
24 AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
25};
26use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
27use fedimint_core::timing::TimeReporter;
28use fedimint_core::util::FmtCompact as _;
29use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
30use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
31use futures::StreamExt;
32use rand::Rng;
33use tokio::sync::watch;
34use tracing::{Level, debug, info, instrument, trace, warn};
35
36use crate::LOG_CONSENSUS;
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::to_node_index;
45use crate::consensus::db::{
46 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
51use crate::metrics::{
52 CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
53 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
54 CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
55 CONSENSUS_SESSION_COUNT,
56};
57
/// Name of the sub-directory of the data directory that holds the per-session
/// database checkpoints.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
61pub struct ConsensusEngine {
63 pub modules: ServerModuleRegistry,
64 pub db: Database,
65 pub federation_api: DynGlobalApi,
66 pub cfg: ServerConfig,
67 pub submission_receiver: Receiver<ConsensusItem>,
68 pub shutdown_receiver: watch::Receiver<Option<u64>>,
69 pub connections: DynP2PConnections<P2PMessage>,
70 pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
71 pub ord_latency_sender: watch::Sender<Option<Duration>>,
72 pub task_group: TaskGroup,
73 pub data_dir: PathBuf,
74 pub checkpoint_retention: u64,
75}
76
77impl ConsensusEngine {
78 fn num_peers(&self) -> NumPeers {
79 self.cfg.consensus.broadcast_public_keys.to_num_peers()
80 }
81
82 fn identity(&self) -> PeerId {
83 self.cfg.local.identity
84 }
85
86 #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
87 pub async fn run(self) -> anyhow::Result<()> {
88 if self.num_peers().total() == 1 {
89 self.run_single_guardian(self.task_group.make_handle())
90 .await
91 } else {
92 self.run_consensus(self.task_group.make_handle()).await
93 }
94 }
95
96 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
97 assert_eq!(self.num_peers(), NumPeers::from(1));
98
99 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
100
101 while !task_handle.is_shutting_down() {
102 let session_index = self.get_finished_session_count().await;
103
104 CONSENSUS_SESSION_COUNT.set(session_index as i64);
105
106 let mut item_index = self.pending_accepted_items().await.len() as u64;
107
108 let session_start_time = std::time::Instant::now();
109
110 while let Ok(item) = self.submission_receiver.recv().await {
111 if self
112 .process_consensus_item(session_index, item_index, item, self.identity())
113 .await
114 .is_ok()
115 {
116 item_index += 1;
117 }
118
119 if session_start_time.elapsed() > Duration::from_secs(60) {
121 break;
122 }
123 }
124
125 let session_outcome = SessionOutcome {
126 items: self.pending_accepted_items().await,
127 };
128
129 let header = session_outcome.header(session_index);
130 let signature = Keychain::new(&self.cfg).sign(&header);
131 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
132
133 self.complete_session(
134 session_index,
135 SignedSessionOutcome {
136 session_outcome,
137 signatures,
138 },
139 )
140 .await;
141
142 self.checkpoint_database(session_index);
143
144 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
145
146 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
147 break;
148 }
149 }
150
151 info!(target: LOG_CONSENSUS, "Consensus task shut down");
152
153 Ok(())
154 }
155
156 pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
157 assert!(self.num_peers().total() >= 4);
159
160 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
161
162 while !task_handle.is_shutting_down() {
163 let session_index = self.get_finished_session_count().await;
164
165 CONSENSUS_SESSION_COUNT.set(session_index as i64);
166
167 info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");
168
169 self.run_session(self.connections.clone(), session_index)
170 .await?;
171
172 info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");
173
174 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
175 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
176
177 sleep(Duration::from_secs(60)).await;
178
179 break;
180 }
181 }
182
183 info!(target: LOG_CONSENSUS, "Consensus task shut down");
184
185 Ok(())
186 }
187
188 pub async fn run_session(
189 &self,
190 connections: DynP2PConnections<P2PMessage>,
191 session_index: u64,
192 ) -> anyhow::Result<()> {
193 const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
211 const BASE: f64 = 1.02;
212
213 let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
214 let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
215
216 let mut delay_config = aleph_bft::default_delay_config();
217
218 delay_config.unit_creation_delay = Arc::new(move |round_index| {
219 let delay = if round_index == 0 {
220 0.0
221 } else {
222 round_delay
223 * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
224 * rand::thread_rng().gen_range(0.5..=1.5)
225 };
226
227 Duration::from_millis(delay.round() as u64)
228 });
229
230 let config = aleph_bft::create_config(
231 self.num_peers().total().into(),
232 self.identity().to_usize().into(),
233 session_index,
234 self.cfg
235 .consensus
236 .broadcast_rounds_per_session
237 .checked_add(EXP_SLOWDOWN_ROUNDS)
238 .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
239 delay_config,
240 Duration::from_secs(10 * 365 * 24 * 60 * 60),
241 )
242 .expect("The exponential slowdown exceeds 10 years");
243
244 let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
247 let (signature_sender, signature_receiver) = watch::channel(None);
248 let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
249 let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
250
251 let aleph_handle = spawn(
252 "aleph run session",
253 aleph_bft::run_session(
254 config,
255 aleph_bft::LocalIO::new(
256 DataProvider::new(
257 self.submission_receiver.clone(),
258 signature_receiver,
259 timestamp_sender,
260 self.is_recovery().await,
261 ),
262 FinalizationHandler::new(unit_data_sender),
263 BackupWriter::new(self.db.clone()).await,
264 BackupReader::new(self.db.clone()),
265 ),
266 Network::new(connections),
267 Keychain::new(&self.cfg),
268 Spawner::new(self.task_group.make_subgroup()),
269 aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
270 ),
271 );
272
273 self.ord_latency_sender.send_replace(None);
274
275 let signed_session_outcome = self
276 .complete_signed_session_outcome(
277 session_index,
278 unit_data_receiver,
279 signature_sender,
280 timestamp_receiver,
281 )
282 .await?;
283
284 terminator_sender.send(()).ok();
287 aleph_handle.await.ok();
288
289 self.complete_session(session_index, signed_session_outcome)
293 .await;
294
295 self.checkpoint_database(session_index);
296
297 Ok(())
298 }
299
300 async fn is_recovery(&self) -> bool {
301 self.db
302 .begin_transaction_nc()
303 .await
304 .find_by_prefix(&AlephUnitsPrefix)
305 .await
306 .next()
307 .await
308 .is_some()
309 }
310
311 pub async fn complete_signed_session_outcome(
312 &self,
313 session_index: u64,
314 ordered_unit_receiver: Receiver<OrderedUnit>,
315 signature_sender: watch::Sender<Option<SchnorrSignature>>,
316 timestamp_receiver: Receiver<Instant>,
317 ) -> anyhow::Result<SignedSessionOutcome> {
318 let mut item_index = 0;
321
322 let mut request_signed_session_outcome = Box::pin(async {
323 self.request_signed_session_outcome(&self.federation_api, session_index)
324 .await
325 });
326
327 loop {
331 tokio::select! {
332 ordered_unit = ordered_unit_receiver.recv() => {
333 let ordered_unit = ordered_unit?;
334
335 if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
336 break;
337 }
338
339 if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
340 if ordered_unit.creator == self.identity() {
341 match timestamp_receiver.try_recv() {
342 Ok(timestamp) => {
343 let latency = match *self.ord_latency_sender.borrow() {
344 Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
345 None => timestamp.elapsed()
346 };
347
348 self.ord_latency_sender.send_replace(Some(latency));
349
350 CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
351 }
352 Err(err) => {
353 debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
354 }
355 }
356 }
357
358 if let Ok(items) = Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()){
359 for item in items {
360 if self.process_consensus_item(
361 session_index,
362 item_index,
363 item.clone(),
364 ordered_unit.creator
365 ).await
366 .is_ok() {
367 item_index += 1;
368 }
369 }
370 }
371 }
372 },
373 signed_session_outcome = &mut request_signed_session_outcome => {
374 let pending_accepted_items = self.pending_accepted_items().await;
375
376 let (processed, unprocessed) = signed_session_outcome
378 .session_outcome
379 .items
380 .split_at(pending_accepted_items.len());
381
382 assert!(
383 processed.iter().eq(pending_accepted_items.iter()),
384 "Consensus Failure: pending accepted items disagree with federation consensus"
385 );
386
387 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
388 if let Err(err) = self.process_consensus_item(
389 session_index,
390 item_index as u64,
391 accepted_item.item.clone(),
392 accepted_item.peer
393 ).await {
394 panic!(
395 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
396 processed.len(),
397 unprocessed.len(),
398 );
399 }
400 }
401
402 return Ok(signed_session_outcome);
403 }
404 }
405 }
406
407 let items = self.pending_accepted_items().await;
408
409 assert_eq!(item_index, items.len() as u64);
410
411 let session_outcome = SessionOutcome { items };
412
413 let header = session_outcome.header(session_index);
414
415 let keychain = Keychain::new(&self.cfg);
416
417 #[allow(clippy::disallowed_methods)]
420 signature_sender.send(Some(keychain.sign(&header)))?;
421
422 let mut signatures = BTreeMap::new();
423
424 let items_dump = tokio::sync::OnceCell::new();
425
426 while signatures.len() < self.num_peers().threshold() {
429 tokio::select! {
430 ordered_unit = ordered_unit_receiver.recv() => {
431 let ordered_unit = ordered_unit?;
432
433 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
434 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
435 signatures.insert(ordered_unit.creator, signature);
436 } else {
437 warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);
438
439 items_dump.get_or_init(|| async {
440 for (idx, item) in session_outcome.items.iter().enumerate() {
441 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
442 }
443 }).await;
444 }
445 }
446 }
447 signed_session_outcome = &mut request_signed_session_outcome => {
448 assert_eq!(
449 header,
450 signed_session_outcome.session_outcome.header(session_index),
451 "Consensus Failure: header disagrees with federation consensus"
452 );
453
454 return Ok(signed_session_outcome);
455 }
456 }
457 }
458
459 Ok(SignedSessionOutcome {
460 session_outcome,
461 signatures,
462 })
463 }
464
465 fn decoders(&self) -> ModuleDecoderRegistry {
466 self.modules.decoder_registry()
467 }
468
469 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
470 self.db
471 .begin_transaction_nc()
472 .await
473 .find_by_prefix(&AcceptedItemPrefix)
474 .await
475 .map(|entry| entry.1)
476 .collect()
477 .await
478 }
479
480 pub async fn complete_session(
481 &self,
482 session_index: u64,
483 signed_session_outcome: SignedSessionOutcome,
484 ) {
485 let mut dbtx = self.db.begin_transaction().await;
486
487 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
488
489 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
490
491 if dbtx
492 .insert_entry(
493 &SignedSessionOutcomeKey(session_index),
494 &signed_session_outcome,
495 )
496 .await
497 .is_some()
498 {
499 panic!("We tried to overwrite a signed session outcome");
500 }
501
502 dbtx.commit_tx_result()
503 .await
504 .expect("This is the only place where we write to this key");
505 }
506
507 fn db_checkpoints_dir(&self) -> PathBuf {
509 self.data_dir.join(DB_CHECKPOINTS_DIR)
510 }
511
512 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
516 let checkpoint_dir = self.db_checkpoints_dir();
517
518 if checkpoint_dir.exists() {
519 debug!(
520 target: LOG_CONSENSUS,
521 ?current_session,
522 "Removing database checkpoints up to `current_session`"
523 );
524
525 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
526 if let Ok(file_name) = checkpoint.file_name().into_string() {
528 if let Ok(session) = file_name.parse::<u64>() {
529 if current_session >= self.checkpoint_retention
530 && session < current_session - self.checkpoint_retention
531 {
532 fs::remove_dir_all(checkpoint.path())?;
533 }
534 }
535 }
536 }
537 } else {
538 fs::create_dir_all(&checkpoint_dir)?;
539 }
540
541 Ok(())
542 }
543
544 fn checkpoint_database(&self, session_index: u64) {
548 if self.checkpoint_retention == 0 {
551 return;
552 }
553
554 let checkpoint_dir = self.db_checkpoints_dir();
555 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
556
557 {
558 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
559 match self.db.checkpoint(&session_checkpoint_dir) {
560 Ok(()) => {
561 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
562 }
563 Err(e) => {
564 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
565 }
566 }
567 }
568
569 {
570 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
572 if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
573 warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
574 }
575 }
576 }
577
578 fn delete_old_database_checkpoint(
581 &self,
582 session_index: u64,
583 checkpoint_dir: &Path,
584 ) -> anyhow::Result<()> {
585 if self.checkpoint_retention > session_index {
586 return Ok(());
587 }
588
589 let delete_session_index = session_index - self.checkpoint_retention;
590 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
591 if checkpoint_to_delete.exists() {
592 fs::remove_dir_all(checkpoint_to_delete)?;
593 }
594
595 Ok(())
596 }
597
598 #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
599 pub async fn process_consensus_item(
600 &self,
601 session_index: u64,
602 item_index: u64,
603 item: ConsensusItem,
604 peer: PeerId,
605 ) -> anyhow::Result<()> {
606 let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
607
608 let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
609 .with_label_values(&[&peer.to_usize().to_string()])
610 .start_timer();
611
612 trace!(
613 target: LOG_CONSENSUS,
614 %peer,
615 item = ?DebugConsensusItem(&item),
616 "Processing consensus item"
617 );
618
619 self.ci_status_senders
620 .get(&peer)
621 .expect("No ci status sender for peer {peer}")
622 .send_replace(Some(session_index));
623
624 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
625 .with_label_values(&[
626 &self.cfg.local.identity.to_usize().to_string(),
627 &peer.to_usize().to_string(),
628 ])
629 .set(session_index as i64);
630
631 let mut dbtx = self.db.begin_transaction().await;
632
633 dbtx.ignore_uncommitted();
634
635 if let Some(existing_item) = dbtx
639 .get_value(&AcceptedItemKey(item_index.to_owned()))
640 .await
641 {
642 if existing_item.item == item && existing_item.peer == peer {
643 return Ok(());
644 }
645
646 bail!(
647 "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
648 existing_item.peer
649 );
650 }
651
652 self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
653 .await?;
654
655 dbtx.warn_uncommitted();
658
659 dbtx.insert_entry(
660 &AcceptedItemKey(item_index),
661 &AcceptedItem {
662 item: item.clone(),
663 peer,
664 },
665 )
666 .await;
667
668 debug!(
669 target: LOG_CONSENSUS,
670 %peer,
671 item = ?DebugConsensusItem(&item),
672 "Processed consensus item"
673 );
674 let mut audit = Audit::default();
675
676 for (module_instance_id, kind, module) in self.modules.iter_modules() {
677 let _module_audit_timing =
678 TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
679
680 let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
681 .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
682 .start_timer();
683
684 module
685 .audit(
686 &mut dbtx
687 .to_ref_with_prefix_module_id(module_instance_id)
688 .0
689 .into_nc(),
690 &mut audit,
691 module_instance_id,
692 )
693 .await;
694
695 timing_prom.observe_duration();
696 }
697
698 assert!(
699 audit
700 .net_assets()
701 .expect("Overflow while checking balance sheet")
702 .milli_sat
703 >= 0,
704 "Balance sheet of the fed has gone negative, this should never happen! {audit}"
705 );
706
707 dbtx.commit_tx_result()
708 .await
709 .expect("Committing consensus epoch failed");
710
711 CONSENSUS_ITEMS_PROCESSED_TOTAL
712 .with_label_values(&[&peer.to_usize().to_string()])
713 .inc();
714
715 timing_prom.observe_duration();
716
717 Ok(())
718 }
719
720 async fn process_consensus_item_with_db_transaction(
721 &self,
722 dbtx: &mut DatabaseTransaction<'_>,
723 consensus_item: ConsensusItem,
724 peer_id: PeerId,
725 ) -> anyhow::Result<()> {
726 self.decoders().assert_reject_mode();
729
730 match consensus_item {
731 ConsensusItem::Module(module_item) => {
732 let instance_id = module_item.module_instance_id();
733
734 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
735
736 self.modules
737 .get_expect(instance_id)
738 .process_consensus_item(module_dbtx, &module_item, peer_id)
739 .await
740 }
741 ConsensusItem::Transaction(transaction) => {
742 let txid = transaction.tx_hash();
743 if dbtx
744 .get_value(&AcceptedTransactionKey(txid))
745 .await
746 .is_some()
747 {
748 debug!(
749 target: LOG_CONSENSUS,
750 %txid,
751 "Transaction already accepted"
752 );
753 bail!("Transaction is already accepted");
754 }
755
756 let modules_ids = transaction
757 .outputs
758 .iter()
759 .map(DynOutput::module_instance_id)
760 .collect::<Vec<_>>();
761
762 process_transaction_with_dbtx(
763 self.modules.clone(),
764 dbtx,
765 &transaction,
766 self.cfg.consensus.version,
767 TxProcessingMode::Consensus,
768 )
769 .await
770 .map_err(|error| anyhow!(error.to_string()))?;
771
772 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
773 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
774 .await;
775
776 Ok(())
777 }
778 ConsensusItem::Default { variant, .. } => {
779 warn!(
780 target: LOG_CONSENSUS,
781 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
782 );
783
784 panic!("Unexpected consensus item type: {variant}")
785 }
786 }
787 }
788
789 async fn request_signed_session_outcome(
790 &self,
791 federation_api: &DynGlobalApi,
792 index: u64,
793 ) -> SignedSessionOutcome {
794 let decoders = self.decoders();
795 let keychain = Keychain::new(&self.cfg);
796 let threshold = self.num_peers().threshold();
797
798 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
799 let signed_session_outcome = response
800 .try_into_inner(&decoders)
801 .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
802 let header = signed_session_outcome.session_outcome.header(index);
803 if signed_session_outcome.signatures.len() == threshold
804 && signed_session_outcome
805 .signatures
806 .iter()
807 .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
808 {
809 Ok(signed_session_outcome)
810 } else {
811 Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
812 }
813 };
814
815 loop {
816 let result = federation_api
817 .request_with_strategy(
818 FilterMap::new(filter_map.clone()),
819 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
820 ApiRequestErased::new(index),
821 )
822 .await;
823
824 match result {
825 Ok(signed_session_outcome) => return signed_session_outcome,
826 Err(error) => {
827 error.report_if_unusual("Requesting Session Outcome");
828 }
829 }
830 }
831 }
832
833 async fn get_finished_session_count(&self) -> u64 {
836 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
837 }
838}
839
840pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
841 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
842 .await
843 .next()
844 .await
845 .map_or(0, |entry| (entry.0.0) + 1)
846}