1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::cast_precision_loss)]
5#![allow(clippy::cast_sign_loss)]
6#![allow(clippy::doc_markdown)]
7#![allow(clippy::missing_errors_doc)]
8#![allow(clippy::missing_panics_doc)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::must_use_candidate)]
11#![allow(clippy::needless_lifetimes)]
12#![allow(clippy::ref_option)]
13#![allow(clippy::return_self_not_must_use)]
14#![allow(clippy::similar_names)]
15#![allow(clippy::too_many_lines)]
16#![allow(clippy::needless_pass_by_value)]
17#![allow(clippy::manual_let_else)]
18#![allow(clippy::match_wildcard_for_single_variants)]
19#![allow(clippy::trivially_copy_pass_by_ref)]
20
21extern crate fedimint_core;
24pub mod db;
25
26use std::fs;
27use std::future::Future;
28use std::net::SocketAddr;
29use std::path::{Path, PathBuf};
30use std::pin::Pin;
31
32use anyhow::Context;
33use config::ServerConfig;
34use config::io::{PLAINTEXT_PASSWORD, read_server_config};
35use fedimint_aead::random_salt;
36use fedimint_core::config::P2PMessage;
37use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
38use fedimint_core::epoch::ConsensusItem;
39use fedimint_core::net::peers::DynP2PConnections;
40use fedimint_core::task::{TaskGroup, TaskHandle};
41use fedimint_core::util::write_new;
42use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
43pub use fedimint_server_core as core;
44use fedimint_server_core::ServerModuleInitRegistry;
45use fedimint_server_core::dashboard_ui::DynDashboardApi;
46use fedimint_server_core::setup_ui::{DynSetupApi, ISetupApi};
47use jsonrpsee::RpcModule;
48use net::api::ApiSecrets;
49use net::p2p::P2PStatusReceivers;
50use net::p2p_connector::IrohConnector;
51use tracing::{info, warn};
52
53use crate::config::ConfigGenSettings;
54use crate::config::io::{SALT_FILE, write_server_config};
55use crate::config::setup::SetupApi;
56use crate::db::{ServerInfo, ServerInfoKey};
57use crate::fedimint_core::net::peers::IP2PConnections;
58use crate::metrics::initialize_gauge_metrics;
59use crate::net::api::announcement::start_api_announcement_service;
60use crate::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
61use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
62
63pub mod envs;
64pub mod metrics;
65
66pub mod consensus;
68
69pub mod net;
71
72pub mod config;
74
75pub type DashboardUiHandler = Box<
77 dyn Fn(DynDashboardApi, SocketAddr, TaskHandle) -> Pin<Box<dyn Future<Output = ()> + Send>>
78 + Send
79 + Sync
80 + 'static,
81>;
82
83pub type SetupUiHandler = Box<
85 dyn Fn(DynSetupApi, SocketAddr, TaskHandle) -> Pin<Box<dyn Future<Output = ()> + Send>>
86 + Send
87 + Sync
88 + 'static,
89>;
90
91#[allow(clippy::too_many_arguments)]
92pub async fn run(
93 data_dir: PathBuf,
94 force_api_secrets: ApiSecrets,
95 settings: ConfigGenSettings,
96 db: Database,
97 code_version_str: String,
98 module_init_registry: &ServerModuleInitRegistry,
99 task_group: TaskGroup,
100 dashboard_ui_handler: Option<DashboardUiHandler>,
101 setup_ui_handler: Option<SetupUiHandler>,
102) -> anyhow::Result<()> {
103 let (cfg, connections, p2p_status_receivers) = match get_config(&data_dir)? {
104 Some(cfg) => {
105 let connector = if cfg.consensus.iroh_endpoints.is_empty() {
106 TlsTcpConnector::new(
107 cfg.tls_config(),
108 settings.p2p_bind,
109 cfg.local.p2p_endpoints.clone(),
110 cfg.local.identity,
111 )
112 .await
113 .into_dyn()
114 } else {
115 IrohConnector::new(
116 cfg.private.iroh_p2p_sk.clone().unwrap(),
117 settings.p2p_bind,
118 cfg.consensus
119 .iroh_endpoints
120 .iter()
121 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
122 .collect(),
123 )
124 .await?
125 .into_dyn()
126 };
127
128 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
129
130 let connections = ReconnectP2PConnections::new(
131 cfg.local.identity,
132 connector,
133 &task_group,
134 p2p_status_senders,
135 )
136 .into_dyn();
137
138 (cfg, connections, p2p_status_receivers)
139 }
140 None => {
141 Box::pin(run_config_gen(
142 data_dir.clone(),
143 settings.clone(),
144 db.clone(),
145 &task_group,
146 code_version_str.clone(),
147 force_api_secrets.clone(),
148 setup_ui_handler,
149 ))
150 .await?
151 }
152 };
153
154 let decoders = module_init_registry.decoders_strict(
155 cfg.consensus
156 .modules
157 .iter()
158 .map(|(id, config)| (*id, &config.kind)),
159 )?;
160
161 let db = db.with_decoders(decoders);
162
163 initialize_gauge_metrics(&task_group, &db).await;
164
165 start_api_announcement_service(&db, &task_group, &cfg, force_api_secrets.get_active()).await?;
166
167 info!(target: LOG_CONSENSUS, "Starting consensus...");
168
169 Box::pin(consensus::run(
170 connections,
171 p2p_status_receivers,
172 settings.bind_api_ws,
173 settings.bind_api_iroh,
174 cfg,
175 db,
176 module_init_registry.clone(),
177 &task_group,
178 force_api_secrets,
179 data_dir,
180 code_version_str,
181 settings.ui_bind,
182 dashboard_ui_handler,
183 ))
184 .await?;
185
186 info!(target: LOG_CONSENSUS, "Shutting down tasks...");
187
188 task_group.shutdown();
189
190 Ok(())
191}
192
193async fn update_server_info_version_dbtx(
194 dbtx: &mut DatabaseTransaction<'_>,
195 code_version_str: &str,
196) {
197 let mut server_info = dbtx.get_value(&ServerInfoKey).await.unwrap_or(ServerInfo {
198 init_version: code_version_str.to_string(),
199 last_version: code_version_str.to_string(),
200 });
201 server_info.last_version = code_version_str.to_string();
202 dbtx.insert_entry(&ServerInfoKey, &server_info).await;
203}
204
205pub fn get_config(data_dir: &Path) -> anyhow::Result<Option<ServerConfig>> {
206 let path = data_dir.join(PLAINTEXT_PASSWORD);
208 if let Ok(password_untrimmed) = fs::read_to_string(&path) {
209 let password = password_untrimmed.trim_matches('\n');
213 let password_fully_trimmed = password.trim();
215 if password_fully_trimmed != password {
216 warn!(
217 target: LOG_CORE,
218 path = %path.display(),
219 "Password in the password file contains leading/trailing whitespaces. This will an error in the future."
220 );
221 }
222 return Ok(Some(read_server_config(password, data_dir)?));
223 }
224
225 Ok(None)
226}
227
228pub async fn run_config_gen(
229 data_dir: PathBuf,
230 settings: ConfigGenSettings,
231 db: Database,
232 task_group: &TaskGroup,
233 code_version_str: String,
234 api_secrets: ApiSecrets,
235 setup_ui_handler: Option<SetupUiHandler>,
236) -> anyhow::Result<(
237 ServerConfig,
238 DynP2PConnections<P2PMessage>,
239 P2PStatusReceivers,
240)> {
241 info!(target: LOG_CONSENSUS, "Starting config gen");
242
243 initialize_gauge_metrics(task_group, &db).await;
244
245 let (cgp_sender, mut cgp_receiver) = tokio::sync::mpsc::channel(1);
246
247 let config_gen = SetupApi::new(settings.clone(), db.clone(), cgp_sender);
248
249 let mut rpc_module = RpcModule::new(config_gen.clone());
250
251 net::api::attach_endpoints(&mut rpc_module, config::setup::server_endpoints(), None);
252
253 let api_handler = net::api::spawn(
254 "setup",
255 settings.bind_api_ws,
257 rpc_module,
258 10,
259 api_secrets.clone(),
260 )
261 .await;
262
263 let ui_task_group = TaskGroup::new();
264
265 if let Some(setup_ui_handler) = setup_ui_handler {
266 ui_task_group.spawn("web-ui", move |handle| {
267 setup_ui_handler(config_gen.clone().into_dyn(), settings.ui_bind, handle)
268 });
269
270 info!(target: LOG_CONSENSUS, "Setup UI running at http://{} 🚀", settings.ui_bind);
271 }
272
273 let cg_params = cgp_receiver
274 .recv()
275 .await
276 .expect("Config gen params receiver closed unexpectedly");
277
278 api_handler
279 .stop()
280 .expect("Config api should still be running");
281
282 api_handler.stopped().await;
283
284 ui_task_group
285 .shutdown_join_all(None)
286 .await
287 .context("Failed to shutdown UI server after config gen")?;
288
289 let connector = if cg_params.iroh_endpoints().is_empty() {
290 TlsTcpConnector::new(
291 cg_params.tls_config(),
292 settings.p2p_bind,
293 cg_params.p2p_urls(),
294 cg_params.identity,
295 )
296 .await
297 .into_dyn()
298 } else {
299 IrohConnector::new(
300 cg_params.iroh_p2p_sk.clone().unwrap(),
301 settings.p2p_bind,
302 cg_params
303 .iroh_endpoints()
304 .iter()
305 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
306 .collect(),
307 )
308 .await?
309 .into_dyn()
310 };
311
312 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
313
314 let connections = ReconnectP2PConnections::new(
315 cg_params.identity,
316 connector,
317 task_group,
318 p2p_status_senders,
319 )
320 .into_dyn();
321
322 let cfg = ServerConfig::distributed_gen(
323 settings.modules,
324 &cg_params,
325 settings.registry.clone(),
326 code_version_str.clone(),
327 connections.clone(),
328 p2p_status_receivers.clone(),
329 )
330 .await?;
331
332 assert_ne!(
333 cfg.consensus.iroh_endpoints.is_empty(),
334 cfg.consensus.api_endpoints.is_empty(),
335 );
336
337 write_new(data_dir.join(PLAINTEXT_PASSWORD), &cfg.private.api_auth.0)?;
339 write_new(data_dir.join(SALT_FILE), random_salt())?;
340 write_server_config(
341 &cfg,
342 &data_dir,
343 &cfg.private.api_auth.0,
344 &settings.registry,
345 api_secrets.get_active(),
346 )?;
347
348 Ok((cfg, connections, p2p_status_receivers))
349}