1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::cast_precision_loss)]
5#![allow(clippy::cast_sign_loss)]
6#![allow(clippy::doc_markdown)]
7#![allow(clippy::missing_errors_doc)]
8#![allow(clippy::missing_panics_doc)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::must_use_candidate)]
11#![allow(clippy::needless_lifetimes)]
12#![allow(clippy::ref_option)]
13#![allow(clippy::return_self_not_must_use)]
14#![allow(clippy::similar_names)]
15#![allow(clippy::too_many_lines)]
16#![allow(clippy::needless_pass_by_value)]
17#![allow(clippy::manual_let_else)]
18#![allow(clippy::match_wildcard_for_single_variants)]
19#![allow(clippy::trivially_copy_pass_by_ref)]
20
21extern crate fedimint_core;
24pub mod connection_limits;
25pub mod db;
26
27use std::fs;
28use std::path::{Path, PathBuf};
29use std::time::Duration;
30
31use anyhow::{Context, ensure};
32use bitcoin::hashes::hex::FromHex as _;
33use config::ServerConfig;
34use config::io::{PLAINTEXT_PASSWORD, read_server_config};
35pub use connection_limits::ConnectionLimits;
36use fedimint_aead::random_salt;
37use fedimint_connectors::ConnectorRegistry;
38use fedimint_core::config::P2PMessage;
39use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
40use fedimint_core::epoch::ConsensusItem;
41use fedimint_core::net::peers::DynP2PConnections;
42use fedimint_core::task::{TaskGroup, sleep};
43use fedimint_core::util::write_new;
44use fedimint_logging::LOG_CONSENSUS;
45pub use fedimint_server_core as core;
46use fedimint_server_core::ServerModuleInitRegistry;
47use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
48use fedimint_server_core::dashboard_ui::DynDashboardApi;
49use fedimint_server_core::setup_ui::{DynSetupApi, ISetupApi};
50use jsonrpsee::RpcModule;
51use net::api::ApiSecrets;
52use net::p2p::P2PStatusReceivers;
53use net::p2p_connector::IrohConnector;
54use tokio::net::TcpListener;
55use tokio_rustls::rustls;
56use tracing::info;
57
58use crate::config::ConfigGenSettings;
59use crate::config::io::{
60 SALT_FILE, finalize_password_change, recover_interrupted_password_change, trim_password,
61 write_server_config,
62};
63use crate::config::setup::{ConfigGenOutcome, SetupApi};
64use crate::db::{ServerInfo, ServerInfoKey};
65use crate::fedimint_core::net::peers::IP2PConnections;
66use crate::metrics::initialize_gauge_metrics;
67use crate::net::api::announcement::start_api_announcement_service;
68use crate::net::api::guardian_metadata::start_guardian_metadata_service;
69use crate::net::api::pkarr_publish::start_pkarr_publish_service;
70use crate::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
71use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
72
73pub mod metrics;
74
75pub mod consensus;
77
78pub mod net;
80
81pub mod config;
83
84pub type DashboardUiRouter = Box<dyn Fn(DynDashboardApi) -> axum::Router + Send>;
86
87pub type SetupUiRouter = Box<dyn Fn(DynSetupApi) -> axum::Router + Send>;
89
90#[allow(clippy::too_many_arguments)]
91pub async fn run(
92 data_dir: PathBuf,
93 force_api_secrets: ApiSecrets,
94 settings: ConfigGenSettings,
95 db: Database,
96 code_version_str: String,
97 code_version_hash: String,
98 module_init_registry: ServerModuleInitRegistry,
99 task_group: TaskGroup,
100 bitcoin_rpc: DynServerBitcoinRpc,
101 setup_ui_router: SetupUiRouter,
102 dashboard_ui_router: DashboardUiRouter,
103 db_checkpoint_retention: u64,
104 iroh_api_limits: ConnectionLimits,
105) -> anyhow::Result<()> {
106 let (cfg, connections, p2p_status_receivers) = match get_config(&data_dir)? {
107 Some(cfg) => {
108 let connector = if cfg.consensus.iroh_endpoints.is_empty() {
109 TlsTcpConnector::new(
110 cfg.tls_config(),
111 settings.p2p_bind,
112 cfg.local.p2p_endpoints.clone(),
113 cfg.local.identity,
114 )
115 .await
116 .into_dyn()
117 } else {
118 IrohConnector::new(
119 cfg.private.iroh_p2p_sk.clone().unwrap(),
120 settings.p2p_bind,
121 settings.iroh_dns.clone(),
122 settings.iroh_relays.clone(),
123 cfg.consensus
124 .iroh_endpoints
125 .iter()
126 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
127 .collect(),
128 )
129 .await?
130 .into_dyn()
131 };
132
133 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
134
135 let connections = ReconnectP2PConnections::new(
136 cfg.local.identity,
137 connector,
138 &task_group,
139 p2p_status_senders,
140 )
141 .into_dyn();
142
143 (cfg, connections, p2p_status_receivers)
144 }
145 None => {
146 Box::pin(run_config_gen(
147 data_dir.clone(),
148 settings.clone(),
149 db.clone(),
150 &task_group,
151 code_version_str.clone(),
152 code_version_hash.clone(),
153 force_api_secrets.clone(),
154 setup_ui_router,
155 module_init_registry.clone(),
156 ))
157 .await?
158 }
159 };
160
161 let decoders = module_init_registry.decoders_strict(
162 cfg.consensus
163 .modules
164 .iter()
165 .map(|(id, config)| (*id, &config.kind)),
166 )?;
167
168 let db = db.with_decoders(decoders);
169
170 initialize_gauge_metrics(&task_group, &db).await;
171
172 start_api_announcement_service(&db, &task_group, &cfg, force_api_secrets.get_active()).await?;
173 start_guardian_metadata_service(&db, &task_group, &cfg, force_api_secrets.get_active()).await?;
174 start_pkarr_publish_service(&db, &task_group, &cfg).await?;
175
176 info!(target: LOG_CONSENSUS, "Starting consensus...");
177
178 let connectors = ConnectorRegistry::build_from_server_defaults()
179 .bind()
180 .await?;
181
182 Box::pin(consensus::run(
183 connectors,
184 connections,
185 p2p_status_receivers,
186 settings.api_bind,
187 settings.iroh_dns,
188 settings.iroh_relays,
189 cfg,
190 db,
191 module_init_registry.clone(),
192 &task_group,
193 force_api_secrets,
194 data_dir,
195 code_version_str,
196 code_version_hash,
197 bitcoin_rpc,
198 settings.ui_bind,
199 dashboard_ui_router,
200 db_checkpoint_retention,
201 iroh_api_limits,
202 ))
203 .await?;
204
205 info!(target: LOG_CONSENSUS, "Shutting down tasks...");
206
207 task_group.shutdown();
208
209 Ok(())
210}
211
212async fn update_server_info_version_dbtx(
213 dbtx: &mut DatabaseTransaction<'_>,
214 code_version_str: &str,
215) {
216 let mut server_info = dbtx.get_value(&ServerInfoKey).await.unwrap_or(ServerInfo {
217 init_version: code_version_str.to_string(),
218 last_version: code_version_str.to_string(),
219 });
220 server_info.last_version = code_version_str.to_string();
221 dbtx.insert_entry(&ServerInfoKey, &server_info).await;
222}
223
224pub fn get_config(data_dir: &Path) -> anyhow::Result<Option<ServerConfig>> {
225 recover_interrupted_password_change(data_dir)?;
226
227 let path = data_dir.join(PLAINTEXT_PASSWORD);
229 if let Ok(password_untrimmed) = fs::read_to_string(&path) {
230 let password = trim_password(&password_untrimmed);
231 let cfg = read_server_config(password, data_dir)?;
232 finalize_password_change(data_dir)?;
233 return Ok(Some(cfg));
234 }
235
236 Ok(None)
237}
238
239fn validate_restored_tcp_config(cfg: &ServerConfig) -> anyhow::Result<()> {
250 let tls_key = cfg
251 .private
252 .tls_key
253 .as_ref()
254 .context("Restored TCP config is missing the TLS private key")?;
255 let tls_key_bytes = Vec::from_hex(tls_key).context("Parsing restored TLS private key")?;
256 rustls::pki_types::PrivateKeyDer::try_from(tls_key_bytes)
257 .map_err(|e| anyhow::format_err!("Parsing restored TLS private key DER: {e}"))?;
258
259 ensure!(
260 cfg.consensus.tls_certs.contains_key(&cfg.local.identity),
261 "Restored TCP config is missing our TLS certificate"
262 );
263 for (peer, cert) in &cfg.consensus.tls_certs {
264 Vec::from_hex(cert)
265 .with_context(|| format!("Parsing restored TLS certificate for peer {peer}"))?;
266 }
267
268 let tls_config = cfg.tls_config();
269 let mut root_cert_store = rustls::RootCertStore::empty();
270 for cert in tls_config.certificates.values() {
271 root_cert_store
272 .add(cert.clone())
273 .context("Adding restored TLS certificate to root store")?;
274 }
275 let verifier = rustls::server::WebPkiClientVerifier::builder(root_cert_store.into())
276 .build()
277 .context("Creating restored TLS client verifier")?;
278 let certificate = tls_config
279 .certificates
280 .get(&cfg.local.identity)
281 .context("Restored TCP config is missing our TLS certificate")?
282 .clone();
283 rustls::ServerConfig::builder()
284 .with_client_cert_verifier(verifier)
285 .with_single_cert(vec![certificate], tls_config.private_key.clone_key())
286 .context("Creating restored TLS server config")?;
287
288 Ok(())
289}
290
291fn restored_iroh_p2p_key(cfg: &ServerConfig) -> anyhow::Result<iroh::SecretKey> {
298 let iroh_p2p_sk = cfg
299 .private
300 .iroh_p2p_sk
301 .clone()
302 .context("Restored Iroh config is missing the Iroh p2p secret key")?;
303 let local_endpoints = cfg
304 .consensus
305 .iroh_endpoints
306 .get(&cfg.local.identity)
307 .context("Restored Iroh config is missing our Iroh endpoints")?;
308 ensure!(
309 iroh_p2p_sk.public() == local_endpoints.p2p_pk,
310 "Restored Iroh p2p secret key does not match our Iroh endpoint"
311 );
312
313 let iroh_api_sk = cfg
314 .private
315 .iroh_api_sk
316 .clone()
317 .context("Restored Iroh config is missing the Iroh api secret key")?;
318 ensure!(
319 iroh_api_sk.public() == local_endpoints.api_pk,
320 "Restored Iroh api secret key does not match our Iroh endpoint"
321 );
322
323 Ok(iroh_p2p_sk)
324}
325
326#[allow(clippy::too_many_arguments)]
327pub async fn run_config_gen(
328 data_dir: PathBuf,
329 settings: ConfigGenSettings,
330 db: Database,
331 task_group: &TaskGroup,
332 code_version_str: String,
333 code_version_hash: String,
334 api_secrets: ApiSecrets,
335 setup_ui_handler: SetupUiRouter,
336 module_init_registry: ServerModuleInitRegistry,
337) -> anyhow::Result<(
338 ServerConfig,
339 DynP2PConnections<P2PMessage>,
340 P2PStatusReceivers,
341)> {
342 info!(target: LOG_CONSENSUS, "Starting config gen");
343
344 initialize_gauge_metrics(task_group, &db).await;
345
346 let (cgp_sender, mut cgp_receiver) = tokio::sync::mpsc::channel(1);
347
348 let setup_api = SetupApi::new(
349 settings.clone(),
350 db.clone(),
351 data_dir.clone(),
352 cgp_sender,
353 code_version_str.clone(),
354 code_version_hash,
355 );
356
357 let mut rpc_module = RpcModule::new(setup_api.clone());
358
359 net::api::attach_endpoints(&mut rpc_module, config::setup::server_endpoints(), None);
360
361 let api_handler = net::api::spawn(
362 "setup",
363 settings.api_bind,
365 rpc_module,
366 10,
367 api_secrets.clone(),
368 )
369 .await;
370
371 let ui_task_group = TaskGroup::new();
372
373 let ui_service = setup_ui_handler(setup_api.clone().into_dyn()).into_make_service();
374
375 let ui_listener = TcpListener::bind(settings.ui_bind)
376 .await
377 .expect("Failed to bind setup UI");
378
379 ui_task_group.spawn("setup-ui", move |handle| async move {
380 axum::serve(ui_listener, ui_service)
381 .with_graceful_shutdown(handle.make_shutdown_rx())
382 .await
383 .expect("Failed to serve setup UI");
384 });
385
386 info!(target: LOG_CONSENSUS, "Setup UI running at http://{} 🚀", settings.ui_bind);
387
388 loop {
389 let config_gen_outcome = cgp_receiver
390 .recv()
391 .await
392 .expect("Config gen params receiver closed unexpectedly");
393
394 match config_gen_outcome {
395 ConfigGenOutcome::Generated(cg_params) => {
396 sleep(Duration::from_millis(100)).await;
400
401 api_handler
402 .stop()
403 .expect("Config api should still be running");
404
405 api_handler.stopped().await;
406
407 ui_task_group
408 .shutdown_join_all(None)
409 .await
410 .context("Failed to shutdown UI server after config gen")?;
411
412 let cg_params = *cg_params;
413 let connector = if cg_params.iroh_endpoints().is_empty() {
414 TlsTcpConnector::new(
415 cg_params.tls_config(),
416 settings.p2p_bind,
417 cg_params.p2p_urls(),
418 cg_params.identity,
419 )
420 .await
421 .into_dyn()
422 } else {
423 IrohConnector::new(
424 cg_params
425 .iroh_p2p_sk
426 .clone()
427 .expect("Iroh p2p secret key is required for iroh endpoints"),
428 settings.p2p_bind,
429 settings.iroh_dns,
430 settings.iroh_relays,
431 cg_params
432 .iroh_endpoints()
433 .iter()
434 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
435 .collect(),
436 )
437 .await?
438 .into_dyn()
439 };
440
441 let (p2p_status_senders, p2p_status_receivers) =
442 p2p_status_channels(connector.peers());
443
444 let connections = ReconnectP2PConnections::new(
445 cg_params.identity,
446 connector,
447 task_group,
448 p2p_status_senders,
449 )
450 .into_dyn();
451
452 let cfg = ServerConfig::distributed_gen(
453 &cg_params,
454 module_init_registry.clone(),
455 code_version_str.clone(),
456 connections.clone(),
457 p2p_status_receivers.clone(),
458 )
459 .await?;
460
461 assert_ne!(
462 cfg.consensus.iroh_endpoints.is_empty(),
463 cfg.consensus.api_endpoints.is_empty(),
464 );
465
466 write_new(
468 data_dir.join(PLAINTEXT_PASSWORD),
469 cfg.private.api_auth.as_str(),
470 )?;
471 write_new(data_dir.join(SALT_FILE), random_salt())?;
472 write_server_config(
473 &cfg,
474 &data_dir,
475 cfg.private.api_auth.as_str(),
476 &module_init_registry,
477 api_secrets.get_active(),
478 )?;
479
480 return Ok((cfg, connections, p2p_status_receivers));
481 }
482 ConfigGenOutcome::Restored(restored, restore_result_sender) => {
483 let result = async {
487 let restored = *restored;
488 let cfg = &restored.cfg;
489
490 if let Err(e) = module_init_registry.decoders_strict(
491 cfg.consensus
492 .modules
493 .iter()
494 .map(|(id, config)| (*id, &config.kind)),
495 ) {
496 restored.cleanup();
497 return Err(e);
498 }
499
500 if let Err(e) = cfg.validate_config(&cfg.local.identity, &module_init_registry)
501 {
502 restored.cleanup();
503 return Err(e);
504 }
505
506 if cfg.consensus.iroh_endpoints.is_empty()
507 && let Err(e) = validate_restored_tcp_config(cfg)
508 {
509 restored.cleanup();
510 return Err(e);
511 }
512
513 let connector = if cfg.consensus.iroh_endpoints.is_empty() {
517 TlsTcpConnector::new(
518 cfg.tls_config(),
519 settings.p2p_bind,
520 cfg.local.p2p_endpoints.clone(),
521 cfg.local.identity,
522 )
523 .await
524 .into_dyn()
525 } else {
526 let iroh_p2p_sk = match restored_iroh_p2p_key(cfg) {
527 Ok(iroh_p2p_sk) => iroh_p2p_sk,
528 Err(e) => {
529 restored.cleanup();
530 return Err(e);
531 }
532 };
533
534 match IrohConnector::new(
535 iroh_p2p_sk,
536 settings.p2p_bind,
537 settings.iroh_dns.clone(),
538 settings.iroh_relays.clone(),
539 cfg.consensus
540 .iroh_endpoints
541 .iter()
542 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
543 .collect(),
544 )
545 .await
546 {
547 Ok(connector) => connector.into_dyn(),
548 Err(e) => {
549 restored.cleanup();
550 return Err(e);
551 }
552 }
553 };
554
555 let (p2p_status_senders, p2p_status_receivers) =
556 p2p_status_channels(connector.peers());
557 let cfg = restored.install(&data_dir)?;
558
559 Ok((cfg, connector, p2p_status_senders, p2p_status_receivers))
560 }
561 .await;
562
563 let ack = result
564 .as_ref()
565 .map(|_| ())
566 .map_err(std::string::ToString::to_string);
567 let restore_failed = ack.is_err();
568 let _ = restore_result_sender.send(ack);
569
570 if restore_failed {
571 continue;
572 }
573
574 sleep(Duration::from_millis(100)).await;
577
578 api_handler
579 .stop()
580 .expect("Config api should still be running");
581
582 api_handler.stopped().await;
583
584 ui_task_group
585 .shutdown_join_all(None)
586 .await
587 .context("Failed to shutdown UI server after restored config install")?;
588
589 let (cfg, connector, p2p_status_senders, p2p_status_receivers) = result?;
590 let connections = ReconnectP2PConnections::new(
591 cfg.local.identity,
592 connector,
593 task_group,
594 p2p_status_senders,
595 )
596 .into_dyn();
597
598 return Ok((cfg, connections, p2p_status_receivers));
599 }
600 }
601 }
602}