fedimint_server/consensus/
api.rs

1//! Implements the client API through which users interact with the federation
2use std::cmp::Ordering;
3use std::collections::{BTreeMap, HashMap};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use async_trait::async_trait;
9use bitcoin::hashes::sha256;
10use fedimint_aead::{encrypt, get_encryption_key, random_salt};
11use fedimint_api_client::api::{
12    GuardianConfigBackup, LegacyFederationStatus, LegacyP2PConnectionStatus, LegacyPeerStatus,
13    StatusResponse,
14};
15use fedimint_core::admin_client::{ServerStatusLegacy, SetupStatus};
16use fedimint_core::backup::{
17    BackupStatistics, ClientBackupKey, ClientBackupKeyPrefix, ClientBackupSnapshot,
18};
19use fedimint_core::config::{ClientConfig, JsonClientConfig, META_FEDERATION_NAME_KEY};
20use fedimint_core::core::backup::{BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES, SignedBackupRequest};
21use fedimint_core::core::{DynOutputOutcome, ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{
23    Committable, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
24};
25#[allow(deprecated)]
26use fedimint_core::endpoint_constants::AWAIT_OUTPUT_OUTCOME_ENDPOINT;
27use fedimint_core::endpoint_constants::{
28    API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT,
29    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
30    BACKUP_STATISTICS_ENDPOINT, CLIENT_CONFIG_ENDPOINT, CLIENT_CONFIG_JSON_ENDPOINT,
31    CONSENSUS_ORD_LATENCY_ENDPOINT, FEDERATION_ID_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
32    GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT, P2P_CONNECTION_STATUS_ENDPOINT,
33    RECOVER_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT,
34    SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
35    SIGN_API_ANNOUNCEMENT_ENDPOINT, STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
36    SUBMIT_TRANSACTION_ENDPOINT, VERSION_ENDPOINT,
37};
38use fedimint_core::epoch::ConsensusItem;
39use fedimint_core::module::audit::{Audit, AuditSummary};
40use fedimint_core::module::{
41    ApiAuth, ApiEndpoint, ApiEndpointContext, ApiError, ApiRequestErased, ApiResult, ApiVersion,
42    SerdeModuleEncoding, SerdeModuleEncodingBase64, SupportedApiVersionsSummary, api_endpoint,
43};
44use fedimint_core::net::api_announcement::{
45    ApiAnnouncement, SignedApiAnnouncement, SignedApiAnnouncementSubmission,
46};
47use fedimint_core::secp256k1::{PublicKey, SECP256K1};
48use fedimint_core::session_outcome::{
49    SessionOutcome, SessionStatus, SessionStatusV2, SignedSessionOutcome,
50};
51use fedimint_core::transaction::{
52    SerdeTransaction, Transaction, TransactionError, TransactionSubmissionOutcome,
53};
54use fedimint_core::util::{FmtCompact, SafeUrl};
55use fedimint_core::{OutPoint, PeerId, TransactionId, secp256k1};
56use fedimint_logging::LOG_NET_API;
57use fedimint_server_core::dashboard_ui::IDashboardApi;
58use fedimint_server_core::net::{GuardianAuthToken, check_auth};
59use fedimint_server_core::{DynServerModule, ServerModuleRegistry, ServerModuleRegistryExt};
60use futures::StreamExt;
61use tokio::sync::watch::{self, Receiver, Sender};
62use tracing::{debug, info, warn};
63
64use crate::config::io::{
65    CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE,
66};
67use crate::config::{ServerConfig, legacy_consensus_config_hash};
68use crate::consensus::db::{AcceptedItemPrefix, AcceptedTransactionKey, SignedSessionOutcomeKey};
69use crate::consensus::engine::get_finished_session_count_static;
70use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
71use crate::metrics::{BACKUP_WRITE_SIZE_BYTES, STORED_BACKUPS_COUNT};
72use crate::net::api::HasApiContext;
73use crate::net::api::announcement::{ApiAnnouncementKey, ApiAnnouncementPrefix};
74use crate::net::p2p::P2PStatusReceivers;
75
76#[derive(Clone)]
77pub struct ConsensusApi {
78    /// Our server configuration
79    pub cfg: ServerConfig,
80    /// Database for serving the API
81    pub db: Database,
82    /// Modules registered with the federation
83    pub modules: ServerModuleRegistry,
84    /// Cached client config
85    pub client_cfg: ClientConfig,
86    pub force_api_secret: Option<String>,
87    /// For sending API events to consensus such as transactions
88    pub submission_sender: async_channel::Sender<ConsensusItem>,
89    pub shutdown_receiver: Receiver<Option<u64>>,
90    pub shutdown_sender: Sender<Option<u64>>,
91    pub ord_latency_receiver: watch::Receiver<Option<Duration>>,
92    pub p2p_status_receivers: P2PStatusReceivers,
93    pub ci_status_receivers: BTreeMap<PeerId, Receiver<Option<u64>>>,
94    pub supported_api_versions: SupportedApiVersionsSummary,
95    pub code_version_str: String,
96}
97
98impl ConsensusApi {
99    pub fn api_versions_summary(&self) -> &SupportedApiVersionsSummary {
100        &self.supported_api_versions
101    }
102
103    pub fn get_active_api_secret(&self) -> Option<String> {
104        // TODO: In the future, we might want to fetch it from the DB, so it's possible
105        // to customize from the UX
106        self.force_api_secret.clone()
107    }
108
109    // we want to return an error if and only if the submitted transaction is
110    // invalid and will be rejected if we were to submit it to consensus
111    pub async fn submit_transaction(
112        &self,
113        transaction: Transaction,
114    ) -> Result<TransactionId, TransactionError> {
115        let txid = transaction.tx_hash();
116
117        debug!(target: LOG_NET_API, %txid, "Received a submitted transaction");
118
119        // Create read-only DB tx so that the read state is consistent
120        let mut dbtx = self.db.begin_transaction_nc().await;
121        // we already processed the transaction before
122        if dbtx
123            .get_value(&AcceptedTransactionKey(txid))
124            .await
125            .is_some()
126        {
127            debug!(target: LOG_NET_API, %txid, "Transaction already accepted");
128            return Ok(txid);
129        }
130
131        // We ignore any writes, as we only verify if the transaction is valid here
132        dbtx.ignore_uncommitted();
133
134        process_transaction_with_dbtx(
135            self.modules.clone(),
136            &mut dbtx,
137            &transaction,
138            self.cfg.consensus.version,
139            TxProcessingMode::Submission,
140        )
141        .await
142        .inspect_err(|err| {
143            debug!(target: LOG_NET_API, %txid, err = %err.fmt_compact(), "Transaction rejected");
144        })?;
145
146        let _ = self
147            .submission_sender
148            .send(ConsensusItem::Transaction(transaction.clone()))
149            .await
150            .inspect_err(|err| {
151                warn!(target: LOG_NET_API, %txid, err = %err.fmt_compact(), "Unable to submit the tx into consensus");
152            });
153
154        Ok(txid)
155    }
156
157    pub async fn await_transaction(
158        &self,
159        txid: TransactionId,
160    ) -> (Vec<ModuleInstanceId>, DatabaseTransaction<'_, Committable>) {
161        self.db
162            .wait_key_check(&AcceptedTransactionKey(txid), std::convert::identity)
163            .await
164    }
165
166    pub async fn await_output_outcome(
167        &self,
168        outpoint: OutPoint,
169    ) -> Result<SerdeModuleEncoding<DynOutputOutcome>> {
170        let (module_ids, mut dbtx) = self.await_transaction(outpoint.txid).await;
171
172        let module_id = module_ids
173            .into_iter()
174            .nth(outpoint.out_idx as usize)
175            .context("Outpoint index out of bounds {outpoint:?}")?;
176
177        #[allow(deprecated)]
178        let outcome = self
179            .modules
180            .get_expect(module_id)
181            .output_status(
182                &mut dbtx.to_ref_with_prefix_module_id(module_id).0.into_nc(),
183                outpoint,
184                module_id,
185            )
186            .await
187            .context("No output outcome for outpoint")?;
188
189        Ok((&outcome).into())
190    }
191
192    pub async fn session_count(&self) -> u64 {
193        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
194    }
195
196    pub async fn await_signed_session_outcome(&self, index: u64) -> SignedSessionOutcome {
197        self.db
198            .wait_key_check(&SignedSessionOutcomeKey(index), std::convert::identity)
199            .await
200            .0
201    }
202
203    pub async fn session_status(&self, session_index: u64) -> SessionStatusV2 {
204        let mut dbtx = self.db.begin_transaction_nc().await;
205
206        match session_index.cmp(&get_finished_session_count_static(&mut dbtx).await) {
207            Ordering::Greater => SessionStatusV2::Initial,
208            Ordering::Equal => SessionStatusV2::Pending(
209                dbtx.find_by_prefix(&AcceptedItemPrefix)
210                    .await
211                    .map(|entry| entry.1)
212                    .collect()
213                    .await,
214            ),
215            Ordering::Less => SessionStatusV2::Complete(
216                dbtx.get_value(&SignedSessionOutcomeKey(session_index))
217                    .await
218                    .expect("There are no gaps in session outcomes"),
219            ),
220        }
221    }
222
223    pub async fn get_federation_status(&self) -> ApiResult<LegacyFederationStatus> {
224        let session_count = self.session_count().await;
225        let scheduled_shutdown = self.shutdown_receiver.borrow().to_owned();
226
227        let status_by_peer = self
228            .p2p_status_receivers
229            .iter()
230            .map(|(peer, p2p_receiver)| {
231                let ci_receiver = self.ci_status_receivers.get(peer).unwrap();
232
233                let consensus_status = LegacyPeerStatus {
234                    connection_status: match *p2p_receiver.borrow() {
235                        Some(..) => LegacyP2PConnectionStatus::Connected,
236                        None => LegacyP2PConnectionStatus::Disconnected,
237                    },
238                    last_contribution: *ci_receiver.borrow(),
239                    flagged: ci_receiver.borrow().unwrap_or(0) + 1 < session_count,
240                };
241
242                (*peer, consensus_status)
243            })
244            .collect::<HashMap<PeerId, LegacyPeerStatus>>();
245
246        let peers_flagged = status_by_peer
247            .values()
248            .filter(|status| status.flagged)
249            .count() as u64;
250
251        let peers_online = status_by_peer
252            .values()
253            .filter(|status| status.connection_status == LegacyP2PConnectionStatus::Connected)
254            .count() as u64;
255
256        let peers_offline = status_by_peer
257            .values()
258            .filter(|status| status.connection_status == LegacyP2PConnectionStatus::Disconnected)
259            .count() as u64;
260
261        Ok(LegacyFederationStatus {
262            session_count,
263            status_by_peer,
264            peers_online,
265            peers_offline,
266            peers_flagged,
267            scheduled_shutdown,
268        })
269    }
270
271    fn shutdown(&self, index: Option<u64>) {
272        self.shutdown_sender.send_replace(index);
273    }
274
275    async fn get_federation_audit(&self) -> ApiResult<AuditSummary> {
276        let mut dbtx = self.db.begin_transaction_nc().await;
277        // Writes are related to compacting audit keys, which we can safely ignore
278        // within an API request since the compaction will happen when constructing an
279        // audit in the consensus server
280        dbtx.ignore_uncommitted();
281
282        let mut audit = Audit::default();
283        let mut module_instance_id_to_kind: HashMap<ModuleInstanceId, String> = HashMap::new();
284        for (module_instance_id, kind, module) in self.modules.iter_modules() {
285            module_instance_id_to_kind.insert(module_instance_id, kind.as_str().to_string());
286            module
287                .audit(
288                    &mut dbtx.to_ref_with_prefix_module_id(module_instance_id).0,
289                    &mut audit,
290                    module_instance_id,
291                )
292                .await;
293        }
294        Ok(AuditSummary::from_audit(
295            &audit,
296            &module_instance_id_to_kind,
297        ))
298    }
299
300    /// Uses the in-memory config to write a config backup tar archive that
301    /// guardians can download. Private keys are encrypted with the guardian
302    /// password, so it should be safe to store anywhere, this also means the
303    /// backup is useless without the password.
304    fn get_guardian_config_backup(
305        &self,
306        password: &str,
307        _auth: &GuardianAuthToken,
308    ) -> GuardianConfigBackup {
309        let mut tar_archive_builder = tar::Builder::new(Vec::new());
310
311        let mut append = |name: &Path, data: &[u8]| {
312            let mut header = tar::Header::new_gnu();
313            header.set_path(name).expect("Error setting path");
314            header.set_size(data.len() as u64);
315            header.set_mode(0o644);
316            header.set_cksum();
317            tar_archive_builder
318                .append(&header, data)
319                .expect("Error adding data to tar archive");
320        };
321
322        append(
323            &PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT),
324            &serde_json::to_vec(&self.cfg.local).expect("Error encoding local config"),
325        );
326
327        append(
328            &PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT),
329            &serde_json::to_vec(&self.cfg.consensus).expect("Error encoding consensus config"),
330        );
331
332        // Note that the encrypted config returned here uses a different salt than the
333        // on-disk version. While this may be confusing it shouldn't be a problem since
334        // the content and encryption key are the same. It's unpractical to read the
335        // on-disk version here since the server/api aren't aware of the config dir and
336        // ideally we can keep it that way.
337        let encryption_salt = random_salt();
338        append(&PathBuf::from(SALT_FILE), encryption_salt.as_bytes());
339
340        let private_config_bytes =
341            serde_json::to_vec(&self.cfg.private).expect("Error encoding private config");
342        let encryption_key = get_encryption_key(password, &encryption_salt)
343            .expect("Generating key from password failed");
344        let private_config_encrypted =
345            hex::encode(encrypt(private_config_bytes, &encryption_key).expect("Encryption failed"));
346        append(
347            &PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT),
348            private_config_encrypted.as_bytes(),
349        );
350
351        let tar_archive_bytes = tar_archive_builder
352            .into_inner()
353            .expect("Error building tar archive");
354
355        GuardianConfigBackup { tar_archive_bytes }
356    }
357
358    async fn handle_backup_request<'s, 'dbtx, 'a>(
359        &'s self,
360        dbtx: &'dbtx mut DatabaseTransaction<'a>,
361        request: SignedBackupRequest,
362    ) -> Result<(), ApiError> {
363        let request = request
364            .verify_valid(SECP256K1)
365            .map_err(|_| ApiError::bad_request("invalid request".into()))?;
366
367        if request.payload.len() > BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES {
368            return Err(ApiError::bad_request("snapshot too large".into()));
369        }
370        debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request");
371        if let Some(prev) = dbtx.get_value(&ClientBackupKey(request.id)).await {
372            if request.timestamp <= prev.timestamp {
373                debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request with old timestamp - ignoring");
374                return Err(ApiError::bad_request("timestamp too small".into()));
375            }
376        }
377
378        info!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Storing new client backup");
379        let overwritten = dbtx
380            .insert_entry(
381                &ClientBackupKey(request.id),
382                &ClientBackupSnapshot {
383                    timestamp: request.timestamp,
384                    data: request.payload.clone(),
385                },
386            )
387            .await
388            .is_some();
389        BACKUP_WRITE_SIZE_BYTES.observe(request.payload.len() as f64);
390        if !overwritten {
391            dbtx.on_commit(|| STORED_BACKUPS_COUNT.inc());
392        }
393
394        Ok(())
395    }
396
397    async fn handle_recover_request(
398        &self,
399        dbtx: &mut DatabaseTransaction<'_>,
400        id: PublicKey,
401    ) -> Option<ClientBackupSnapshot> {
402        dbtx.get_value(&ClientBackupKey(id)).await
403    }
404
405    /// List API URL announcements from all peers we have received them from (at
406    /// least ourselves)
407    async fn api_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
408        self.db
409            .begin_transaction_nc()
410            .await
411            .find_by_prefix(&ApiAnnouncementPrefix)
412            .await
413            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
414            .collect()
415            .await
416    }
417
418    /// Returns the tagged fedimintd version currently running
419    fn fedimintd_version(&self) -> String {
420        self.code_version_str.clone()
421    }
422
423    /// Add an API URL announcement from a peer to our database to be returned
424    /// by [`ConsensusApi::api_announcements`].
425    async fn submit_api_announcement(
426        &self,
427        peer_id: PeerId,
428        announcement: SignedApiAnnouncement,
429    ) -> Result<(), ApiError> {
430        let Some(peer_key) = self.cfg.consensus.broadcast_public_keys.get(&peer_id) else {
431            return Err(ApiError::bad_request("Peer not in federation".into()));
432        };
433
434        if !announcement.verify(SECP256K1, peer_key) {
435            return Err(ApiError::bad_request("Invalid signature".into()));
436        }
437
438        let mut dbtx = self.db.begin_transaction().await;
439
440        if let Some(existing_announcement) = dbtx.get_value(&ApiAnnouncementKey(peer_id)).await {
441            // If the current announcement is semantically identical to the new one (except
442            // for potentially having a different, valid signature) we return ok to allow
443            // the caller to stop submitting the value if they are in a retry loop.
444            if existing_announcement.api_announcement == announcement.api_announcement {
445                return Ok(());
446            }
447
448            // We only accept announcements with a nonce higher than the current one to
449            // avoid replay attacks.
450            if existing_announcement.api_announcement.nonce >= announcement.api_announcement.nonce {
451                return Err(ApiError::bad_request(
452                    "Outdated or redundant announcement".into(),
453                ));
454            }
455        }
456
457        dbtx.insert_entry(&ApiAnnouncementKey(peer_id), &announcement)
458            .await;
459        dbtx.commit_tx().await;
460        Ok(())
461    }
462
463    async fn sign_api_announcement(&self, new_url: SafeUrl) -> SignedApiAnnouncement {
464        self.db
465            .autocommit(
466                |dbtx, _| {
467                    let new_url_inner = new_url.clone();
468                    Box::pin(async move {
469                        let new_nonce = dbtx
470                            .get_value(&ApiAnnouncementKey(self.cfg.local.identity))
471                            .await
472                            .map_or(0, |a| a.api_announcement.nonce + 1);
473                        let announcement = ApiAnnouncement {
474                            api_url: new_url_inner,
475                            nonce: new_nonce,
476                        };
477                        let ctx = secp256k1::Secp256k1::new();
478                        let signed_announcement = announcement
479                            .sign(&ctx, &self.cfg.private.broadcast_secret_key.keypair(&ctx));
480
481                        dbtx.insert_entry(
482                            &ApiAnnouncementKey(self.cfg.local.identity),
483                            &signed_announcement,
484                        )
485                        .await;
486
487                        Result::<_, ()>::Ok(signed_announcement)
488                    })
489                },
490                None,
491            )
492            .await
493            .expect("Will not terminate on error")
494    }
495}
496
497#[async_trait]
498impl HasApiContext<ConsensusApi> for ConsensusApi {
499    async fn context(
500        &self,
501        request: &ApiRequestErased,
502        id: Option<ModuleInstanceId>,
503    ) -> (&ConsensusApi, ApiEndpointContext<'_>) {
504        let mut db = self.db.clone();
505        let mut dbtx = self.db.begin_transaction().await;
506        if let Some(id) = id {
507            db = self.db.with_prefix_module_id(id).0;
508            dbtx = dbtx.with_prefix_module_id(id).0;
509        }
510        (
511            self,
512            ApiEndpointContext::new(
513                db,
514                dbtx,
515                request.auth == Some(self.cfg.private.api_auth.clone()),
516                request.auth.clone(),
517            ),
518        )
519    }
520}
521
522#[async_trait]
523impl HasApiContext<DynServerModule> for ConsensusApi {
524    async fn context(
525        &self,
526        request: &ApiRequestErased,
527        id: Option<ModuleInstanceId>,
528    ) -> (&DynServerModule, ApiEndpointContext<'_>) {
529        let (_, context): (&ConsensusApi, _) = self.context(request, id).await;
530        (
531            self.modules.get_expect(id.expect("required module id")),
532            context,
533        )
534    }
535}
536
537#[async_trait]
538impl IDashboardApi for ConsensusApi {
539    async fn auth(&self) -> ApiAuth {
540        self.cfg.private.api_auth.clone()
541    }
542
543    async fn guardian_id(&self) -> PeerId {
544        self.cfg.local.identity
545    }
546
547    async fn guardian_names(&self) -> BTreeMap<PeerId, String> {
548        self.cfg
549            .consensus
550            .api_endpoints()
551            .iter()
552            .map(|(peer_id, endpoint)| (*peer_id, endpoint.name.clone()))
553            .collect()
554    }
555
556    async fn federation_name(&self) -> String {
557        self.cfg
558            .consensus
559            .meta
560            .get(META_FEDERATION_NAME_KEY)
561            .cloned()
562            .expect("Federation name must be set")
563    }
564
565    async fn session_count(&self) -> usize {
566        self.session_count().await as usize
567    }
568
569    async fn consensus_ord_latency(&self) -> Option<Duration> {
570        *self.ord_latency_receiver.borrow()
571    }
572
573    async fn p2p_connection_status(&self) -> BTreeMap<PeerId, Option<Duration>> {
574        self.p2p_status_receivers
575            .iter()
576            .map(|(peer, receiver)| (*peer, *receiver.borrow()))
577            .collect()
578    }
579
580    async fn federation_invite_code(&self) -> String {
581        self.cfg
582            .get_invite_code(self.get_active_api_secret())
583            .to_string()
584    }
585
586    async fn federation_audit(&self) -> AuditSummary {
587        self.get_federation_audit()
588            .await
589            .expect("Failed to get federation audit")
590    }
591
592    fn get_module_by_kind(&self, kind: ModuleKind) -> Option<&DynServerModule> {
593        self.modules
594            .iter_modules()
595            .find_map(|(_, module_kind, module)| {
596                if *module_kind == kind {
597                    Some(module)
598                } else {
599                    None
600                }
601            })
602    }
603}
604
605pub fn server_endpoints() -> Vec<ApiEndpoint<ConsensusApi>> {
606    vec![
607        api_endpoint! {
608            VERSION_ENDPOINT,
609            ApiVersion::new(0, 0),
610            async |fedimint: &ConsensusApi, _context, _v: ()| -> SupportedApiVersionsSummary {
611                Ok(fedimint.api_versions_summary().to_owned())
612            }
613        },
614        api_endpoint! {
615            SUBMIT_TRANSACTION_ENDPOINT,
616            ApiVersion::new(0, 0),
617            async |fedimint: &ConsensusApi, _context, transaction: SerdeTransaction| -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
618                let transaction = transaction
619                    .try_into_inner(&fedimint.modules.decoder_registry())
620                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
621
622                // we return an inner error if and only if the submitted transaction is
623                // invalid and will be rejected if we were to submit it to consensus
624                Ok((&TransactionSubmissionOutcome(fedimint.submit_transaction(transaction).await)).into())
625            }
626        },
627        api_endpoint! {
628            AWAIT_TRANSACTION_ENDPOINT,
629            ApiVersion::new(0, 0),
630            async |fedimint: &ConsensusApi, _context, tx_hash: TransactionId| -> TransactionId {
631                fedimint.await_transaction(tx_hash).await;
632
633                Ok(tx_hash)
634            }
635        },
636        api_endpoint! {
637            AWAIT_OUTPUT_OUTCOME_ENDPOINT,
638            ApiVersion::new(0, 0),
639            async |fedimint: &ConsensusApi, _context, outpoint: OutPoint| -> SerdeModuleEncoding<DynOutputOutcome> {
640                let outcome = fedimint
641                    .await_output_outcome(outpoint)
642                    .await
643                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
644
645                Ok(outcome)
646            }
647        },
648        api_endpoint! {
649            INVITE_CODE_ENDPOINT,
650            ApiVersion::new(0, 0),
651            async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
652                Ok(fedimint.cfg.get_invite_code(fedimint.get_active_api_secret()).to_string())
653            }
654        },
655        api_endpoint! {
656            FEDERATION_ID_ENDPOINT,
657            ApiVersion::new(0, 2),
658            async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
659                Ok(fedimint.cfg.calculate_federation_id().to_string())
660            }
661        },
662        api_endpoint! {
663            CLIENT_CONFIG_ENDPOINT,
664            ApiVersion::new(0, 0),
665            async |fedimint: &ConsensusApi, _context, _v: ()| -> ClientConfig {
666                Ok(fedimint.client_cfg.clone())
667            }
668        },
669        // Helper endpoint for Admin UI that can't parse consensus encoding
670        api_endpoint! {
671            CLIENT_CONFIG_JSON_ENDPOINT,
672            ApiVersion::new(0, 0),
673            async |fedimint: &ConsensusApi, _context, _v: ()| -> JsonClientConfig {
674                Ok(fedimint.client_cfg.to_json())
675            }
676        },
677        api_endpoint! {
678            SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT,
679            ApiVersion::new(0, 0),
680            async |fedimint: &ConsensusApi, _context, _v: ()| -> sha256::Hash {
681                Ok(legacy_consensus_config_hash(&fedimint.cfg.consensus))
682            }
683        },
684        api_endpoint! {
685            STATUS_ENDPOINT,
686            ApiVersion::new(0, 0),
687            async |fedimint: &ConsensusApi, _context, _v: ()| -> StatusResponse {
688                Ok(StatusResponse {
689                    server: ServerStatusLegacy::ConsensusRunning,
690                    federation: Some(fedimint.get_federation_status().await?)
691                })}
692        },
693        api_endpoint! {
694            SETUP_STATUS_ENDPOINT,
695            ApiVersion::new(0, 0),
696            async |_f: &ConsensusApi, _c, _v: ()| -> SetupStatus {
697                Ok(SetupStatus::ConsensusIsRunning)
698            }
699        },
700        api_endpoint! {
701            CONSENSUS_ORD_LATENCY_ENDPOINT,
702            ApiVersion::new(0, 0),
703            async |fedimint: &ConsensusApi, _c, _v: ()| -> Option<Duration> {
704                Ok(*fedimint.ord_latency_receiver.borrow())
705            }
706        },
707        api_endpoint! {
708            P2P_CONNECTION_STATUS_ENDPOINT,
709            ApiVersion::new(0, 0),
710            async |fedimint: &ConsensusApi, _c, _v: ()| -> BTreeMap<PeerId, Option<Duration>> {
711                Ok(fedimint.p2p_status_receivers
712                    .iter()
713                    .map(|(peer, receiver)| (*peer, *receiver.borrow()))
714                    .collect())
715            }
716        },
717        api_endpoint! {
718            SESSION_COUNT_ENDPOINT,
719            ApiVersion::new(0, 0),
720            async |fedimint: &ConsensusApi, _context, _v: ()| -> u64 {
721                Ok(fedimint.session_count().await)
722            }
723        },
724        api_endpoint! {
725            AWAIT_SESSION_OUTCOME_ENDPOINT,
726            ApiVersion::new(0, 0),
727            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionOutcome> {
728                Ok((&fedimint.await_signed_session_outcome(index).await.session_outcome).into())
729            }
730        },
731        api_endpoint! {
732            AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT,
733            ApiVersion::new(0, 0),
734            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SignedSessionOutcome> {
735                Ok((&fedimint.await_signed_session_outcome(index).await).into())
736            }
737        },
738        api_endpoint! {
739            SESSION_STATUS_ENDPOINT,
740            ApiVersion::new(0, 1),
741            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionStatus> {
742                Ok((&SessionStatus::from(fedimint.session_status(index).await)).into())
743            }
744        },
745        api_endpoint! {
746            SESSION_STATUS_V2_ENDPOINT,
747            ApiVersion::new(0, 5),
748            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncodingBase64<SessionStatusV2> {
749                Ok((&fedimint.session_status(index).await).into())
750            }
751        },
752        api_endpoint! {
753            SHUTDOWN_ENDPOINT,
754            ApiVersion::new(0, 3),
755            async |fedimint: &ConsensusApi, context, index: Option<u64>| -> () {
756                check_auth(context)?;
757                fedimint.shutdown(index);
758                Ok(())
759            }
760        },
761        api_endpoint! {
762            AUDIT_ENDPOINT,
763            ApiVersion::new(0, 0),
764            async |fedimint: &ConsensusApi, context, _v: ()| -> AuditSummary {
765                check_auth(context)?;
766                Ok(fedimint.get_federation_audit().await?)
767            }
768        },
769        api_endpoint! {
770            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
771            ApiVersion::new(0, 2),
772            async |fedimint: &ConsensusApi, context, _v: ()| -> GuardianConfigBackup {
773                let auth = check_auth(context)?;
774                let password = context.request_auth().expect("Auth was checked before").0;
775                Ok(fedimint.get_guardian_config_backup(&password, &auth))
776            }
777        },
778        api_endpoint! {
779            BACKUP_ENDPOINT,
780            ApiVersion::new(0, 0),
781            async |fedimint: &ConsensusApi, context, request: SignedBackupRequest| -> () {
782                fedimint
783                    .handle_backup_request(&mut context.dbtx().into_nc(), request).await?;
784                Ok(())
785
786            }
787        },
788        api_endpoint! {
789            RECOVER_ENDPOINT,
790            ApiVersion::new(0, 0),
791            async |fedimint: &ConsensusApi, context, id: PublicKey| -> Option<ClientBackupSnapshot> {
792                Ok(fedimint
793                    .handle_recover_request(&mut context.dbtx().into_nc(), id).await)
794            }
795        },
796        api_endpoint! {
797            AUTH_ENDPOINT,
798            ApiVersion::new(0, 0),
799            async |_fedimint: &ConsensusApi, context, _v: ()| -> () {
800                check_auth(context)?;
801                Ok(())
802            }
803        },
804        api_endpoint! {
805            API_ANNOUNCEMENTS_ENDPOINT,
806            ApiVersion::new(0, 3),
807            async |fedimint: &ConsensusApi, _context, _v: ()| -> BTreeMap<PeerId, SignedApiAnnouncement> {
808                Ok(fedimint.api_announcements().await)
809            }
810        },
811        api_endpoint! {
812            SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
813            ApiVersion::new(0, 3),
814            async |fedimint: &ConsensusApi, _context, submission: SignedApiAnnouncementSubmission| -> () {
815                fedimint.submit_api_announcement(submission.peer_id, submission.signed_api_announcement).await
816            }
817        },
818        api_endpoint! {
819            SIGN_API_ANNOUNCEMENT_ENDPOINT,
820            ApiVersion::new(0, 3),
821            async |fedimint: &ConsensusApi, context, new_url: SafeUrl| -> SignedApiAnnouncement {
822                check_auth(context)?;
823                Ok(fedimint.sign_api_announcement(new_url).await)
824            }
825        },
826        api_endpoint! {
827            FEDIMINTD_VERSION_ENDPOINT,
828            ApiVersion::new(0, 4),
829            async |fedimint: &ConsensusApi, _context, _v: ()| -> String {
830                Ok(fedimint.fedimintd_version())
831            }
832        },
833        api_endpoint! {
834            BACKUP_STATISTICS_ENDPOINT,
835            ApiVersion::new(0, 5),
836            async |_fedimint: &ConsensusApi, context, _v: ()| -> BackupStatistics {
837                check_auth(context)?;
838                Ok(backup_statistics_static(&mut context.dbtx().into_nc()).await)
839            }
840        },
841    ]
842}
843
844pub(crate) async fn backup_statistics_static(
845    dbtx: &mut DatabaseTransaction<'_>,
846) -> BackupStatistics {
847    const DAY_SECS: u64 = 24 * 60 * 60;
848    const WEEK_SECS: u64 = 7 * DAY_SECS;
849    const MONTH_SECS: u64 = 30 * DAY_SECS;
850    const QUARTER_SECS: u64 = 3 * MONTH_SECS;
851
852    let mut backup_stats = BackupStatistics::default();
853
854    let mut all_backups_stream = dbtx.find_by_prefix(&ClientBackupKeyPrefix).await;
855    while let Some((_, backup)) = all_backups_stream.next().await {
856        backup_stats.num_backups += 1;
857        backup_stats.total_size += backup.data.len();
858
859        let age_secs = backup.timestamp.elapsed().unwrap_or_default().as_secs();
860        if age_secs < DAY_SECS {
861            backup_stats.refreshed_1d += 1;
862        }
863        if age_secs < WEEK_SECS {
864            backup_stats.refreshed_1w += 1;
865        }
866        if age_secs < MONTH_SECS {
867            backup_stats.refreshed_1m += 1;
868        }
869        if age_secs < QUARTER_SECS {
870            backup_stats.refreshed_3m += 1;
871        }
872    }
873
874    backup_stats
875}