1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use aleph_bft::Keychain as KeychainTrait;
8use anyhow::{anyhow, bail};
9use async_channel::Receiver;
10use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
11use fedimint_api_client::query::FilterMap;
12use fedimint_core::config::P2PMessage;
13use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
14use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::Decodable;
16use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
17use fedimint_core::epoch::ConsensusItem;
18use fedimint_core::module::audit::Audit;
19use fedimint_core::module::registry::ModuleDecoderRegistry;
20use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
21use fedimint_core::net::peers::DynP2PConnections;
22use fedimint_core::runtime::spawn;
23use fedimint_core::session_outcome::{
24 AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
25};
26use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
27use fedimint_core::timing::TimeReporter;
28use fedimint_core::util::FmtCompact as _;
29use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
30use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
31use futures::StreamExt;
32use rand::Rng;
33use tokio::sync::watch;
34use tracing::{Level, debug, info, instrument, trace, warn};
35
36use crate::LOG_CONSENSUS;
37use crate::config::ServerConfig;
38use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
39use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData, get_citem_bytes_chsum};
40use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
41use crate::consensus::aleph_bft::keychain::Keychain;
42use crate::consensus::aleph_bft::network::Network;
43use crate::consensus::aleph_bft::spawner::Spawner;
44use crate::consensus::aleph_bft::to_node_index;
45use crate::consensus::db::{
46 AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
47 SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
48};
49use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
50use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
51use crate::metrics::{
52 CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
53 CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
54 CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
55 CONSENSUS_SESSION_COUNT,
56};
57
58const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";
60
61pub struct ConsensusEngine {
63 pub modules: ServerModuleRegistry,
64 pub db: Database,
65 pub federation_api: DynGlobalApi,
66 pub cfg: ServerConfig,
67 pub submission_receiver: Receiver<ConsensusItem>,
68 pub shutdown_receiver: watch::Receiver<Option<u64>>,
69 pub connections: DynP2PConnections<P2PMessage>,
70 pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
71 pub self_id_str: String,
73 pub peer_id_str: Vec<String>,
75 pub task_group: TaskGroup,
76 pub data_dir: PathBuf,
77 pub checkpoint_retention: u64,
78}
79
80impl ConsensusEngine {
81 fn num_peers(&self) -> NumPeers {
82 self.cfg.consensus.broadcast_public_keys.to_num_peers()
83 }
84
85 fn identity(&self) -> PeerId {
86 self.cfg.local.identity
87 }
88
89 #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
90 pub async fn run(self) -> anyhow::Result<()> {
91 if self.num_peers().total() == 1 {
92 self.run_single_guardian(self.task_group.make_handle())
93 .await
94 } else {
95 self.run_consensus(self.task_group.make_handle()).await
96 }
97 }
98
99 pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
100 assert_eq!(self.num_peers(), NumPeers::from(1));
101
102 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
103
104 while !task_handle.is_shutting_down() {
105 let session_index = self.get_finished_session_count().await;
106
107 CONSENSUS_SESSION_COUNT.set(session_index as i64);
108
109 let mut item_index = self.pending_accepted_items().await.len() as u64;
110
111 let session_start_time = std::time::Instant::now();
112
113 while let Ok(item) = self.submission_receiver.recv().await {
114 if self
115 .process_consensus_item(session_index, item_index, item, self.identity())
116 .await
117 .is_ok()
118 {
119 item_index += 1;
120 }
121
122 if session_start_time.elapsed() > Duration::from_secs(60) {
124 break;
125 }
126 }
127
128 let session_outcome = SessionOutcome {
129 items: self.pending_accepted_items().await,
130 };
131
132 let header = session_outcome.header(session_index);
133 let signature = Keychain::new(&self.cfg).sign(&header);
134 let signatures = BTreeMap::from_iter([(self.identity(), signature)]);
135
136 self.complete_session(
137 session_index,
138 SignedSessionOutcome {
139 session_outcome,
140 signatures,
141 },
142 )
143 .await;
144
145 self.checkpoint_database(session_index);
146
147 info!(target: LOG_CONSENSUS, "Session {session_index} completed");
148
149 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
150 break;
151 }
152 }
153
154 info!(target: LOG_CONSENSUS, "Consensus task shut down");
155
156 Ok(())
157 }
158
159 pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
160 assert!(self.num_peers().total() >= 4);
162
163 self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;
164
165 while !task_handle.is_shutting_down() {
166 let session_index = self.get_finished_session_count().await;
167
168 CONSENSUS_SESSION_COUNT.set(session_index as i64);
169
170 info!(target: LOG_CONSENSUS, session_index, "Starting consensus session");
171
172 self.run_session(self.connections.clone(), session_index)
173 .await?;
174
175 info!(target: LOG_CONSENSUS, session_index, "Completed consensus session");
176
177 if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
178 info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");
179
180 sleep(Duration::from_secs(60)).await;
181
182 break;
183 }
184 }
185
186 info!(target: LOG_CONSENSUS, "Consensus task shut down");
187
188 Ok(())
189 }
190
191 pub async fn run_session(
192 &self,
193 connections: DynP2PConnections<P2PMessage>,
194 session_index: u64,
195 ) -> anyhow::Result<()> {
196 const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
214 const BASE: f64 = 1.02;
215
216 let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
217 let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);
218
219 let mut delay_config = aleph_bft::default_delay_config();
220
221 delay_config.unit_creation_delay = Arc::new(move |round_index| {
222 let delay = if round_index == 0 {
223 0.0
224 } else {
225 round_delay
226 * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
227 * rand::thread_rng().gen_range(0.5..=1.5)
228 };
229
230 Duration::from_millis(delay.round() as u64)
231 });
232
233 let config = aleph_bft::create_config(
234 self.num_peers().total().into(),
235 self.identity().to_usize().into(),
236 session_index,
237 self.cfg
238 .consensus
239 .broadcast_rounds_per_session
240 .checked_add(EXP_SLOWDOWN_ROUNDS)
241 .expect("Rounds per session exceed maximum of u16::Max - EXP_SLOWDOWN_ROUNDS"),
242 delay_config,
243 Duration::from_secs(10 * 365 * 24 * 60 * 60),
244 )
245 .expect("The exponential slowdown exceeds 10 years");
246
247 let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
250 let (signature_sender, signature_receiver) = watch::channel(None);
251 let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
252 let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();
253
254 let aleph_handle = spawn(
255 "aleph run session",
256 aleph_bft::run_session(
257 config,
258 aleph_bft::LocalIO::new(
259 DataProvider::new(
260 self.submission_receiver.clone(),
261 signature_receiver,
262 timestamp_sender,
263 ),
264 FinalizationHandler::new(unit_data_sender),
265 BackupWriter::new(self.db.clone()).await,
266 BackupReader::new(self.db.clone()),
267 ),
268 Network::new(connections),
269 Keychain::new(&self.cfg),
270 Spawner::new(self.task_group.make_subgroup()),
271 aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
272 ),
273 );
274
275 let signed_session_outcome = self
276 .complete_signed_session_outcome(
277 session_index,
278 unit_data_receiver,
279 signature_sender,
280 timestamp_receiver,
281 )
282 .await?;
283
284 terminator_sender.send(()).ok();
287 aleph_handle.await.ok();
288
289 self.complete_session(session_index, signed_session_outcome)
293 .await;
294
295 self.checkpoint_database(session_index);
296
297 Ok(())
298 }
299
300 pub async fn complete_signed_session_outcome(
301 &self,
302 session_index: u64,
303 ordered_unit_receiver: Receiver<OrderedUnit>,
304 signature_sender: watch::Sender<Option<SchnorrSignature>>,
305 timestamp_receiver: Receiver<(Instant, u64)>,
306 ) -> anyhow::Result<SignedSessionOutcome> {
307 let mut item_index = 0;
310
311 let mut request_signed_session_outcome = Box::pin(async {
312 self.request_signed_session_outcome(&self.federation_api, session_index)
313 .await
314 });
315
316 loop {
320 tokio::select! {
321 ordered_unit = ordered_unit_receiver.recv() => {
322 let ordered_unit = ordered_unit?;
323
324 if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
325 break;
326 }
327
328 if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
329 if ordered_unit.creator == self.identity() {
330 loop {
331 match timestamp_receiver.try_recv() {
332 Ok((timestamp, chsum)) => {
333 if get_citem_bytes_chsum(&bytes) == chsum {
334 CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
335 break;
336 }
337 warn!(target: LOG_CONSENSUS, "Not reporting ordering latency on possibly out of sync item");
338 }
339 Err(err) => {
340 debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal on start");
341 break;
342 }
343 }
344 }
345 }
346
347 if let Ok(items) = Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()){
348 for item in items {
349 if self.process_consensus_item(
350 session_index,
351 item_index,
352 item.clone(),
353 ordered_unit.creator
354 ).await
355 .is_ok() {
356 item_index += 1;
357 }
358 }
359 }
360 }
361 },
362 signed_session_outcome = &mut request_signed_session_outcome => {
363 let pending_accepted_items = self.pending_accepted_items().await;
364
365 let (processed, unprocessed) = signed_session_outcome
367 .session_outcome
368 .items
369 .split_at(pending_accepted_items.len());
370
371 assert!(
372 processed.iter().eq(pending_accepted_items.iter()),
373 "Consensus Failure: pending accepted items disagree with federation consensus"
374 );
375
376 for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
377 if let Err(err) = self.process_consensus_item(
378 session_index,
379 item_index as u64,
380 accepted_item.item.clone(),
381 accepted_item.peer
382 ).await {
383 panic!(
384 "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
385 processed.len(),
386 unprocessed.len(),
387 );
388 }
389 }
390
391 return Ok(signed_session_outcome);
392 }
393 }
394 }
395
396 let items = self.pending_accepted_items().await;
397
398 assert_eq!(item_index, items.len() as u64);
399
400 let session_outcome = SessionOutcome { items };
401
402 let header = session_outcome.header(session_index);
403
404 let keychain = Keychain::new(&self.cfg);
405
406 signature_sender.send(Some(keychain.sign(&header)))?;
409
410 let mut signatures = BTreeMap::new();
411
412 let items_dump = tokio::sync::OnceCell::new();
413
414 while signatures.len() < self.num_peers().threshold() {
417 tokio::select! {
418 ordered_unit = ordered_unit_receiver.recv() => {
419 let ordered_unit = ordered_unit?;
420
421 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
422 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
423 signatures.insert(ordered_unit.creator, signature);
424 } else {
425 warn!(target: LOG_CONSENSUS, "Consensus Failure: invalid header signature from {}", ordered_unit.creator);
426
427 items_dump.get_or_init(|| async {
428 for (idx, item) in session_outcome.items.iter().enumerate() {
429 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
430 }
431 }).await;
432 }
433 }
434 }
435 signed_session_outcome = &mut request_signed_session_outcome => {
436 assert_eq!(
437 header,
438 signed_session_outcome.session_outcome.header(session_index),
439 "Consensus Failure: header disagrees with federation consensus"
440 );
441
442 return Ok(signed_session_outcome);
443 }
444 }
445 }
446
447 Ok(SignedSessionOutcome {
448 session_outcome,
449 signatures,
450 })
451 }
452
453 fn decoders(&self) -> ModuleDecoderRegistry {
454 self.modules.decoder_registry()
455 }
456
457 pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
458 self.db
459 .begin_transaction_nc()
460 .await
461 .find_by_prefix(&AcceptedItemPrefix)
462 .await
463 .map(|entry| entry.1)
464 .collect()
465 .await
466 }
467
468 pub async fn complete_session(
469 &self,
470 session_index: u64,
471 signed_session_outcome: SignedSessionOutcome,
472 ) {
473 let mut dbtx = self.db.begin_transaction().await;
474
475 dbtx.remove_by_prefix(&AlephUnitsPrefix).await;
476
477 dbtx.remove_by_prefix(&AcceptedItemPrefix).await;
478
479 if dbtx
480 .insert_entry(
481 &SignedSessionOutcomeKey(session_index),
482 &signed_session_outcome,
483 )
484 .await
485 .is_some()
486 {
487 panic!("We tried to overwrite a signed session outcome");
488 }
489
490 dbtx.commit_tx_result()
491 .await
492 .expect("This is the only place where we write to this key");
493 }
494
495 fn db_checkpoints_dir(&self) -> PathBuf {
497 self.data_dir.join(DB_CHECKPOINTS_DIR)
498 }
499
500 fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
504 let checkpoint_dir = self.db_checkpoints_dir();
505
506 if checkpoint_dir.exists() {
507 debug!(
508 target: LOG_CONSENSUS,
509 ?current_session,
510 "Removing database checkpoints up to `current_session`"
511 );
512
513 for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
514 if let Ok(file_name) = checkpoint.file_name().into_string() {
516 if let Ok(session) = file_name.parse::<u64>() {
517 if current_session >= self.checkpoint_retention
518 && session < current_session - self.checkpoint_retention
519 {
520 fs::remove_dir_all(checkpoint.path())?;
521 }
522 }
523 }
524 }
525 } else {
526 fs::create_dir_all(&checkpoint_dir)?;
527 }
528
529 Ok(())
530 }
531
532 fn checkpoint_database(&self, session_index: u64) {
536 if self.checkpoint_retention == 0 {
539 return;
540 }
541
542 let checkpoint_dir = self.db_checkpoints_dir();
543 let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));
544
545 {
546 let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
547 match self.db.checkpoint(&session_checkpoint_dir) {
548 Ok(()) => {
549 debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
550 }
551 Err(e) => {
552 warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, ?e, "Could not create db checkpoint");
553 }
554 }
555 }
556
557 {
558 let _timing = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
560 if let Err(e) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
561 warn!(target: LOG_CONSENSUS, ?e, "Could not delete old checkpoints");
562 }
563 }
564 }
565
566 fn delete_old_database_checkpoint(
569 &self,
570 session_index: u64,
571 checkpoint_dir: &Path,
572 ) -> anyhow::Result<()> {
573 if self.checkpoint_retention > session_index {
574 return Ok(());
575 }
576
577 let delete_session_index = session_index - self.checkpoint_retention;
578 let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
579 if checkpoint_to_delete.exists() {
580 fs::remove_dir_all(checkpoint_to_delete)?;
581 }
582
583 Ok(())
584 }
585
586 #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
587 pub async fn process_consensus_item(
588 &self,
589 session_index: u64,
590 item_index: u64,
591 item: ConsensusItem,
592 peer: PeerId,
593 ) -> anyhow::Result<()> {
594 let peer_id_str = &self.peer_id_str[peer.to_usize()];
595 let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
596 let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
597 .with_label_values(&[peer_id_str])
598 .start_timer();
599
600 trace!(
601 target: LOG_CONSENSUS,
602 %peer,
603 item = ?DebugConsensusItem(&item),
604 "Processing consensus item"
605 );
606
607 self.ci_status_senders
608 .get(&peer)
609 .expect("No ci status sender for peer {peer}")
610 .send(Some(session_index))
611 .inspect_err(|e| warn!(target: LOG_CONSENSUS, "Failed to update ci status {e}"))
612 .ok();
613
614 CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
615 .with_label_values(&[&self.self_id_str, peer_id_str])
616 .set(session_index as i64);
617
618 let mut dbtx = self.db.begin_transaction().await;
619
620 dbtx.ignore_uncommitted();
621
622 if let Some(existing_item) = dbtx
626 .get_value(&AcceptedItemKey(item_index.to_owned()))
627 .await
628 {
629 if existing_item.item == item && existing_item.peer == peer {
630 return Ok(());
631 }
632
633 bail!(
634 "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
635 existing_item.peer
636 );
637 }
638
639 self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
640 .await?;
641
642 dbtx.warn_uncommitted();
645
646 dbtx.insert_entry(
647 &AcceptedItemKey(item_index),
648 &AcceptedItem {
649 item: item.clone(),
650 peer,
651 },
652 )
653 .await;
654
655 debug!(
656 target: LOG_CONSENSUS,
657 %peer,
658 item = ?DebugConsensusItem(&item),
659 "Processed consensus item"
660 );
661 let mut audit = Audit::default();
662
663 for (module_instance_id, kind, module) in self.modules.iter_modules() {
664 let _module_audit_timing =
665 TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);
666
667 let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
668 .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
669 .start_timer();
670
671 module
672 .audit(
673 &mut dbtx
674 .to_ref_with_prefix_module_id(module_instance_id)
675 .0
676 .into_nc(),
677 &mut audit,
678 module_instance_id,
679 )
680 .await;
681 timing_prom.observe_duration();
682 }
683
684 assert!(
685 audit
686 .net_assets()
687 .expect("Overflow while checking balance sheet")
688 .milli_sat
689 >= 0,
690 "Balance sheet of the fed has gone negative, this should never happen! {audit}"
691 );
692
693 dbtx.commit_tx_result()
694 .await
695 .expect("Committing consensus epoch failed");
696
697 CONSENSUS_ITEMS_PROCESSED_TOTAL
698 .with_label_values(&[&self.peer_id_str[peer.to_usize()]])
699 .inc();
700 timing_prom.observe_duration();
701
702 Ok(())
703 }
704
705 async fn process_consensus_item_with_db_transaction(
706 &self,
707 dbtx: &mut DatabaseTransaction<'_>,
708 consensus_item: ConsensusItem,
709 peer_id: PeerId,
710 ) -> anyhow::Result<()> {
711 self.decoders().assert_reject_mode();
714
715 match consensus_item {
716 ConsensusItem::Module(module_item) => {
717 let instance_id = module_item.module_instance_id();
718
719 let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;
720
721 self.modules
722 .get_expect(instance_id)
723 .process_consensus_item(module_dbtx, &module_item, peer_id)
724 .await
725 }
726 ConsensusItem::Transaction(transaction) => {
727 let txid = transaction.tx_hash();
728 if dbtx
729 .get_value(&AcceptedTransactionKey(txid))
730 .await
731 .is_some()
732 {
733 debug!(
734 target: LOG_CONSENSUS,
735 %txid,
736 "Transaction already accepted"
737 );
738 bail!("Transaction is already accepted");
739 }
740
741 let modules_ids = transaction
742 .outputs
743 .iter()
744 .map(DynOutput::module_instance_id)
745 .collect::<Vec<_>>();
746
747 process_transaction_with_dbtx(
748 self.modules.clone(),
749 dbtx,
750 &transaction,
751 self.cfg.consensus.version,
752 TxProcessingMode::Consensus,
753 )
754 .await
755 .map_err(|error| anyhow!(error.to_string()))?;
756
757 debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
758 dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
759 .await;
760
761 Ok(())
762 }
763 ConsensusItem::Default { variant, .. } => {
764 warn!(
765 target: LOG_CONSENSUS,
766 "Minor consensus version mismatch: unexpected consensus item type: {variant}"
767 );
768
769 panic!("Unexpected consensus item type: {variant}")
770 }
771 }
772 }
773
774 async fn request_signed_session_outcome(
775 &self,
776 federation_api: &DynGlobalApi,
777 index: u64,
778 ) -> SignedSessionOutcome {
779 let decoders = self.decoders();
780 let keychain = Keychain::new(&self.cfg);
781 let threshold = self.num_peers().threshold();
782
783 let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
784 let signed_session_outcome = response
785 .try_into_inner(&decoders)
786 .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
787 let header = signed_session_outcome.session_outcome.header(index);
788 if signed_session_outcome.signatures.len() == threshold
789 && signed_session_outcome
790 .signatures
791 .iter()
792 .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
793 {
794 Ok(signed_session_outcome)
795 } else {
796 Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
797 }
798 };
799
800 loop {
801 let result = federation_api
802 .request_with_strategy(
803 FilterMap::new(filter_map.clone()),
804 AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
805 ApiRequestErased::new(index),
806 )
807 .await;
808
809 match result {
810 Ok(signed_session_outcome) => return signed_session_outcome,
811 Err(error) => {
812 error.report_if_unusual("Requesting Session Outcome");
813 }
814 }
815 }
816 }
817
818 async fn get_finished_session_count(&self) -> u64 {
821 get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
822 }
823}
824
825pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
826 dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
827 .await
828 .next()
829 .await
830 .map_or(0, |entry| (entry.0.0) + 1)
831}