1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::cast_precision_loss)]
5#![allow(clippy::cast_sign_loss)]
6#![allow(clippy::doc_markdown)]
7#![allow(clippy::missing_errors_doc)]
8#![allow(clippy::missing_panics_doc)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::must_use_candidate)]
11#![allow(clippy::needless_lifetimes)]
12#![allow(clippy::ref_option)]
13#![allow(clippy::return_self_not_must_use)]
14#![allow(clippy::similar_names)]
15#![allow(clippy::too_many_lines)]
16#![allow(clippy::needless_pass_by_value)]
17#![allow(clippy::manual_let_else)]
18#![allow(clippy::match_wildcard_for_single_variants)]
19#![allow(clippy::trivially_copy_pass_by_ref)]
20
21extern crate fedimint_core;
24pub mod db;
25
26use std::collections::BTreeMap;
27use std::fs;
28use std::path::{Path, PathBuf};
29
30use config::ServerConfig;
31use config::io::{PLAINTEXT_PASSWORD, read_server_config};
32use fedimint_aead::random_salt;
33use fedimint_api_client::api::P2PConnectionStatus;
34use fedimint_core::PeerId;
35use fedimint_core::config::P2PMessage;
36use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
37use fedimint_core::epoch::ConsensusItem;
38use fedimint_core::net::peers::DynP2PConnections;
39use fedimint_core::task::TaskGroup;
40use fedimint_core::util::write_new;
41use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
42pub use fedimint_server_core as core;
43use fedimint_server_core::ServerModuleInitRegistry;
44use jsonrpsee::RpcModule;
45use net::api::ApiSecrets;
46use net::p2p_connector::IrohConnector;
47use tokio::sync::watch;
48use tracing::{info, warn};
49
50use crate::config::ConfigGenSettings;
51use crate::config::api::ConfigGenApi;
52use crate::config::io::{SALT_FILE, write_server_config};
53use crate::db::{ServerInfo, ServerInfoKey};
54use crate::fedimint_core::net::peers::IP2PConnections;
55use crate::metrics::initialize_gauge_metrics;
56use crate::net::api::announcement::start_api_announcement_service;
57use crate::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
58use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
59
60pub mod envs;
61pub mod metrics;
62
63pub mod consensus;
65
66pub mod net;
68
69pub mod config;
71
72pub async fn run(
73 data_dir: PathBuf,
74 force_api_secrets: ApiSecrets,
75 settings: ConfigGenSettings,
76 db: Database,
77 code_version_str: String,
78 module_init_registry: &ServerModuleInitRegistry,
79 task_group: TaskGroup,
80) -> anyhow::Result<()> {
81 let (cfg, connections, p2p_status_receivers) = match get_config(&data_dir)? {
82 Some(cfg) => {
83 let connector = if cfg.consensus.iroh_endpoints.is_empty() {
84 TlsTcpConnector::new(
85 cfg.tls_config(),
86 settings.p2p_bind,
87 cfg.local.p2p_endpoints.clone(),
88 cfg.local.identity,
89 )
90 .await
91 .into_dyn()
92 } else {
93 IrohConnector::new(
94 cfg.private.iroh_p2p_sk.clone().unwrap(),
95 settings.p2p_bind,
96 cfg.consensus
97 .iroh_endpoints
98 .iter()
99 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
100 .collect(),
101 )
102 .await?
103 .into_dyn()
104 };
105
106 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
107
108 let connections = ReconnectP2PConnections::new(
109 cfg.local.identity,
110 connector,
111 &task_group,
112 p2p_status_senders,
113 )
114 .into_dyn();
115
116 (cfg, connections, p2p_status_receivers)
117 }
118 None => {
119 run_config_gen(
120 data_dir.clone(),
121 settings.clone(),
122 db.clone(),
123 &task_group,
124 code_version_str.clone(),
125 force_api_secrets.clone(),
126 )
127 .await?
128 }
129 };
130
131 let decoders = module_init_registry.decoders_strict(
132 cfg.consensus
133 .modules
134 .iter()
135 .map(|(id, config)| (*id, &config.kind)),
136 )?;
137
138 let db = db.with_decoders(decoders);
139
140 initialize_gauge_metrics(&db).await;
141
142 if cfg.consensus.iroh_endpoints.is_empty() {
143 start_api_announcement_service(&db, &task_group, &cfg, force_api_secrets.get_active())
144 .await?;
145 }
146
147 info!(target: LOG_CONSENSUS, "Starting consensus...");
148
149 Box::pin(consensus::run(
150 connections,
151 p2p_status_receivers,
152 settings.api_bind,
153 cfg,
154 db,
155 module_init_registry.clone(),
156 &task_group,
157 force_api_secrets,
158 data_dir,
159 code_version_str,
160 ))
161 .await?;
162
163 info!(target: LOG_CONSENSUS, "Shutting down tasks...");
164
165 task_group.shutdown();
166
167 Ok(())
168}
169
170async fn update_server_info_version_dbtx(
171 dbtx: &mut DatabaseTransaction<'_>,
172 code_version_str: &str,
173) {
174 let mut server_info = dbtx.get_value(&ServerInfoKey).await.unwrap_or(ServerInfo {
175 init_version: code_version_str.to_string(),
176 last_version: code_version_str.to_string(),
177 });
178 server_info.last_version = code_version_str.to_string();
179 dbtx.insert_entry(&ServerInfoKey, &server_info).await;
180}
181
182pub fn get_config(data_dir: &Path) -> anyhow::Result<Option<ServerConfig>> {
183 let path = data_dir.join(PLAINTEXT_PASSWORD);
185 if let Ok(password_untrimmed) = fs::read_to_string(&path) {
186 let password = password_untrimmed.trim_matches('\n');
190 let password_fully_trimmed = password.trim();
192 if password_fully_trimmed != password {
193 warn!(
194 target: LOG_CORE,
195 path = %path.display(),
196 "Password in the password file contains leading/trailing whitespaces. This will an error in the future."
197 );
198 }
199 return Ok(Some(read_server_config(password, data_dir)?));
200 }
201
202 Ok(None)
203}
204
205pub async fn run_config_gen(
206 data_dir: PathBuf,
207 settings: ConfigGenSettings,
208 db: Database,
209 task_group: &TaskGroup,
210 code_version_str: String,
211 api_secrets: ApiSecrets,
212) -> anyhow::Result<(
213 ServerConfig,
214 DynP2PConnections<P2PMessage>,
215 BTreeMap<PeerId, watch::Receiver<P2PConnectionStatus>>,
216)> {
217 info!(target: LOG_CONSENSUS, "Starting config gen");
218
219 initialize_gauge_metrics(&db).await;
220
221 let (cgp_sender, mut cgp_receiver) = tokio::sync::mpsc::channel(1);
222
223 let config_gen = ConfigGenApi::new(settings.clone(), db.clone(), cgp_sender);
224
225 let mut rpc_module = RpcModule::new(config_gen);
226
227 net::api::attach_endpoints(&mut rpc_module, config::api::server_endpoints(), None);
228
229 let api_handler = net::api::spawn(
230 "config-gen",
231 settings.api_bind,
232 rpc_module,
233 10,
234 api_secrets.clone(),
235 )
236 .await;
237
238 let cg_params = cgp_receiver
239 .recv()
240 .await
241 .expect("Config gen params receiver closed unexpectedly");
242
243 api_handler
244 .stop()
245 .expect("Config api should still be running");
246
247 api_handler.stopped().await;
248
249 let connector = if cg_params.iroh_endpoints().is_empty() {
250 TlsTcpConnector::new(
251 cg_params.tls_config(),
252 settings.p2p_bind,
253 cg_params.p2p_urls(),
254 cg_params.identity,
255 )
256 .await
257 .into_dyn()
258 } else {
259 IrohConnector::new(
260 cg_params.iroh_p2p_sk.clone().unwrap(),
261 settings.p2p_bind,
262 cg_params
263 .iroh_endpoints()
264 .iter()
265 .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
266 .collect(),
267 )
268 .await?
269 .into_dyn()
270 };
271
272 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
273
274 let connections = ReconnectP2PConnections::new(
275 cg_params.identity,
276 connector,
277 task_group,
278 p2p_status_senders,
279 )
280 .into_dyn();
281
282 let cfg = ServerConfig::distributed_gen(
283 &cg_params,
284 settings.registry.clone(),
285 code_version_str.clone(),
286 connections.clone(),
287 p2p_status_receivers.clone(),
288 )
289 .await?;
290
291 assert_ne!(
292 cfg.consensus.iroh_endpoints.is_empty(),
293 cfg.consensus.api_endpoints.is_empty(),
294 );
295
296 write_new(data_dir.join(PLAINTEXT_PASSWORD), &cfg.private.api_auth.0)?;
298 write_new(data_dir.join(SALT_FILE), random_salt())?;
299 write_server_config(
300 &cfg,
301 &data_dir,
302 &cfg.private.api_auth.0,
303 &settings.registry,
304 api_secrets.get_active(),
305 )?;
306
307 Ok((cfg, connections, p2p_status_receivers))
308}