fedimint_server/consensus/
api.rs

1//! Implements the client API through which users interact with the federation
2use std::cmp::Ordering;
3use std::collections::{BTreeMap, HashMap};
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use async_trait::async_trait;
9use bitcoin::hashes::sha256;
10use fedimint_aead::{encrypt, get_encryption_key, random_salt};
11use fedimint_api_client::api::{
12    LegacyFederationStatus, LegacyP2PConnectionStatus, LegacyPeerStatus, StatusResponse,
13};
14use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
15use fedimint_core::backup::{
16    BackupStatistics, ClientBackupKey, ClientBackupKeyPrefix, ClientBackupSnapshot,
17};
18use fedimint_core::config::{ClientConfig, JsonClientConfig, META_FEDERATION_NAME_KEY};
19use fedimint_core::core::backup::{BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES, SignedBackupRequest};
20use fedimint_core::core::{DynOutputOutcome, ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{
22    Committable, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
23};
24#[allow(deprecated)]
25use fedimint_core::endpoint_constants::AWAIT_OUTPUT_OUTCOME_ENDPOINT;
26use fedimint_core::endpoint_constants::{
27    API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT,
28    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT,
29    BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT, CLIENT_CONFIG_ENDPOINT,
30    CLIENT_CONFIG_JSON_ENDPOINT, CONSENSUS_ORD_LATENCY_ENDPOINT, FEDERATION_ID_ENDPOINT,
31    FEDIMINTD_VERSION_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
32    P2P_CONNECTION_STATUS_ENDPOINT, P2P_CONNECTION_TYPE_ENDPOINT, RECOVER_ENDPOINT,
33    SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT, SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT,
34    SESSION_STATUS_V2_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
35    SIGN_API_ANNOUNCEMENT_ENDPOINT, STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
36    SUBMIT_TRANSACTION_ENDPOINT, VERSION_ENDPOINT,
37};
38use fedimint_core::epoch::ConsensusItem;
39use fedimint_core::module::audit::{Audit, AuditSummary};
40use fedimint_core::module::{
41    ApiAuth, ApiEndpoint, ApiEndpointContext, ApiError, ApiRequestErased, ApiResult, ApiVersion,
42    SerdeModuleEncoding, SerdeModuleEncodingBase64, SupportedApiVersionsSummary, api_endpoint,
43};
44use fedimint_core::net::api_announcement::{
45    ApiAnnouncement, SignedApiAnnouncement, SignedApiAnnouncementSubmission,
46};
47use fedimint_core::secp256k1::{PublicKey, SECP256K1};
48use fedimint_core::session_outcome::{
49    SessionOutcome, SessionStatus, SessionStatusV2, SignedSessionOutcome,
50};
51use fedimint_core::task::TaskGroup;
52use fedimint_core::transaction::{
53    SerdeTransaction, Transaction, TransactionError, TransactionSubmissionOutcome,
54};
55use fedimint_core::util::{FmtCompact, SafeUrl};
56use fedimint_core::{OutPoint, PeerId, TransactionId, secp256k1};
57use fedimint_logging::LOG_NET_API;
58use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
59use fedimint_server_core::dashboard_ui::{ConnectionType, IDashboardApi, ServerBitcoinRpcStatus};
60use fedimint_server_core::net::{GuardianAuthToken, check_auth};
61use fedimint_server_core::{DynServerModule, ServerModuleRegistry, ServerModuleRegistryExt};
62use futures::StreamExt;
63use tokio::sync::watch::{self, Receiver, Sender};
64use tracing::{debug, info, warn};
65
66use crate::config::io::{
67    CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE,
68    reencrypt_private_config,
69};
70use crate::config::{ServerConfig, legacy_consensus_config_hash};
71use crate::consensus::db::{AcceptedItemPrefix, AcceptedTransactionKey, SignedSessionOutcomeKey};
72use crate::consensus::engine::get_finished_session_count_static;
73use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
74use crate::metrics::{BACKUP_WRITE_SIZE_BYTES, STORED_BACKUPS_COUNT};
75use crate::net::api::HasApiContext;
76use crate::net::api::announcement::{ApiAnnouncementKey, ApiAnnouncementPrefix};
77use crate::net::p2p::{P2PConnectionTypeReceivers, P2PStatusReceivers};
78
79#[derive(Clone)]
80pub struct ConsensusApi {
81    /// Our server configuration
82    pub cfg: ServerConfig,
83    /// Directory where config files are stored
84    pub cfg_dir: PathBuf,
85    /// Database for serving the API
86    pub db: Database,
87    /// Modules registered with the federation
88    pub modules: ServerModuleRegistry,
89    /// Cached client config
90    pub client_cfg: ClientConfig,
91    pub force_api_secret: Option<String>,
92    /// For sending API events to consensus such as transactions
93    pub submission_sender: async_channel::Sender<ConsensusItem>,
94    pub shutdown_receiver: Receiver<Option<u64>>,
95    pub shutdown_sender: Sender<Option<u64>>,
96    pub ord_latency_receiver: watch::Receiver<Option<Duration>>,
97    pub p2p_status_receivers: P2PStatusReceivers,
98    pub p2p_connection_type_receivers: P2PConnectionTypeReceivers,
99    pub ci_status_receivers: BTreeMap<PeerId, Receiver<Option<u64>>>,
100    pub bitcoin_rpc_connection: ServerBitcoinRpcMonitor,
101    pub supported_api_versions: SupportedApiVersionsSummary,
102    pub code_version_str: String,
103    pub task_group: TaskGroup,
104}
105
106impl ConsensusApi {
107    pub fn api_versions_summary(&self) -> &SupportedApiVersionsSummary {
108        &self.supported_api_versions
109    }
110
111    pub fn get_active_api_secret(&self) -> Option<String> {
112        // TODO: In the future, we might want to fetch it from the DB, so it's possible
113        // to customize from the UX
114        self.force_api_secret.clone()
115    }
116
117    // we want to return an error if and only if the submitted transaction is
118    // invalid and will be rejected if we were to submit it to consensus
119    pub async fn submit_transaction(
120        &self,
121        transaction: Transaction,
122    ) -> Result<TransactionId, TransactionError> {
123        let txid = transaction.tx_hash();
124
125        debug!(target: LOG_NET_API, %txid, "Received a submitted transaction");
126
127        // Create read-only DB tx so that the read state is consistent
128        let mut dbtx = self.db.begin_transaction_nc().await;
129        // we already processed the transaction before
130        if dbtx
131            .get_value(&AcceptedTransactionKey(txid))
132            .await
133            .is_some()
134        {
135            debug!(target: LOG_NET_API, %txid, "Transaction already accepted");
136            return Ok(txid);
137        }
138
139        // We ignore any writes, as we only verify if the transaction is valid here
140        dbtx.ignore_uncommitted();
141
142        process_transaction_with_dbtx(
143            self.modules.clone(),
144            &mut dbtx,
145            &transaction,
146            self.cfg.consensus.version,
147            TxProcessingMode::Submission,
148        )
149        .await
150        .inspect_err(|err| {
151            debug!(target: LOG_NET_API, %txid, err = %err.fmt_compact(), "Transaction rejected");
152        })?;
153
154        let _ = self
155            .submission_sender
156            .send(ConsensusItem::Transaction(transaction.clone()))
157            .await
158            .inspect_err(|err| {
159                warn!(target: LOG_NET_API, %txid, err = %err.fmt_compact(), "Unable to submit the tx into consensus");
160            });
161
162        Ok(txid)
163    }
164
165    pub async fn await_transaction(
166        &self,
167        txid: TransactionId,
168    ) -> (Vec<ModuleInstanceId>, DatabaseTransaction<'_, Committable>) {
169        self.db
170            .wait_key_check(&AcceptedTransactionKey(txid), std::convert::identity)
171            .await
172    }
173
174    pub async fn await_output_outcome(
175        &self,
176        outpoint: OutPoint,
177    ) -> Result<SerdeModuleEncoding<DynOutputOutcome>> {
178        let (module_ids, mut dbtx) = self.await_transaction(outpoint.txid).await;
179
180        let module_id = module_ids
181            .into_iter()
182            .nth(outpoint.out_idx as usize)
183            .with_context(|| format!("Outpoint index out of bounds {outpoint:?}"))?;
184
185        #[allow(deprecated)]
186        let outcome = self
187            .modules
188            .get_expect(module_id)
189            .output_status(
190                &mut dbtx.to_ref_with_prefix_module_id(module_id).0.into_nc(),
191                outpoint,
192                module_id,
193            )
194            .await
195            .context("No output outcome for outpoint")?;
196
197        Ok((&outcome).into())
198    }
199
200    pub async fn session_count(&self) -> u64 {
201        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
202    }
203
204    pub async fn await_signed_session_outcome(&self, index: u64) -> SignedSessionOutcome {
205        self.db
206            .wait_key_check(&SignedSessionOutcomeKey(index), std::convert::identity)
207            .await
208            .0
209    }
210
211    pub async fn session_status(&self, session_index: u64) -> SessionStatusV2 {
212        let mut dbtx = self.db.begin_transaction_nc().await;
213
214        match session_index.cmp(&get_finished_session_count_static(&mut dbtx).await) {
215            Ordering::Greater => SessionStatusV2::Initial,
216            Ordering::Equal => SessionStatusV2::Pending(
217                dbtx.find_by_prefix(&AcceptedItemPrefix)
218                    .await
219                    .map(|entry| entry.1)
220                    .collect()
221                    .await,
222            ),
223            Ordering::Less => SessionStatusV2::Complete(
224                dbtx.get_value(&SignedSessionOutcomeKey(session_index))
225                    .await
226                    .expect("There are no gaps in session outcomes"),
227            ),
228        }
229    }
230
231    pub async fn get_federation_status(&self) -> ApiResult<LegacyFederationStatus> {
232        let session_count = self.session_count().await;
233        let scheduled_shutdown = self.shutdown_receiver.borrow().to_owned();
234
235        let status_by_peer = self
236            .p2p_status_receivers
237            .iter()
238            .map(|(peer, p2p_receiver)| {
239                let ci_receiver = self.ci_status_receivers.get(peer).unwrap();
240
241                let consensus_status = LegacyPeerStatus {
242                    connection_status: match *p2p_receiver.borrow() {
243                        Some(..) => LegacyP2PConnectionStatus::Connected,
244                        None => LegacyP2PConnectionStatus::Disconnected,
245                    },
246                    last_contribution: *ci_receiver.borrow(),
247                    flagged: ci_receiver.borrow().unwrap_or(0) + 1 < session_count,
248                };
249
250                (*peer, consensus_status)
251            })
252            .collect::<HashMap<PeerId, LegacyPeerStatus>>();
253
254        let peers_flagged = status_by_peer
255            .values()
256            .filter(|status| status.flagged)
257            .count() as u64;
258
259        let peers_online = status_by_peer
260            .values()
261            .filter(|status| status.connection_status == LegacyP2PConnectionStatus::Connected)
262            .count() as u64;
263
264        let peers_offline = status_by_peer
265            .values()
266            .filter(|status| status.connection_status == LegacyP2PConnectionStatus::Disconnected)
267            .count() as u64;
268
269        Ok(LegacyFederationStatus {
270            session_count,
271            status_by_peer,
272            peers_online,
273            peers_offline,
274            peers_flagged,
275            scheduled_shutdown,
276        })
277    }
278
279    fn shutdown(&self, index: Option<u64>) {
280        self.shutdown_sender.send_replace(index);
281    }
282
283    async fn get_federation_audit(&self) -> ApiResult<AuditSummary> {
284        let mut dbtx = self.db.begin_transaction_nc().await;
285        // Writes are related to compacting audit keys, which we can safely ignore
286        // within an API request since the compaction will happen when constructing an
287        // audit in the consensus server
288        dbtx.ignore_uncommitted();
289
290        let mut audit = Audit::default();
291        let mut module_instance_id_to_kind: HashMap<ModuleInstanceId, String> = HashMap::new();
292        for (module_instance_id, kind, module) in self.modules.iter_modules() {
293            module_instance_id_to_kind.insert(module_instance_id, kind.as_str().to_string());
294            module
295                .audit(
296                    &mut dbtx.to_ref_with_prefix_module_id(module_instance_id).0,
297                    &mut audit,
298                    module_instance_id,
299                )
300                .await;
301        }
302        Ok(AuditSummary::from_audit(
303            &audit,
304            &module_instance_id_to_kind,
305        ))
306    }
307
308    /// Uses the in-memory config to write a config backup tar archive that
309    /// guardians can download. Private keys are encrypted with the guardian
310    /// password, so it should be safe to store anywhere, this also means the
311    /// backup is useless without the password.
312    fn get_guardian_config_backup(
313        &self,
314        password: &str,
315        _auth: &GuardianAuthToken,
316    ) -> GuardianConfigBackup {
317        let mut tar_archive_builder = tar::Builder::new(Vec::new());
318
319        let mut append = |name: &Path, data: &[u8]| {
320            let mut header = tar::Header::new_gnu();
321            header.set_path(name).expect("Error setting path");
322            header.set_size(data.len() as u64);
323            header.set_mode(0o644);
324            header.set_cksum();
325            tar_archive_builder
326                .append(&header, data)
327                .expect("Error adding data to tar archive");
328        };
329
330        append(
331            &PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT),
332            &serde_json::to_vec(&self.cfg.local).expect("Error encoding local config"),
333        );
334
335        append(
336            &PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT),
337            &serde_json::to_vec(&self.cfg.consensus).expect("Error encoding consensus config"),
338        );
339
340        // Note that the encrypted config returned here uses a different salt than the
341        // on-disk version. While this may be confusing it shouldn't be a problem since
342        // the content and encryption key are the same. It's unpractical to read the
343        // on-disk version here since the server/api aren't aware of the config dir and
344        // ideally we can keep it that way.
345        let encryption_salt = random_salt();
346        append(&PathBuf::from(SALT_FILE), encryption_salt.as_bytes());
347
348        let private_config_bytes =
349            serde_json::to_vec(&self.cfg.private).expect("Error encoding private config");
350        let encryption_key = get_encryption_key(password, &encryption_salt)
351            .expect("Generating key from password failed");
352        let private_config_encrypted =
353            hex::encode(encrypt(private_config_bytes, &encryption_key).expect("Encryption failed"));
354        append(
355            &PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT),
356            private_config_encrypted.as_bytes(),
357        );
358
359        let tar_archive_bytes = tar_archive_builder
360            .into_inner()
361            .expect("Error building tar archive");
362
363        GuardianConfigBackup { tar_archive_bytes }
364    }
365
366    async fn handle_backup_request(
367        &self,
368        dbtx: &mut DatabaseTransaction<'_>,
369        request: SignedBackupRequest,
370    ) -> Result<(), ApiError> {
371        let request = request
372            .verify_valid(SECP256K1)
373            .map_err(|_| ApiError::bad_request("invalid request".into()))?;
374
375        if request.payload.len() > BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES {
376            return Err(ApiError::bad_request("snapshot too large".into()));
377        }
378        debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request");
379        if let Some(prev) = dbtx.get_value(&ClientBackupKey(request.id)).await
380            && request.timestamp <= prev.timestamp
381        {
382            debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request with old timestamp - ignoring");
383            return Err(ApiError::bad_request("timestamp too small".into()));
384        }
385
386        info!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Storing new client backup");
387        let overwritten = dbtx
388            .insert_entry(
389                &ClientBackupKey(request.id),
390                &ClientBackupSnapshot {
391                    timestamp: request.timestamp,
392                    data: request.payload.clone(),
393                },
394            )
395            .await
396            .is_some();
397        BACKUP_WRITE_SIZE_BYTES.observe(request.payload.len() as f64);
398        if !overwritten {
399            dbtx.on_commit(|| STORED_BACKUPS_COUNT.inc());
400        }
401
402        Ok(())
403    }
404
405    async fn handle_recover_request(
406        &self,
407        dbtx: &mut DatabaseTransaction<'_>,
408        id: PublicKey,
409    ) -> Option<ClientBackupSnapshot> {
410        dbtx.get_value(&ClientBackupKey(id)).await
411    }
412
413    /// List API URL announcements from all peers we have received them from (at
414    /// least ourselves)
415    async fn api_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
416        self.db
417            .begin_transaction_nc()
418            .await
419            .find_by_prefix(&ApiAnnouncementPrefix)
420            .await
421            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
422            .collect()
423            .await
424    }
425
426    /// Returns the tagged fedimintd version currently running
427    fn fedimintd_version(&self) -> String {
428        self.code_version_str.clone()
429    }
430
431    /// Add an API URL announcement from a peer to our database to be returned
432    /// by [`ConsensusApi::api_announcements`].
433    async fn submit_api_announcement(
434        &self,
435        peer_id: PeerId,
436        announcement: SignedApiAnnouncement,
437    ) -> Result<(), ApiError> {
438        let Some(peer_key) = self.cfg.consensus.broadcast_public_keys.get(&peer_id) else {
439            return Err(ApiError::bad_request("Peer not in federation".into()));
440        };
441
442        if !announcement.verify(SECP256K1, peer_key) {
443            return Err(ApiError::bad_request("Invalid signature".into()));
444        }
445
446        let mut dbtx = self.db.begin_transaction().await;
447
448        if let Some(existing_announcement) = dbtx.get_value(&ApiAnnouncementKey(peer_id)).await {
449            // If the current announcement is semantically identical to the new one (except
450            // for potentially having a different, valid signature) we return ok to allow
451            // the caller to stop submitting the value if they are in a retry loop.
452            if existing_announcement.api_announcement == announcement.api_announcement {
453                return Ok(());
454            }
455
456            // We only accept announcements with a nonce higher than the current one to
457            // avoid replay attacks.
458            if existing_announcement.api_announcement.nonce >= announcement.api_announcement.nonce {
459                return Err(ApiError::bad_request(
460                    "Outdated or redundant announcement".into(),
461                ));
462            }
463        }
464
465        dbtx.insert_entry(&ApiAnnouncementKey(peer_id), &announcement)
466            .await;
467        dbtx.commit_tx().await;
468        Ok(())
469    }
470
471    async fn sign_api_announcement(&self, new_url: SafeUrl) -> SignedApiAnnouncement {
472        self.db
473            .autocommit(
474                |dbtx, _| {
475                    let new_url_inner = new_url.clone();
476                    Box::pin(async move {
477                        let new_nonce = dbtx
478                            .get_value(&ApiAnnouncementKey(self.cfg.local.identity))
479                            .await
480                            .map_or(0, |a| a.api_announcement.nonce + 1);
481                        let announcement = ApiAnnouncement {
482                            api_url: new_url_inner,
483                            nonce: new_nonce,
484                        };
485                        let ctx = secp256k1::Secp256k1::new();
486                        let signed_announcement = announcement
487                            .sign(&ctx, &self.cfg.private.broadcast_secret_key.keypair(&ctx));
488
489                        dbtx.insert_entry(
490                            &ApiAnnouncementKey(self.cfg.local.identity),
491                            &signed_announcement,
492                        )
493                        .await;
494
495                        Result::<_, ()>::Ok(signed_announcement)
496                    })
497                },
498                None,
499            )
500            .await
501            .expect("Will not terminate on error")
502    }
503
504    /// Changes the guardian password by re-encrypting the private config and
505    /// changing the on-disk password file if present. `fedimintd` is shut down
506    /// afterward, the user's service manager (e.g. `systemd` is expected to
507    /// restart it).
508    fn change_guardian_password(
509        &self,
510        new_password: &str,
511        _auth: &GuardianAuthToken,
512    ) -> Result<(), ApiError> {
513        reencrypt_private_config(&self.cfg_dir, &self.cfg.private, new_password)
514            .map_err(|e| ApiError::server_error(format!("Failed to change password: {e}")))?;
515
516        info!(target: LOG_NET_API, "Successfully changed guardian password, shutting down to restart with new password");
517        self.task_group.shutdown();
518
519        Ok(())
520    }
521}
522
523#[async_trait]
524impl HasApiContext<ConsensusApi> for ConsensusApi {
525    async fn context(
526        &self,
527        request: &ApiRequestErased,
528        id: Option<ModuleInstanceId>,
529    ) -> (&ConsensusApi, ApiEndpointContext<'_>) {
530        let mut db = self.db.clone();
531        let mut dbtx = self.db.begin_transaction().await;
532        if let Some(id) = id {
533            db = self.db.with_prefix_module_id(id).0;
534            dbtx = dbtx.with_prefix_module_id(id).0;
535        }
536        (
537            self,
538            ApiEndpointContext::new(
539                db,
540                dbtx,
541                request.auth == Some(self.cfg.private.api_auth.clone()),
542                request.auth.clone(),
543            ),
544        )
545    }
546}
547
548#[async_trait]
549impl HasApiContext<DynServerModule> for ConsensusApi {
550    async fn context(
551        &self,
552        request: &ApiRequestErased,
553        id: Option<ModuleInstanceId>,
554    ) -> (&DynServerModule, ApiEndpointContext<'_>) {
555        let (_, context): (&ConsensusApi, _) = self.context(request, id).await;
556        (
557            self.modules.get_expect(id.expect("required module id")),
558            context,
559        )
560    }
561}
562
563#[async_trait]
564impl IDashboardApi for ConsensusApi {
565    async fn auth(&self) -> ApiAuth {
566        self.cfg.private.api_auth.clone()
567    }
568
569    async fn guardian_id(&self) -> PeerId {
570        self.cfg.local.identity
571    }
572
573    async fn guardian_names(&self) -> BTreeMap<PeerId, String> {
574        self.cfg
575            .consensus
576            .api_endpoints()
577            .iter()
578            .map(|(peer_id, endpoint)| (*peer_id, endpoint.name.clone()))
579            .collect()
580    }
581
582    async fn federation_name(&self) -> String {
583        self.cfg
584            .consensus
585            .meta
586            .get(META_FEDERATION_NAME_KEY)
587            .cloned()
588            .expect("Federation name must be set")
589    }
590
591    async fn session_count(&self) -> u64 {
592        self.session_count().await
593    }
594
595    async fn get_session_status(&self, session_idx: u64) -> SessionStatusV2 {
596        self.session_status(session_idx).await
597    }
598
599    async fn consensus_ord_latency(&self) -> Option<Duration> {
600        *self.ord_latency_receiver.borrow()
601    }
602
603    async fn p2p_connection_status(&self) -> BTreeMap<PeerId, Option<Duration>> {
604        self.p2p_status_receivers
605            .iter()
606            .map(|(peer, receiver)| (*peer, *receiver.borrow()))
607            .collect()
608    }
609
610    async fn p2p_connection_type_status(&self) -> BTreeMap<PeerId, ConnectionType> {
611        self.p2p_connection_type_receivers
612            .iter()
613            .map(|(peer, receiver)| (*peer, *receiver.borrow()))
614            .collect()
615    }
616
617    async fn federation_invite_code(&self) -> String {
618        self.cfg
619            .get_invite_code(self.get_active_api_secret())
620            .to_string()
621    }
622
623    async fn federation_audit(&self) -> AuditSummary {
624        self.get_federation_audit()
625            .await
626            .expect("Failed to get federation audit")
627    }
628
629    async fn bitcoin_rpc_url(&self) -> SafeUrl {
630        self.bitcoin_rpc_connection.url()
631    }
632
633    async fn bitcoin_rpc_status(&self) -> Option<ServerBitcoinRpcStatus> {
634        self.bitcoin_rpc_connection.status()
635    }
636
637    async fn download_guardian_config_backup(
638        &self,
639        password: &str,
640        guardian_auth: &GuardianAuthToken,
641    ) -> GuardianConfigBackup {
642        self.get_guardian_config_backup(password, guardian_auth)
643    }
644
645    fn get_module_by_kind(&self, kind: ModuleKind) -> Option<&DynServerModule> {
646        self.modules
647            .iter_modules()
648            .find_map(|(_, module_kind, module)| {
649                if *module_kind == kind {
650                    Some(module)
651                } else {
652                    None
653                }
654            })
655    }
656
657    async fn fedimintd_version(&self) -> String {
658        self.code_version_str.clone()
659    }
660}
661
662pub fn server_endpoints() -> Vec<ApiEndpoint<ConsensusApi>> {
663    vec![
664        api_endpoint! {
665            VERSION_ENDPOINT,
666            ApiVersion::new(0, 0),
667            async |fedimint: &ConsensusApi, _context, _v: ()| -> SupportedApiVersionsSummary {
668                Ok(fedimint.api_versions_summary().to_owned())
669            }
670        },
671        api_endpoint! {
672            SUBMIT_TRANSACTION_ENDPOINT,
673            ApiVersion::new(0, 0),
674            async |fedimint: &ConsensusApi, _context, transaction: SerdeTransaction| -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
675                let transaction = transaction
676                    .try_into_inner(&fedimint.modules.decoder_registry())
677                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
678
679                // we return an inner error if and only if the submitted transaction is
680                // invalid and will be rejected if we were to submit it to consensus
681                Ok((&TransactionSubmissionOutcome(fedimint.submit_transaction(transaction).await)).into())
682            }
683        },
684        api_endpoint! {
685            AWAIT_TRANSACTION_ENDPOINT,
686            ApiVersion::new(0, 0),
687            async |fedimint: &ConsensusApi, _context, tx_hash: TransactionId| -> TransactionId {
688                fedimint.await_transaction(tx_hash).await;
689
690                Ok(tx_hash)
691            }
692        },
693        api_endpoint! {
694            AWAIT_OUTPUT_OUTCOME_ENDPOINT,
695            ApiVersion::new(0, 0),
696            async |fedimint: &ConsensusApi, _context, outpoint: OutPoint| -> SerdeModuleEncoding<DynOutputOutcome> {
697                let outcome = fedimint
698                    .await_output_outcome(outpoint)
699                    .await
700                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
701
702                Ok(outcome)
703            }
704        },
705        api_endpoint! {
706            INVITE_CODE_ENDPOINT,
707            ApiVersion::new(0, 0),
708            async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
709                Ok(fedimint.cfg.get_invite_code(fedimint.get_active_api_secret()).to_string())
710            }
711        },
712        api_endpoint! {
713            FEDERATION_ID_ENDPOINT,
714            ApiVersion::new(0, 2),
715            async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
716                Ok(fedimint.cfg.calculate_federation_id().to_string())
717            }
718        },
719        api_endpoint! {
720            CLIENT_CONFIG_ENDPOINT,
721            ApiVersion::new(0, 0),
722            async |fedimint: &ConsensusApi, _context, _v: ()| -> ClientConfig {
723                Ok(fedimint.client_cfg.clone())
724            }
725        },
726        // Helper endpoint for Admin UI that can't parse consensus encoding
727        api_endpoint! {
728            CLIENT_CONFIG_JSON_ENDPOINT,
729            ApiVersion::new(0, 0),
730            async |fedimint: &ConsensusApi, _context, _v: ()| -> JsonClientConfig {
731                Ok(fedimint.client_cfg.to_json())
732            }
733        },
734        api_endpoint! {
735            SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT,
736            ApiVersion::new(0, 0),
737            async |fedimint: &ConsensusApi, _context, _v: ()| -> sha256::Hash {
738                Ok(legacy_consensus_config_hash(&fedimint.cfg.consensus))
739            }
740        },
741        api_endpoint! {
742            STATUS_ENDPOINT,
743            ApiVersion::new(0, 0),
744            async |fedimint: &ConsensusApi, _context, _v: ()| -> StatusResponse {
745                Ok(StatusResponse {
746                    server: ServerStatusLegacy::ConsensusRunning,
747                    federation: Some(fedimint.get_federation_status().await?)
748                })}
749        },
750        api_endpoint! {
751            SETUP_STATUS_ENDPOINT,
752            ApiVersion::new(0, 0),
753            async |_f: &ConsensusApi, _c, _v: ()| -> SetupStatus {
754                Ok(SetupStatus::ConsensusIsRunning)
755            }
756        },
757        api_endpoint! {
758            CONSENSUS_ORD_LATENCY_ENDPOINT,
759            ApiVersion::new(0, 0),
760            async |fedimint: &ConsensusApi, _c, _v: ()| -> Option<Duration> {
761                Ok(*fedimint.ord_latency_receiver.borrow())
762            }
763        },
764        api_endpoint! {
765            P2P_CONNECTION_STATUS_ENDPOINT,
766            ApiVersion::new(0, 0),
767            async |fedimint: &ConsensusApi, _c, _v: ()| -> BTreeMap<PeerId, Option<Duration>> {
768                Ok(fedimint.p2p_status_receivers
769                    .iter()
770                    .map(|(peer, receiver)| (*peer, *receiver.borrow()))
771                    .collect())
772            }
773        },
774        api_endpoint! {
775            P2P_CONNECTION_TYPE_ENDPOINT,
776            ApiVersion::new(0, 7),
777            async |fedimint: &ConsensusApi, _c, _v: ()| -> BTreeMap<PeerId, ConnectionType> {
778                Ok(fedimint.p2p_connection_type_receivers
779                    .iter()
780                    .map(|(peer, receiver)| (*peer, *receiver.borrow()))
781                    .collect())
782            }
783        },
784        api_endpoint! {
785            SESSION_COUNT_ENDPOINT,
786            ApiVersion::new(0, 0),
787            async |fedimint: &ConsensusApi, _context, _v: ()| -> u64 {
788                Ok(fedimint.session_count().await)
789            }
790        },
791        api_endpoint! {
792            AWAIT_SESSION_OUTCOME_ENDPOINT,
793            ApiVersion::new(0, 0),
794            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionOutcome> {
795                Ok((&fedimint.await_signed_session_outcome(index).await.session_outcome).into())
796            }
797        },
798        api_endpoint! {
799            AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT,
800            ApiVersion::new(0, 0),
801            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SignedSessionOutcome> {
802                Ok((&fedimint.await_signed_session_outcome(index).await).into())
803            }
804        },
805        api_endpoint! {
806            SESSION_STATUS_ENDPOINT,
807            ApiVersion::new(0, 1),
808            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionStatus> {
809                Ok((&SessionStatus::from(fedimint.session_status(index).await)).into())
810            }
811        },
812        api_endpoint! {
813            SESSION_STATUS_V2_ENDPOINT,
814            ApiVersion::new(0, 5),
815            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncodingBase64<SessionStatusV2> {
816                Ok((&fedimint.session_status(index).await).into())
817            }
818        },
819        api_endpoint! {
820            SHUTDOWN_ENDPOINT,
821            ApiVersion::new(0, 3),
822            async |fedimint: &ConsensusApi, context, index: Option<u64>| -> () {
823                check_auth(context)?;
824                fedimint.shutdown(index);
825                Ok(())
826            }
827        },
828        api_endpoint! {
829            AUDIT_ENDPOINT,
830            ApiVersion::new(0, 0),
831            async |fedimint: &ConsensusApi, context, _v: ()| -> AuditSummary {
832                check_auth(context)?;
833                Ok(fedimint.get_federation_audit().await?)
834            }
835        },
836        api_endpoint! {
837            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
838            ApiVersion::new(0, 2),
839            async |fedimint: &ConsensusApi, context, _v: ()| -> GuardianConfigBackup {
840                let auth = check_auth(context)?;
841                let password = context.request_auth().expect("Auth was checked before").0;
842                Ok(fedimint.get_guardian_config_backup(&password, &auth))
843            }
844        },
845        api_endpoint! {
846            BACKUP_ENDPOINT,
847            ApiVersion::new(0, 0),
848            async |fedimint: &ConsensusApi, context, request: SignedBackupRequest| -> () {
849                fedimint
850                    .handle_backup_request(&mut context.dbtx().into_nc(), request).await?;
851                Ok(())
852
853            }
854        },
855        api_endpoint! {
856            RECOVER_ENDPOINT,
857            ApiVersion::new(0, 0),
858            async |fedimint: &ConsensusApi, context, id: PublicKey| -> Option<ClientBackupSnapshot> {
859                Ok(fedimint
860                    .handle_recover_request(&mut context.dbtx().into_nc(), id).await)
861            }
862        },
863        api_endpoint! {
864            AUTH_ENDPOINT,
865            ApiVersion::new(0, 0),
866            async |_fedimint: &ConsensusApi, context, _v: ()| -> () {
867                check_auth(context)?;
868                Ok(())
869            }
870        },
871        api_endpoint! {
872            API_ANNOUNCEMENTS_ENDPOINT,
873            ApiVersion::new(0, 3),
874            async |fedimint: &ConsensusApi, _context, _v: ()| -> BTreeMap<PeerId, SignedApiAnnouncement> {
875                Ok(fedimint.api_announcements().await)
876            }
877        },
878        api_endpoint! {
879            SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
880            ApiVersion::new(0, 3),
881            async |fedimint: &ConsensusApi, _context, submission: SignedApiAnnouncementSubmission| -> () {
882                fedimint.submit_api_announcement(submission.peer_id, submission.signed_api_announcement).await
883            }
884        },
885        api_endpoint! {
886            SIGN_API_ANNOUNCEMENT_ENDPOINT,
887            ApiVersion::new(0, 3),
888            async |fedimint: &ConsensusApi, context, new_url: SafeUrl| -> SignedApiAnnouncement {
889                check_auth(context)?;
890                Ok(fedimint.sign_api_announcement(new_url).await)
891            }
892        },
893        api_endpoint! {
894            FEDIMINTD_VERSION_ENDPOINT,
895            ApiVersion::new(0, 4),
896            async |fedimint: &ConsensusApi, _context, _v: ()| -> String {
897                Ok(fedimint.fedimintd_version())
898            }
899        },
900        api_endpoint! {
901            BACKUP_STATISTICS_ENDPOINT,
902            ApiVersion::new(0, 5),
903            async |_fedimint: &ConsensusApi, context, _v: ()| -> BackupStatistics {
904                check_auth(context)?;
905                Ok(backup_statistics_static(&mut context.dbtx().into_nc()).await)
906            }
907        },
908        api_endpoint! {
909            CHANGE_PASSWORD_ENDPOINT,
910            ApiVersion::new(0, 6),
911            async |fedimint: &ConsensusApi, context, new_password: String| -> () {
912                let auth = check_auth(context)?;
913                fedimint.change_guardian_password(&new_password, &auth)
914            }
915        },
916    ]
917}
918
919pub(crate) async fn backup_statistics_static(
920    dbtx: &mut DatabaseTransaction<'_>,
921) -> BackupStatistics {
922    const DAY_SECS: u64 = 24 * 60 * 60;
923    const WEEK_SECS: u64 = 7 * DAY_SECS;
924    const MONTH_SECS: u64 = 30 * DAY_SECS;
925    const QUARTER_SECS: u64 = 3 * MONTH_SECS;
926
927    let mut backup_stats = BackupStatistics::default();
928
929    let mut all_backups_stream = dbtx.find_by_prefix(&ClientBackupKeyPrefix).await;
930    while let Some((_, backup)) = all_backups_stream.next().await {
931        backup_stats.num_backups += 1;
932        backup_stats.total_size += backup.data.len();
933
934        let age_secs = backup.timestamp.elapsed().unwrap_or_default().as_secs();
935        if age_secs < DAY_SECS {
936            backup_stats.refreshed_1d += 1;
937        }
938        if age_secs < WEEK_SECS {
939            backup_stats.refreshed_1w += 1;
940        }
941        if age_secs < MONTH_SECS {
942            backup_stats.refreshed_1m += 1;
943        }
944        if age_secs < QUARTER_SECS {
945            backup_stats.refreshed_3m += 1;
946        }
947    }
948
949    backup_stats
950}