1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::cast_precision_loss)]
5#![allow(clippy::cast_sign_loss)]
6#![allow(clippy::doc_markdown)]
7#![allow(clippy::missing_errors_doc)]
8#![allow(clippy::missing_panics_doc)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::must_use_candidate)]
11#![allow(clippy::needless_lifetimes)]
12#![allow(clippy::ref_option)]
13#![allow(clippy::return_self_not_must_use)]
14#![allow(clippy::similar_names)]
15#![allow(clippy::too_many_lines)]
16#![allow(clippy::needless_pass_by_value)]
17#![allow(clippy::manual_let_else)]
18#![allow(clippy::match_wildcard_for_single_variants)]
19#![allow(clippy::trivially_copy_pass_by_ref)]
20
21extern crate fedimint_core;
24pub mod connection_limits;
25pub mod db;
26
27use std::fs;
28use std::path::{Path, PathBuf};
29use std::time::Duration;
30
31use anyhow::Context;
32use config::ServerConfig;
33use config::io::{PLAINTEXT_PASSWORD, read_server_config};
34pub use connection_limits::ConnectionLimits;
35use fedimint_aead::random_salt;
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::P2PMessage;
38use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
39use fedimint_core::epoch::ConsensusItem;
40use fedimint_core::net::peers::DynP2PConnections;
41use fedimint_core::task::{TaskGroup, sleep};
42use fedimint_core::util::write_new;
43use fedimint_logging::LOG_CONSENSUS;
44pub use fedimint_server_core as core;
45use fedimint_server_core::ServerModuleInitRegistry;
46use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
47use fedimint_server_core::dashboard_ui::DynDashboardApi;
48use fedimint_server_core::setup_ui::{DynSetupApi, ISetupApi};
49use jsonrpsee::RpcModule;
50use net::api::ApiSecrets;
51use net::p2p::P2PStatusReceivers;
52use net::p2p_connector::IrohConnector;
53use tokio::net::TcpListener;
54use tracing::info;
55
56use crate::config::ConfigGenSettings;
57use crate::config::io::{
58 SALT_FILE, finalize_password_change, recover_interrupted_password_change, trim_password,
59 write_server_config,
60};
61use crate::config::setup::SetupApi;
62use crate::db::{ServerInfo, ServerInfoKey};
63use crate::fedimint_core::net::peers::IP2PConnections;
64use crate::metrics::initialize_gauge_metrics;
65use crate::net::api::announcement::start_api_announcement_service;
66use crate::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
67use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
68
69pub mod metrics;
70
71pub mod consensus;
73
74pub mod net;
76
77pub mod config;
79
/// Boxed closure that builds the dashboard UI's [`axum::Router`] from a
/// [`DynDashboardApi`] handle.
///
/// The closure must be `Send`. [`run`] accepts a value of this type and
/// forwards it to `consensus::run`.
pub type DashboardUiRouter = Box<dyn Fn(DynDashboardApi) -> axum::Router + Send>;
82
/// Boxed closure that builds the setup UI's [`axum::Router`] from a
/// [`DynSetupApi`] handle.
///
/// The closure must be `Send`. [`run_config_gen`] calls it once with the
/// setup API and serves the resulting router on `settings.ui_bind`.
pub type SetupUiRouter = Box<dyn Fn(DynSetupApi) -> axum::Router + Send>;
85
86#[allow(clippy::too_many_arguments)]
87pub async fn run(
88 data_dir: PathBuf,
89 force_api_secrets: ApiSecrets,
90 settings: ConfigGenSettings,
91 db: Database,
92 code_version_str: String,
93 module_init_registry: ServerModuleInitRegistry,
94 task_group: TaskGroup,
95 bitcoin_rpc: DynServerBitcoinRpc,
96 setup_ui_router: SetupUiRouter,
97 dashboard_ui_router: DashboardUiRouter,
98 db_checkpoint_retention: u64,
99 iroh_api_limits: ConnectionLimits,
100) -> anyhow::Result<()> {
101 let (cfg, connections, p2p_status_receivers) = match get_config(&data_dir)? {
102 Some(cfg) => {
103 let connector = if cfg.consensus.iroh_endpoints.is_empty() {
104 TlsTcpConnector::new(
105 cfg.tls_config(),
106 settings.p2p_bind,
107 cfg.local.p2p_endpoints.clone(),
108 cfg.local.identity,
109 )
110 .await
111 .into_dyn()
112 } else {
113 IrohConnector::new(
114 cfg.private.iroh_p2p_sk.clone().unwrap(),
115 settings.p2p_bind,
116 settings.iroh_dns.clone(),
117 settings.iroh_relays.clone(),
118 cfg.consensus
119 .iroh_endpoints
120 .iter()
121 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
122 .collect(),
123 )
124 .await?
125 .into_dyn()
126 };
127
128 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
129
130 let connections = ReconnectP2PConnections::new(
131 cfg.local.identity,
132 connector,
133 &task_group,
134 p2p_status_senders,
135 )
136 .into_dyn();
137
138 (cfg, connections, p2p_status_receivers)
139 }
140 None => {
141 Box::pin(run_config_gen(
142 data_dir.clone(),
143 settings.clone(),
144 db.clone(),
145 &task_group,
146 code_version_str.clone(),
147 force_api_secrets.clone(),
148 setup_ui_router,
149 module_init_registry.clone(),
150 ))
151 .await?
152 }
153 };
154
155 let decoders = module_init_registry.decoders_strict(
156 cfg.consensus
157 .modules
158 .iter()
159 .map(|(id, config)| (*id, &config.kind)),
160 )?;
161
162 let db = db.with_decoders(decoders);
163
164 initialize_gauge_metrics(&task_group, &db).await;
165
166 start_api_announcement_service(&db, &task_group, &cfg, force_api_secrets.get_active()).await?;
167
168 info!(target: LOG_CONSENSUS, "Starting consensus...");
169
170 let connectors = ConnectorRegistry::build_from_server_defaults()
171 .bind()
172 .await?;
173
174 Box::pin(consensus::run(
175 connectors,
176 connections,
177 p2p_status_receivers,
178 settings.api_bind,
179 settings.iroh_dns,
180 settings.iroh_relays,
181 cfg,
182 db,
183 module_init_registry.clone(),
184 &task_group,
185 force_api_secrets,
186 data_dir,
187 code_version_str,
188 bitcoin_rpc,
189 settings.ui_bind,
190 dashboard_ui_router,
191 db_checkpoint_retention,
192 iroh_api_limits,
193 ))
194 .await?;
195
196 info!(target: LOG_CONSENSUS, "Shutting down tasks...");
197
198 task_group.shutdown();
199
200 Ok(())
201}
202
203async fn update_server_info_version_dbtx(
204 dbtx: &mut DatabaseTransaction<'_>,
205 code_version_str: &str,
206) {
207 let mut server_info = dbtx.get_value(&ServerInfoKey).await.unwrap_or(ServerInfo {
208 init_version: code_version_str.to_string(),
209 last_version: code_version_str.to_string(),
210 });
211 server_info.last_version = code_version_str.to_string();
212 dbtx.insert_entry(&ServerInfoKey, &server_info).await;
213}
214
215pub fn get_config(data_dir: &Path) -> anyhow::Result<Option<ServerConfig>> {
216 recover_interrupted_password_change(data_dir)?;
217
218 let path = data_dir.join(PLAINTEXT_PASSWORD);
220 if let Ok(password_untrimmed) = fs::read_to_string(&path) {
221 let password = trim_password(&password_untrimmed);
222 let cfg = read_server_config(password, data_dir)?;
223 finalize_password_change(data_dir)?;
224 return Ok(Some(cfg));
225 }
226
227 Ok(None)
228}
229
230#[allow(clippy::too_many_arguments)]
231pub async fn run_config_gen(
232 data_dir: PathBuf,
233 settings: ConfigGenSettings,
234 db: Database,
235 task_group: &TaskGroup,
236 code_version_str: String,
237 api_secrets: ApiSecrets,
238 setup_ui_handler: SetupUiRouter,
239 module_init_registry: ServerModuleInitRegistry,
240) -> anyhow::Result<(
241 ServerConfig,
242 DynP2PConnections<P2PMessage>,
243 P2PStatusReceivers,
244)> {
245 info!(target: LOG_CONSENSUS, "Starting config gen");
246
247 initialize_gauge_metrics(task_group, &db).await;
248
249 let (cgp_sender, mut cgp_receiver) = tokio::sync::mpsc::channel(1);
250
251 let setup_api = SetupApi::new(settings.clone(), db.clone(), cgp_sender);
252
253 let mut rpc_module = RpcModule::new(setup_api.clone());
254
255 net::api::attach_endpoints(&mut rpc_module, config::setup::server_endpoints(), None);
256
257 let api_handler = net::api::spawn(
258 "setup",
259 settings.api_bind,
261 rpc_module,
262 10,
263 api_secrets.clone(),
264 )
265 .await;
266
267 let ui_task_group = TaskGroup::new();
268
269 let ui_service = setup_ui_handler(setup_api.clone().into_dyn()).into_make_service();
270
271 let ui_listener = TcpListener::bind(settings.ui_bind)
272 .await
273 .expect("Failed to bind setup UI");
274
275 ui_task_group.spawn("setup-ui", move |handle| async move {
276 axum::serve(ui_listener, ui_service)
277 .with_graceful_shutdown(handle.make_shutdown_rx())
278 .await
279 .expect("Failed to serve setup UI");
280 });
281
282 info!(target: LOG_CONSENSUS, "Setup UI running at http://{} 🚀", settings.ui_bind);
283
284 let cg_params = cgp_receiver
285 .recv()
286 .await
287 .expect("Config gen params receiver closed unexpectedly");
288
289 sleep(Duration::from_millis(100)).await;
293
294 api_handler
295 .stop()
296 .expect("Config api should still be running");
297
298 api_handler.stopped().await;
299
300 ui_task_group
301 .shutdown_join_all(None)
302 .await
303 .context("Failed to shutdown UI server after config gen")?;
304
305 let connector = if cg_params.iroh_endpoints().is_empty() {
306 TlsTcpConnector::new(
307 cg_params.tls_config(),
308 settings.p2p_bind,
309 cg_params.p2p_urls(),
310 cg_params.identity,
311 )
312 .await
313 .into_dyn()
314 } else {
315 IrohConnector::new(
316 cg_params.iroh_p2p_sk.clone().unwrap(),
317 settings.p2p_bind,
318 settings.iroh_dns,
319 settings.iroh_relays,
320 cg_params
321 .iroh_endpoints()
322 .iter()
323 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
324 .collect(),
325 )
326 .await?
327 .into_dyn()
328 };
329
330 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
331
332 let connections = ReconnectP2PConnections::new(
333 cg_params.identity,
334 connector,
335 task_group,
336 p2p_status_senders,
337 )
338 .into_dyn();
339
340 let cfg = ServerConfig::distributed_gen(
341 &cg_params,
342 module_init_registry.clone(),
343 code_version_str.clone(),
344 connections.clone(),
345 p2p_status_receivers.clone(),
346 )
347 .await?;
348
349 assert_ne!(
350 cfg.consensus.iroh_endpoints.is_empty(),
351 cfg.consensus.api_endpoints.is_empty(),
352 );
353
354 write_new(data_dir.join(PLAINTEXT_PASSWORD), &cfg.private.api_auth.0)?;
356 write_new(data_dir.join(SALT_FILE), random_salt())?;
357 write_server_config(
358 &cfg,
359 &data_dir,
360 &cfg.private.api_auth.0,
361 &module_init_registry,
362 api_secrets.get_active(),
363 )?;
364
365 Ok((cfg, connections, p2p_status_receivers))
366}