Skip to main content

fedimint_server/consensus/
api.rs

1//! Implements the client API through which users interact with the federation
2use std::cmp::Ordering;
3use std::collections::BTreeMap;
4use std::path::{Path, PathBuf};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use async_trait::async_trait;
9use bitcoin::hashes::sha256;
10use fedimint_aead::{encrypt, get_encryption_key, random_salt};
11use fedimint_api_client::api::{
12    LegacyFederationStatus, LegacyP2PConnectionStatus, LegacyPeerStatus, StatusResponse,
13};
14use fedimint_core::admin_client::{GuardianConfigBackup, ServerStatusLegacy, SetupStatus};
15use fedimint_core::backup::{
16    BackupStatistics, ClientBackupKey, ClientBackupKeyPrefix, ClientBackupSnapshot,
17};
18use fedimint_core::config::{ClientConfig, JsonClientConfig, META_FEDERATION_NAME_KEY};
19use fedimint_core::core::backup::{BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES, SignedBackupRequest};
20use fedimint_core::core::{DynOutputOutcome, ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{
22    Committable, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
23};
24#[allow(deprecated)]
25use fedimint_core::endpoint_constants::AWAIT_OUTPUT_OUTCOME_ENDPOINT;
26use fedimint_core::endpoint_constants::{
27    API_ANNOUNCEMENTS_ENDPOINT, AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_OUTPUTS_OUTCOMES_ENDPOINT,
28    AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT,
29    AWAIT_TRANSACTION_ENDPOINT, BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CHAIN_ID_ENDPOINT,
30    CHANGE_PASSWORD_ENDPOINT, CLIENT_CONFIG_ENDPOINT, CLIENT_CONFIG_JSON_ENDPOINT,
31    CONSENSUS_ORD_LATENCY_ENDPOINT, FEDERATION_ID_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT,
32    GUARDIAN_CONFIG_BACKUP_ENDPOINT, GUARDIAN_METADATA_ENDPOINT, INVITE_CODE_ENDPOINT,
33    P2P_CONNECTION_STATUS_ENDPOINT, RECOVER_ENDPOINT, SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT,
34    SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
35    SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT, SIGN_API_ANNOUNCEMENT_ENDPOINT,
36    SIGN_GUARDIAN_METADATA_ENDPOINT, STATUS_ENDPOINT, SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
37    SUBMIT_GUARDIAN_METADATA_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERSION_ENDPOINT,
38};
39use fedimint_core::epoch::ConsensusItem;
40use fedimint_core::invite_code::InviteCode;
41use fedimint_core::module::audit::{Audit, AuditSummary};
42use fedimint_core::module::{
43    ApiAuth, ApiEndpoint, ApiEndpointContext, ApiError, ApiRequestErased, ApiResult, ApiVersion,
44    SerdeModuleEncoding, SerdeModuleEncodingBase64, SupportedApiVersionsSummary, api_endpoint,
45};
46use fedimint_core::net::api_announcement::{
47    ApiAnnouncement, SignedApiAnnouncement, SignedApiAnnouncementSubmission,
48};
49use fedimint_core::net::auth::{GuardianAuthToken, check_auth};
50use fedimint_core::secp256k1::{PublicKey, SECP256K1};
51use fedimint_core::session_outcome::{
52    SessionOutcome, SessionStatus, SessionStatusV2, SignedSessionOutcome,
53};
54use fedimint_core::task::TaskGroup;
55use fedimint_core::transaction::{
56    SerdeTransaction, Transaction, TransactionError, TransactionSubmissionOutcome,
57};
58use fedimint_core::util::{FmtCompact, SafeUrl};
59use fedimint_core::version::non_zero_version_hash;
60use fedimint_core::{ChainId, OutPoint, OutPointRange, PeerId, TransactionId, secp256k1};
61use fedimint_logging::LOG_NET_API;
62use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
63use fedimint_server_core::dashboard_ui::{
64    IDashboardApi, P2PConnectionStatus, ServerBitcoinRpcStatus,
65};
66use fedimint_server_core::{DynServerModule, ServerModuleRegistry, ServerModuleRegistryExt};
67use futures::StreamExt;
68use tokio::sync::watch::{self, Receiver, Sender};
69use tracing::{debug, info, warn};
70
71use crate::config::io::{
72    CONSENSUS_CONFIG, ENCRYPTED_EXT, JSON_EXT, LOCAL_CONFIG, PRIVATE_CONFIG, SALT_FILE,
73    reencrypt_private_config,
74};
75use crate::config::{ServerConfig, legacy_consensus_config_hash};
76use crate::consensus::db::{AcceptedItemPrefix, AcceptedTransactionKey, SignedSessionOutcomeKey};
77use crate::consensus::engine::get_finished_session_count_static;
78use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
79use crate::metrics::{BACKUP_WRITE_SIZE_BYTES, STORED_BACKUPS_COUNT};
80use crate::net::api::HasApiContext;
81use crate::net::api::announcement::{ApiAnnouncementKey, ApiAnnouncementPrefix, get_api_urls};
82use crate::net::p2p::P2PStatusReceivers;
83
84#[derive(Clone)]
85pub struct ConsensusApi {
86    /// Our server configuration
87    pub cfg: ServerConfig,
88    /// Directory where config files are stored
89    pub cfg_dir: PathBuf,
90    /// Database for serving the API
91    pub db: Database,
92    /// Modules registered with the federation
93    pub modules: ServerModuleRegistry,
94    /// Cached client config
95    pub client_cfg: ClientConfig,
96    pub force_api_secret: Option<String>,
97    /// For sending API events to consensus such as transactions
98    pub submission_sender: async_channel::Sender<ConsensusItem>,
99    pub shutdown_receiver: Receiver<Option<u64>>,
100    pub shutdown_sender: Sender<Option<u64>>,
101    pub ord_latency_receiver: watch::Receiver<Option<Duration>>,
102    pub p2p_status_receivers: P2PStatusReceivers,
103    pub ci_status_receivers: BTreeMap<PeerId, Receiver<Option<u64>>>,
104    pub bitcoin_rpc_connection: ServerBitcoinRpcMonitor,
105    pub supported_api_versions: SupportedApiVersionsSummary,
106    pub code_version_str: String,
107    pub code_version_hash: String,
108    pub task_group: TaskGroup,
109}
110
111impl ConsensusApi {
112    pub fn api_versions_summary(&self) -> &SupportedApiVersionsSummary {
113        &self.supported_api_versions
114    }
115
116    pub fn get_active_api_secret(&self) -> Option<String> {
117        // TODO: In the future, we might want to fetch it from the DB, so it's possible
118        // to customize from the UX
119        self.force_api_secret.clone()
120    }
121
122    // we want to return an error if and only if the submitted transaction is
123    // invalid and will be rejected if we were to submit it to consensus
124    pub async fn submit_transaction(
125        &self,
126        transaction: Transaction,
127    ) -> Result<TransactionId, TransactionError> {
128        let txid = transaction.tx_hash();
129
130        debug!(target: LOG_NET_API, %txid, "Received a submitted transaction");
131
132        // Create read-only DB tx so that the read state is consistent
133        let mut dbtx = self.db.begin_transaction_nc().await;
134        // we already processed the transaction before
135        if dbtx
136            .get_value(&AcceptedTransactionKey(txid))
137            .await
138            .is_some()
139        {
140            debug!(target: LOG_NET_API, %txid, "Transaction already accepted");
141            return Ok(txid);
142        }
143
144        // We ignore any writes, as we only verify if the transaction is valid here
145        dbtx.ignore_uncommitted();
146
147        process_transaction_with_dbtx(
148            self.modules.clone(),
149            &mut dbtx,
150            &transaction,
151            self.cfg.consensus.version,
152            TxProcessingMode::Submission,
153        )
154        .await
155        .inspect_err(|err| {
156            debug!(target: LOG_NET_API, %txid, err = %err.fmt_compact(), "Transaction rejected");
157        })?;
158
159        let _ = self
160            .submission_sender
161            .send(ConsensusItem::Transaction(transaction.clone()))
162            .await
163            .inspect_err(|err| {
164                warn!(target: LOG_NET_API, %txid, err = %err.fmt_compact(), "Unable to submit the tx into consensus");
165            });
166
167        Ok(txid)
168    }
169
170    pub async fn await_transaction(
171        &self,
172        txid: TransactionId,
173    ) -> (Vec<ModuleInstanceId>, DatabaseTransaction<'_, Committable>) {
174        debug!(target: LOG_NET_API, %txid, "Awaiting transaction acceptance");
175        self.db
176            .wait_key_check(&AcceptedTransactionKey(txid), std::convert::identity)
177            .await
178    }
179
180    pub async fn await_output_outcome(
181        &self,
182        outpoint: OutPoint,
183    ) -> Result<SerdeModuleEncoding<DynOutputOutcome>> {
184        debug!(target: LOG_NET_API, %outpoint, "Awaiting output outcome");
185        let (module_ids, mut dbtx) = self.await_transaction(outpoint.txid).await;
186
187        let module_id = module_ids
188            .into_iter()
189            .nth(outpoint.out_idx as usize)
190            .with_context(|| format!("Outpoint index out of bounds {outpoint:?}"))?;
191
192        #[allow(deprecated)]
193        let outcome = self
194            .modules
195            .get_expect(module_id)
196            .output_status(
197                &mut dbtx.to_ref_with_prefix_module_id(module_id).0.into_nc(),
198                outpoint,
199                module_id,
200            )
201            .await
202            .context("No output outcome for outpoint")?;
203
204        Ok((&outcome).into())
205    }
206
207    pub async fn await_outputs_outcomes(
208        &self,
209        outpoint_range: OutPointRange,
210    ) -> Result<Vec<Option<SerdeModuleEncoding<DynOutputOutcome>>>> {
211        // Wait for the transaction to be accepted first
212        let (module_ids, mut dbtx) = self.await_transaction(outpoint_range.txid()).await;
213
214        let mut outcomes = Vec::with_capacity(outpoint_range.count());
215
216        for outpoint in outpoint_range {
217            let module_id = module_ids
218                .get(outpoint.out_idx as usize)
219                .with_context(|| format!("Outpoint index out of bounds {outpoint:?}"))?;
220
221            #[allow(deprecated)]
222            let outcome = self
223                .modules
224                .get_expect(*module_id)
225                .output_status(
226                    &mut dbtx.to_ref_with_prefix_module_id(*module_id).0.into_nc(),
227                    outpoint,
228                    *module_id,
229                )
230                .await
231                .map(|outcome| (&outcome).into());
232
233            outcomes.push(outcome);
234        }
235
236        Ok(outcomes)
237    }
238
239    pub async fn session_count(&self) -> u64 {
240        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
241    }
242
243    pub async fn await_signed_session_outcome(&self, index: u64) -> SignedSessionOutcome {
244        self.db
245            .wait_key_check(&SignedSessionOutcomeKey(index), std::convert::identity)
246            .await
247            .0
248    }
249
250    pub async fn session_status(&self, session_index: u64) -> SessionStatusV2 {
251        let mut dbtx = self.db.begin_transaction_nc().await;
252
253        match session_index.cmp(&get_finished_session_count_static(&mut dbtx).await) {
254            Ordering::Greater => SessionStatusV2::Initial,
255            Ordering::Equal => SessionStatusV2::Pending(
256                dbtx.find_by_prefix(&AcceptedItemPrefix)
257                    .await
258                    .map(|entry| entry.1)
259                    .collect()
260                    .await,
261            ),
262            Ordering::Less => SessionStatusV2::Complete(
263                dbtx.get_value(&SignedSessionOutcomeKey(session_index))
264                    .await
265                    .expect("There are no gaps in session outcomes"),
266            ),
267        }
268    }
269
270    pub async fn get_federation_status(&self) -> ApiResult<LegacyFederationStatus> {
271        let session_count = self.session_count().await;
272        let scheduled_shutdown = self.shutdown_receiver.borrow().to_owned();
273
274        let status_by_peer = self
275            .p2p_status_receivers
276            .iter()
277            .map(|(peer, p2p_receiver)| {
278                let ci_receiver = self.ci_status_receivers.get(peer).unwrap();
279
280                let consensus_status = LegacyPeerStatus {
281                    connection_status: match *p2p_receiver.borrow() {
282                        Some(..) => LegacyP2PConnectionStatus::Connected,
283                        None => LegacyP2PConnectionStatus::Disconnected,
284                    },
285                    last_contribution: *ci_receiver.borrow(),
286                    flagged: ci_receiver.borrow().unwrap_or(0) + 1 < session_count,
287                };
288
289                (*peer, consensus_status)
290            })
291            .collect::<BTreeMap<_, _>>();
292
293        let peers_flagged = status_by_peer
294            .values()
295            .filter(|status| status.flagged)
296            .count() as u64;
297
298        let peers_online = status_by_peer
299            .values()
300            .filter(|status| status.connection_status == LegacyP2PConnectionStatus::Connected)
301            .count() as u64;
302
303        let peers_offline = status_by_peer
304            .values()
305            .filter(|status| status.connection_status == LegacyP2PConnectionStatus::Disconnected)
306            .count() as u64;
307
308        Ok(LegacyFederationStatus {
309            session_count,
310            status_by_peer,
311            peers_online,
312            peers_offline,
313            peers_flagged,
314            scheduled_shutdown,
315        })
316    }
317
318    fn shutdown(&self, index: Option<u64>) {
319        self.shutdown_sender.send_replace(index);
320    }
321
322    async fn get_federation_audit(&self) -> ApiResult<AuditSummary> {
323        let mut dbtx = self.db.begin_transaction_nc().await;
324        // Writes are related to compacting audit keys, which we can safely ignore
325        // within an API request since the compaction will happen when constructing an
326        // audit in the consensus server
327        dbtx.ignore_uncommitted();
328
329        let mut audit = Audit::default();
330        let mut module_instance_id_to_kind = BTreeMap::new();
331        for (module_instance_id, kind, module) in self.modules.iter_modules() {
332            module_instance_id_to_kind.insert(module_instance_id, kind.as_str().to_string());
333            module
334                .audit(
335                    &mut dbtx.to_ref_with_prefix_module_id(module_instance_id).0,
336                    &mut audit,
337                    module_instance_id,
338                )
339                .await;
340        }
341        Ok(AuditSummary::from_audit(
342            &audit,
343            &module_instance_id_to_kind,
344        ))
345    }
346
347    /// Uses the in-memory config to write a config backup tar archive that
348    /// guardians can download. Private keys are encrypted with the guardian
349    /// password, so it should be safe to store anywhere, this also means the
350    /// backup is useless without the password.
351    fn get_guardian_config_backup(
352        &self,
353        password: &str,
354        _auth: &GuardianAuthToken,
355    ) -> GuardianConfigBackup {
356        let mut tar_archive_builder = tar::Builder::new(Vec::new());
357
358        let mut append = |name: &Path, data: &[u8]| {
359            let mut header = tar::Header::new_gnu();
360            header.set_path(name).expect("Error setting path");
361            header.set_size(data.len() as u64);
362            header.set_mode(0o644);
363            header.set_cksum();
364            tar_archive_builder
365                .append(&header, data)
366                .expect("Error adding data to tar archive");
367        };
368
369        append(
370            &PathBuf::from(LOCAL_CONFIG).with_extension(JSON_EXT),
371            &serde_json::to_vec(&self.cfg.local).expect("Error encoding local config"),
372        );
373
374        append(
375            &PathBuf::from(CONSENSUS_CONFIG).with_extension(JSON_EXT),
376            &serde_json::to_vec(&self.cfg.consensus).expect("Error encoding consensus config"),
377        );
378
379        // Note that the encrypted config returned here uses a different salt than the
380        // on-disk version. While this may be confusing it shouldn't be a problem since
381        // the content and encryption key are the same. It's unpractical to read the
382        // on-disk version here since the server/api aren't aware of the config dir and
383        // ideally we can keep it that way.
384        let encryption_salt = random_salt();
385        append(&PathBuf::from(SALT_FILE), encryption_salt.as_bytes());
386
387        let private_config_bytes =
388            serde_json::to_vec(&self.cfg.private).expect("Error encoding private config");
389        let encryption_key = get_encryption_key(password, &encryption_salt)
390            .expect("Generating key from password failed");
391        let private_config_encrypted =
392            hex::encode(encrypt(private_config_bytes, &encryption_key).expect("Encryption failed"));
393        append(
394            &PathBuf::from(PRIVATE_CONFIG).with_extension(ENCRYPTED_EXT),
395            private_config_encrypted.as_bytes(),
396        );
397
398        let tar_archive_bytes = tar_archive_builder
399            .into_inner()
400            .expect("Error building tar archive");
401
402        GuardianConfigBackup { tar_archive_bytes }
403    }
404
405    async fn handle_backup_request(
406        &self,
407        dbtx: &mut DatabaseTransaction<'_>,
408        request: SignedBackupRequest,
409    ) -> Result<(), ApiError> {
410        let request = request
411            .verify_valid(SECP256K1)
412            .map_err(|_| ApiError::bad_request("invalid request".into()))?;
413
414        if request.payload.len() > BACKUP_REQUEST_MAX_PAYLOAD_SIZE_BYTES {
415            return Err(ApiError::bad_request("snapshot too large".into()));
416        }
417        debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request");
418        if let Some(prev) = dbtx.get_value(&ClientBackupKey(request.id)).await
419            && request.timestamp <= prev.timestamp
420        {
421            debug!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Received client backup request with old timestamp - ignoring");
422            return Err(ApiError::bad_request("timestamp too small".into()));
423        }
424
425        info!(target: LOG_NET_API, id = %request.id, len = request.payload.len(), "Storing new client backup");
426        let overwritten = dbtx
427            .insert_entry(
428                &ClientBackupKey(request.id),
429                &ClientBackupSnapshot {
430                    timestamp: request.timestamp,
431                    data: request.payload.clone(),
432                },
433            )
434            .await
435            .is_some();
436        BACKUP_WRITE_SIZE_BYTES.observe(request.payload.len() as f64);
437        if !overwritten {
438            dbtx.on_commit(|| STORED_BACKUPS_COUNT.inc());
439        }
440
441        Ok(())
442    }
443
444    async fn handle_recover_request(
445        &self,
446        dbtx: &mut DatabaseTransaction<'_>,
447        id: PublicKey,
448    ) -> Option<ClientBackupSnapshot> {
449        dbtx.get_value(&ClientBackupKey(id)).await
450    }
451
452    /// List API URL announcements from all peers we have received them from (at
453    /// least ourselves)
454    async fn api_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
455        self.db
456            .begin_transaction_nc()
457            .await
458            .find_by_prefix(&ApiAnnouncementPrefix)
459            .await
460            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
461            .collect()
462            .await
463    }
464
465    /// Returns the tagged fedimintd version currently running
466    fn fedimintd_version(&self) -> String {
467        self.code_version_str.clone()
468    }
469
470    /// Add an API URL announcement from a peer to our database to be returned
471    /// by [`ConsensusApi::api_announcements`].
472    async fn submit_api_announcement(
473        &self,
474        peer_id: PeerId,
475        announcement: SignedApiAnnouncement,
476    ) -> Result<(), ApiError> {
477        let Some(peer_key) = self.cfg.consensus.broadcast_public_keys.get(&peer_id) else {
478            return Err(ApiError::bad_request("Peer not in federation".into()));
479        };
480
481        if !announcement.verify(SECP256K1, peer_key) {
482            return Err(ApiError::bad_request("Invalid signature".into()));
483        }
484
485        // Use autocommit to handle potential transaction conflicts with retries
486        self.db
487            .autocommit(
488                |dbtx, _| {
489                    let announcement = announcement.clone();
490                    Box::pin(async move {
491                        if let Some(existing_announcement) =
492                            dbtx.get_value(&ApiAnnouncementKey(peer_id)).await
493                        {
494                            // If the current announcement is semantically identical to the new one
495                            // (except for potentially having a
496                            // different, valid signature) we return ok to allow
497                            // the caller to stop submitting the value if they are in a retry loop.
498                            if existing_announcement.api_announcement
499                                == announcement.api_announcement
500                            {
501                                return Ok(());
502                            }
503
504                            // We only accept announcements with a nonce higher than the current one
505                            // to avoid replay attacks.
506                            if existing_announcement.api_announcement.nonce
507                                >= announcement.api_announcement.nonce
508                            {
509                                return Err(ApiError::bad_request(
510                                    "Outdated or redundant announcement".into(),
511                                ));
512                            }
513                        }
514
515                        dbtx.insert_entry(&ApiAnnouncementKey(peer_id), &announcement)
516                            .await;
517                        Ok(())
518                    })
519                },
520                None,
521            )
522            .await
523            .map_err(|e| match e {
524                fedimint_core::db::AutocommitError::ClosureError { error, .. } => error,
525                fedimint_core::db::AutocommitError::CommitFailed { last_error, .. } => {
526                    ApiError::server_error(format!("Database commit failed: {last_error}"))
527                }
528            })
529    }
530
531    async fn sign_api_announcement(&self, new_url: SafeUrl) -> SignedApiAnnouncement {
532        self.db
533            .autocommit(
534                |dbtx, _| {
535                    let new_url_inner = new_url.clone();
536                    Box::pin(async move {
537                        let new_nonce = dbtx
538                            .get_value(&ApiAnnouncementKey(self.cfg.local.identity))
539                            .await
540                            .map_or(0, |a| a.api_announcement.nonce + 1);
541                        let announcement = ApiAnnouncement {
542                            api_url: new_url_inner,
543                            nonce: new_nonce,
544                        };
545                        let ctx = secp256k1::Secp256k1::new();
546                        let signed_announcement = announcement
547                            .sign(&ctx, &self.cfg.private.broadcast_secret_key.keypair(&ctx));
548
549                        dbtx.insert_entry(
550                            &ApiAnnouncementKey(self.cfg.local.identity),
551                            &signed_announcement,
552                        )
553                        .await;
554
555                        Result::<_, ()>::Ok(signed_announcement)
556                    })
557                },
558                None,
559            )
560            .await
561            .expect("Will not terminate on error")
562    }
563
564    async fn guardian_metadata_list(
565        &self,
566    ) -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
567        use crate::net::api::guardian_metadata::{GuardianMetadataKey, GuardianMetadataPrefix};
568
569        self.db
570            .begin_transaction_nc()
571            .await
572            .find_by_prefix(&GuardianMetadataPrefix)
573            .await
574            .map(|(key, metadata): (GuardianMetadataKey, _)| (key.0, metadata))
575            .collect()
576            .await
577    }
578
579    async fn submit_guardian_metadata(
580        &self,
581        peer_id: PeerId,
582        metadata: fedimint_core::net::guardian_metadata::SignedGuardianMetadata,
583    ) -> Result<(), ApiError> {
584        use crate::net::api::guardian_metadata::GuardianMetadataKey;
585
586        let Some(peer_key) = self.cfg.consensus.broadcast_public_keys.get(&peer_id) else {
587            return Err(ApiError::bad_request("Peer not in federation".into()));
588        };
589
590        let now = fedimint_core::time::duration_since_epoch();
591        if let Err(e) = metadata.verify(SECP256K1, peer_key, now) {
592            return Err(ApiError::bad_request(format!(
593                "Invalid signature or timestamp: {e}"
594            )));
595        }
596
597        let mut dbtx = self.db.begin_transaction().await;
598
599        if let Some(existing_metadata) = dbtx.get_value(&GuardianMetadataKey(peer_id)).await {
600            // If the current metadata is semantically identical to the new one (except
601            // for potentially having a different, valid signature) we return ok to allow
602            // the caller to stop submitting the value if they are in a retry loop.
603            if existing_metadata.bytes == metadata.bytes {
604                return Ok(());
605            }
606
607            // Only update if the new metadata has a newer timestamp
608            if metadata.guardian_metadata().timestamp_secs
609                <= existing_metadata.guardian_metadata().timestamp_secs
610            {
611                return Err(ApiError::bad_request(
612                    "New metadata timestamp is not newer than existing".into(),
613                ));
614            }
615        }
616
617        dbtx.insert_entry(&GuardianMetadataKey(peer_id), &metadata)
618            .await;
619        dbtx.commit_tx().await;
620
621        Ok(())
622    }
623
624    async fn sign_guardian_metadata(
625        &self,
626        new_metadata: fedimint_core::net::guardian_metadata::GuardianMetadata,
627    ) -> fedimint_core::net::guardian_metadata::SignedGuardianMetadata {
628        use crate::net::api::guardian_metadata::GuardianMetadataKey;
629
630        let ctx = secp256k1::Secp256k1::new();
631        let signed_metadata =
632            new_metadata.sign(&ctx, &self.cfg.private.broadcast_secret_key.keypair(&ctx));
633
634        self.db
635            .autocommit(
636                |dbtx, _| {
637                    let signed_metadata_inner = signed_metadata.clone();
638                    Box::pin(async move {
639                        dbtx.insert_entry(
640                            &GuardianMetadataKey(self.cfg.local.identity),
641                            &signed_metadata_inner,
642                        )
643                        .await;
644
645                        Result::<_, ()>::Ok(signed_metadata_inner)
646                    })
647                },
648                None,
649            )
650            .await
651            .expect("Will not terminate on error")
652    }
653
654    /// Changes the guardian password by re-encrypting the private config and
655    /// changing the on-disk password file if present. `fedimintd` is shut down
656    /// afterward, the user's service manager (e.g. `systemd` is expected to
657    /// restart it).
658    fn change_guardian_password(
659        &self,
660        new_password: &str,
661        _auth: &GuardianAuthToken,
662    ) -> Result<(), ApiError> {
663        reencrypt_private_config(&self.cfg_dir, &self.cfg.private, new_password)
664            .map_err(|e| ApiError::server_error(format!("Failed to change password: {e}")))?;
665
666        info!(target: LOG_NET_API, "Successfully changed guardian password");
667
668        Ok(())
669    }
670
671    async fn get_invite_code(&self, api_secret: Option<String>) -> InviteCode {
672        let identity = self.cfg.local.identity;
673        let mut api_urls = get_api_urls(&self.db, &self.cfg.consensus).await;
674
675        InviteCode::new(
676            api_urls
677                .remove(&identity)
678                .expect("API URL for our identity must be present"),
679            identity,
680            self.cfg.calculate_federation_id(),
681            api_secret,
682        )
683    }
684}
685
686#[async_trait]
687impl HasApiContext<ConsensusApi> for ConsensusApi {
688    async fn context(
689        &self,
690        request: &ApiRequestErased,
691        id: Option<ModuleInstanceId>,
692    ) -> (&ConsensusApi, ApiEndpointContext) {
693        let mut db = self.db.clone();
694        if let Some(id) = id {
695            db = self.db.with_prefix_module_id(id).0;
696        }
697        (
698            self,
699            ApiEndpointContext::new(
700                db,
701                request
702                    .auth
703                    .as_ref()
704                    .is_some_and(|auth| self.cfg.private.api_auth.verify(auth.as_str())),
705                request.auth.clone(),
706            ),
707        )
708    }
709}
710
711#[async_trait]
712impl HasApiContext<DynServerModule> for ConsensusApi {
713    async fn context(
714        &self,
715        request: &ApiRequestErased,
716        id: Option<ModuleInstanceId>,
717    ) -> (&DynServerModule, ApiEndpointContext) {
718        let (_, context): (&ConsensusApi, _) = self.context(request, id).await;
719        (
720            self.modules.get_expect(id.expect("required module id")),
721            context,
722        )
723    }
724}
725
726#[async_trait]
727impl IDashboardApi for ConsensusApi {
728    async fn auth(&self) -> ApiAuth {
729        self.cfg.private.api_auth.clone()
730    }
731
732    async fn guardian_id(&self) -> PeerId {
733        self.cfg.local.identity
734    }
735
736    async fn guardian_names(&self) -> BTreeMap<PeerId, String> {
737        self.cfg
738            .consensus
739            .api_endpoints()
740            .iter()
741            .map(|(peer_id, endpoint)| (*peer_id, endpoint.name.clone()))
742            .collect()
743    }
744
745    async fn federation_name(&self) -> String {
746        self.cfg
747            .consensus
748            .meta
749            .get(META_FEDERATION_NAME_KEY)
750            .cloned()
751            .expect("Federation name must be set")
752    }
753
754    async fn session_count(&self) -> u64 {
755        self.session_count().await
756    }
757
758    async fn get_session_status(&self, session_idx: u64) -> SessionStatusV2 {
759        self.session_status(session_idx).await
760    }
761
762    async fn consensus_ord_latency(&self) -> Option<Duration> {
763        *self.ord_latency_receiver.borrow()
764    }
765
766    async fn p2p_connection_status(&self) -> BTreeMap<PeerId, Option<P2PConnectionStatus>> {
767        self.p2p_status_receivers
768            .iter()
769            .map(|(peer, receiver)| (*peer, receiver.borrow().clone()))
770            .collect()
771    }
772
773    async fn federation_invite_code(&self) -> String {
774        self.get_invite_code(self.get_active_api_secret())
775            .await
776            .to_string()
777    }
778
779    async fn federation_audit(&self) -> AuditSummary {
780        self.get_federation_audit()
781            .await
782            .expect("Failed to get federation audit")
783    }
784
785    async fn bitcoin_rpc_url(&self) -> SafeUrl {
786        self.bitcoin_rpc_connection.url()
787    }
788
789    async fn bitcoin_rpc_status(&self) -> Option<ServerBitcoinRpcStatus> {
790        self.bitcoin_rpc_connection.status()
791    }
792
793    async fn download_guardian_config_backup(
794        &self,
795        password: &str,
796        guardian_auth: &GuardianAuthToken,
797    ) -> GuardianConfigBackup {
798        self.get_guardian_config_backup(password, guardian_auth)
799    }
800
801    fn get_module_by_kind(&self, kind: ModuleKind) -> Option<&DynServerModule> {
802        self.modules
803            .iter_modules()
804            .find_map(|(_, module_kind, module)| {
805                if *module_kind == kind {
806                    Some(module)
807                } else {
808                    None
809                }
810            })
811    }
812
813    async fn fedimintd_version(&self) -> String {
814        self.code_version_str.clone()
815    }
816
817    async fn fedimintd_version_hash(&self) -> Option<String> {
818        non_zero_version_hash(&self.code_version_hash).map(str::to_owned)
819    }
820
821    async fn change_password(
822        &self,
823        new_password: &str,
824        current_password: &str,
825        guardian_auth: &GuardianAuthToken,
826    ) -> Result<(), String> {
827        let auth = self.auth().await;
828        if !auth.verify(current_password) {
829            return Err("Current password is incorrect".into());
830        }
831        self.change_guardian_password(new_password, guardian_auth)
832            .map_err(|e| e.to_string())
833    }
834}
835
836pub fn server_endpoints() -> Vec<ApiEndpoint<ConsensusApi>> {
837    vec![
838        api_endpoint! {
839            VERSION_ENDPOINT,
840            ApiVersion::new(0, 0),
841            async |fedimint: &ConsensusApi, _context, _v: ()| -> SupportedApiVersionsSummary {
842                Ok(fedimint.api_versions_summary().to_owned())
843            }
844        },
845        api_endpoint! {
846            SUBMIT_TRANSACTION_ENDPOINT,
847            ApiVersion::new(0, 0),
848            async |fedimint: &ConsensusApi, _context, transaction: SerdeTransaction| -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
849                let transaction = transaction
850                    .try_into_inner(&fedimint.modules.decoder_registry())
851                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
852
853                // we return an inner error if and only if the submitted transaction is
854                // invalid and will be rejected if we were to submit it to consensus
855                Ok((&TransactionSubmissionOutcome(fedimint.submit_transaction(transaction).await)).into())
856            }
857        },
858        api_endpoint! {
859            AWAIT_TRANSACTION_ENDPOINT,
860            ApiVersion::new(0, 0),
861            async |fedimint: &ConsensusApi, _context, tx_hash: TransactionId| -> TransactionId {
862                fedimint.await_transaction(tx_hash).await;
863
864                Ok(tx_hash)
865            }
866        },
867        api_endpoint! {
868            AWAIT_OUTPUT_OUTCOME_ENDPOINT,
869            ApiVersion::new(0, 0),
870            async |fedimint: &ConsensusApi, _context, outpoint: OutPoint| -> SerdeModuleEncoding<DynOutputOutcome> {
871                let outcome = fedimint
872                    .await_output_outcome(outpoint)
873                    .await
874                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
875
876                Ok(outcome)
877            }
878        },
879        api_endpoint! {
880            AWAIT_OUTPUTS_OUTCOMES_ENDPOINT,
881            ApiVersion::new(0, 8),
882            async |fedimint: &ConsensusApi, _context, outpoint_range: OutPointRange| -> Vec<Option<SerdeModuleEncoding<DynOutputOutcome>>> {
883                let outcomes = fedimint
884                    .await_outputs_outcomes(outpoint_range)
885                    .await
886                    .map_err(|e| ApiError::bad_request(e.to_string()))?;
887
888                Ok(outcomes)
889            }
890        },
891        api_endpoint! {
892            INVITE_CODE_ENDPOINT,
893            ApiVersion::new(0, 0),
894            async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
895                Ok(fedimint.get_invite_code(fedimint.get_active_api_secret()).await.to_string())
896            }
897        },
898        api_endpoint! {
899            FEDERATION_ID_ENDPOINT,
900            ApiVersion::new(0, 2),
901            async |fedimint: &ConsensusApi, _context,  _v: ()| -> String {
902                Ok(fedimint.cfg.calculate_federation_id().to_string())
903            }
904        },
905        api_endpoint! {
906            CLIENT_CONFIG_ENDPOINT,
907            ApiVersion::new(0, 0),
908            async |fedimint: &ConsensusApi, _context, _v: ()| -> ClientConfig {
909                Ok(fedimint.client_cfg.clone())
910            }
911        },
912        // Helper endpoint for Admin UI that can't parse consensus encoding
913        api_endpoint! {
914            CLIENT_CONFIG_JSON_ENDPOINT,
915            ApiVersion::new(0, 0),
916            async |fedimint: &ConsensusApi, _context, _v: ()| -> JsonClientConfig {
917                Ok(fedimint.client_cfg.to_json())
918            }
919        },
920        api_endpoint! {
921            SERVER_CONFIG_CONSENSUS_HASH_ENDPOINT,
922            ApiVersion::new(0, 0),
923            async |fedimint: &ConsensusApi, _context, _v: ()| -> sha256::Hash {
924                Ok(legacy_consensus_config_hash(&fedimint.cfg.consensus))
925            }
926        },
927        api_endpoint! {
928            STATUS_ENDPOINT,
929            ApiVersion::new(0, 0),
930            async |fedimint: &ConsensusApi, _context, _v: ()| -> StatusResponse {
931                Ok(StatusResponse {
932                    server: ServerStatusLegacy::ConsensusRunning,
933                    federation: Some(fedimint.get_federation_status().await?)
934                })}
935        },
936        api_endpoint! {
937            SETUP_STATUS_ENDPOINT,
938            ApiVersion::new(0, 0),
939            async |_f: &ConsensusApi, _c, _v: ()| -> SetupStatus {
940                Ok(SetupStatus::ConsensusIsRunning)
941            }
942        },
943        api_endpoint! {
944            CONSENSUS_ORD_LATENCY_ENDPOINT,
945            ApiVersion::new(0, 0),
946            async |fedimint: &ConsensusApi, _c, _v: ()| -> Option<Duration> {
947                Ok(*fedimint.ord_latency_receiver.borrow())
948            }
949        },
950        api_endpoint! {
951            P2P_CONNECTION_STATUS_ENDPOINT,
952            ApiVersion::new(0, 0),
953            async |fedimint: &ConsensusApi, _c, _v: ()| -> BTreeMap<PeerId, Option<P2PConnectionStatus>> {
954                Ok(fedimint.p2p_status_receivers
955                    .iter()
956                    .map(|(peer, receiver)| (*peer, receiver.borrow().clone()))
957                    .collect())
958            }
959        },
960        api_endpoint! {
961            SESSION_COUNT_ENDPOINT,
962            ApiVersion::new(0, 0),
963            async |fedimint: &ConsensusApi, _context, _v: ()| -> u64 {
964                Ok(fedimint.session_count().await)
965            }
966        },
967        api_endpoint! {
968            AWAIT_SESSION_OUTCOME_ENDPOINT,
969            ApiVersion::new(0, 0),
970            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionOutcome> {
971                Ok((&fedimint.await_signed_session_outcome(index).await.session_outcome).into())
972            }
973        },
974        api_endpoint! {
975            AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT,
976            ApiVersion::new(0, 0),
977            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SignedSessionOutcome> {
978                Ok((&fedimint.await_signed_session_outcome(index).await).into())
979            }
980        },
981        api_endpoint! {
982            SESSION_STATUS_ENDPOINT,
983            ApiVersion::new(0, 1),
984            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncoding<SessionStatus> {
985                Ok((&SessionStatus::from(fedimint.session_status(index).await)).into())
986            }
987        },
988        api_endpoint! {
989            SESSION_STATUS_V2_ENDPOINT,
990            ApiVersion::new(0, 5),
991            async |fedimint: &ConsensusApi, _context, index: u64| -> SerdeModuleEncodingBase64<SessionStatusV2> {
992                Ok((&fedimint.session_status(index).await).into())
993            }
994        },
995        api_endpoint! {
996            SHUTDOWN_ENDPOINT,
997            ApiVersion::new(0, 3),
998            async |fedimint: &ConsensusApi, context, index: Option<u64>| -> () {
999                check_auth(context)?;
1000                fedimint.shutdown(index);
1001                Ok(())
1002            }
1003        },
1004        api_endpoint! {
1005            AUDIT_ENDPOINT,
1006            ApiVersion::new(0, 0),
1007            async |fedimint: &ConsensusApi, context, _v: ()| -> AuditSummary {
1008                check_auth(context)?;
1009                Ok(fedimint.get_federation_audit().await?)
1010            }
1011        },
1012        api_endpoint! {
1013            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
1014            ApiVersion::new(0, 2),
1015            async |fedimint: &ConsensusApi, context, _v: ()| -> GuardianConfigBackup {
1016                let auth = check_auth(context)?;
1017                let password = context.request_auth().expect("Auth was checked before").as_str().to_string();
1018                Ok(fedimint.get_guardian_config_backup(&password, &auth))
1019            }
1020        },
1021        api_endpoint! {
1022            BACKUP_ENDPOINT,
1023            ApiVersion::new(0, 0),
1024            async |fedimint: &ConsensusApi, context, request: SignedBackupRequest| -> () {
1025                let db = context.db();
1026                let mut dbtx = db.begin_transaction().await;
1027                fedimint
1028                    .handle_backup_request(&mut dbtx.to_ref_nc(), request).await?;
1029                dbtx.commit_tx_result().await?;
1030                Ok(())
1031
1032            }
1033        },
1034        api_endpoint! {
1035            RECOVER_ENDPOINT,
1036            ApiVersion::new(0, 0),
1037            async |fedimint: &ConsensusApi, context, id: PublicKey| -> Option<ClientBackupSnapshot> {
1038                let db = context.db();
1039                let mut dbtx = db.begin_transaction_nc().await;
1040                Ok(fedimint
1041                    .handle_recover_request(&mut dbtx, id).await)
1042            }
1043        },
1044        api_endpoint! {
1045            AUTH_ENDPOINT,
1046            ApiVersion::new(0, 0),
1047            async |_fedimint: &ConsensusApi, context, _v: ()| -> () {
1048                check_auth(context)?;
1049                Ok(())
1050            }
1051        },
1052        api_endpoint! {
1053            API_ANNOUNCEMENTS_ENDPOINT,
1054            ApiVersion::new(0, 3),
1055            async |fedimint: &ConsensusApi, _context, _v: ()| -> BTreeMap<PeerId, SignedApiAnnouncement> {
1056                Ok(fedimint.api_announcements().await)
1057            }
1058        },
1059        api_endpoint! {
1060            SUBMIT_API_ANNOUNCEMENT_ENDPOINT,
1061            ApiVersion::new(0, 3),
1062            async |fedimint: &ConsensusApi, _context, submission: SignedApiAnnouncementSubmission| -> () {
1063                fedimint.submit_api_announcement(submission.peer_id, submission.signed_api_announcement).await
1064            }
1065        },
1066        api_endpoint! {
1067            SIGN_API_ANNOUNCEMENT_ENDPOINT,
1068            ApiVersion::new(0, 3),
1069            async |fedimint: &ConsensusApi, context, new_url: SafeUrl| -> SignedApiAnnouncement {
1070                check_auth(context)?;
1071                Ok(fedimint.sign_api_announcement(new_url).await)
1072            }
1073        },
1074        api_endpoint! {
1075            GUARDIAN_METADATA_ENDPOINT,
1076            ApiVersion::new(0, 9),
1077            async |fedimint: &ConsensusApi, _context, _v: ()| -> BTreeMap<PeerId, fedimint_core::net::guardian_metadata::SignedGuardianMetadata> {
1078                Ok(fedimint.guardian_metadata_list().await)
1079            }
1080        },
1081        api_endpoint! {
1082            SUBMIT_GUARDIAN_METADATA_ENDPOINT,
1083            ApiVersion::new(0, 9),
1084            async |fedimint: &ConsensusApi, _context, submission: fedimint_core::net::guardian_metadata::SignedGuardianMetadataSubmission| -> () {
1085                fedimint.submit_guardian_metadata(submission.peer_id, submission.signed_guardian_metadata).await
1086            }
1087        },
1088        api_endpoint! {
1089            SIGN_GUARDIAN_METADATA_ENDPOINT,
1090            ApiVersion::new(0, 9),
1091            async |fedimint: &ConsensusApi, context, metadata: fedimint_core::net::guardian_metadata::GuardianMetadata| -> fedimint_core::net::guardian_metadata::SignedGuardianMetadata {
1092                check_auth(context)?;
1093                Ok(fedimint.sign_guardian_metadata(metadata).await)
1094            }
1095        },
1096        api_endpoint! {
1097            FEDIMINTD_VERSION_ENDPOINT,
1098            ApiVersion::new(0, 4),
1099            async |fedimint: &ConsensusApi, _context, _v: ()| -> String {
1100                Ok(fedimint.fedimintd_version())
1101            }
1102        },
1103        api_endpoint! {
1104            BACKUP_STATISTICS_ENDPOINT,
1105            ApiVersion::new(0, 5),
1106            async |_fedimint: &ConsensusApi, context, _v: ()| -> BackupStatistics {
1107                check_auth(context)?;
1108                let db = context.db();
1109                let mut dbtx = db.begin_transaction_nc().await;
1110                Ok(backup_statistics_static(&mut dbtx).await)
1111            }
1112        },
1113        api_endpoint! {
1114            CHANGE_PASSWORD_ENDPOINT,
1115            ApiVersion::new(0, 6),
1116            async |fedimint: &ConsensusApi, context, new_password: String| -> () {
1117                let auth = check_auth(context)?;
1118                fedimint.change_guardian_password(&new_password, &auth)?;
1119                let task_group = fedimint.task_group.clone();
1120                fedimint_core::runtime::spawn("shutdown after password change",  async move {
1121                    info!(target: LOG_NET_API, "Will shutdown after password change");
1122                    fedimint_core:: runtime::sleep(Duration::from_secs(1)).await;
1123                    task_group.shutdown();
1124                });
1125                Ok(())
1126            }
1127        },
1128        api_endpoint! {
1129            CHAIN_ID_ENDPOINT,
1130            ApiVersion::new(0, 9),
1131            async |fedimint: &ConsensusApi, _context, _v: ()| -> ChainId {
1132                fedimint
1133                    .bitcoin_rpc_connection
1134                    .get_chain_id()
1135                    .await
1136                    .map_err(|e| ApiError::server_error(e.to_string()))
1137            }
1138        },
1139    ]
1140}
1141
1142pub(crate) async fn backup_statistics_static(
1143    dbtx: &mut DatabaseTransaction<'_>,
1144) -> BackupStatistics {
1145    const DAY_SECS: u64 = 24 * 60 * 60;
1146    const WEEK_SECS: u64 = 7 * DAY_SECS;
1147    const MONTH_SECS: u64 = 30 * DAY_SECS;
1148    const QUARTER_SECS: u64 = 3 * MONTH_SECS;
1149
1150    let mut backup_stats = BackupStatistics::default();
1151
1152    let mut all_backups_stream = dbtx.find_by_prefix(&ClientBackupKeyPrefix).await;
1153    while let Some((_, backup)) = all_backups_stream.next().await {
1154        backup_stats.num_backups += 1;
1155        backup_stats.total_size += backup.data.len();
1156
1157        let age_secs = backup.timestamp.elapsed().unwrap_or_default().as_secs();
1158        if age_secs < DAY_SECS {
1159            backup_stats.refreshed_1d += 1;
1160        }
1161        if age_secs < WEEK_SECS {
1162            backup_stats.refreshed_1w += 1;
1163        }
1164        if age_secs < MONTH_SECS {
1165            backup_stats.refreshed_1m += 1;
1166        }
1167        if age_secs < QUARTER_SECS {
1168            backup_stats.refreshed_3m += 1;
1169        }
1170    }
1171
1172    backup_stats
1173}