fedimint_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::cast_precision_loss)]
5#![allow(clippy::cast_sign_loss)]
6#![allow(clippy::doc_markdown)]
7#![allow(clippy::missing_errors_doc)]
8#![allow(clippy::missing_panics_doc)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::must_use_candidate)]
11#![allow(clippy::needless_lifetimes)]
12#![allow(clippy::ref_option)]
13#![allow(clippy::return_self_not_must_use)]
14#![allow(clippy::similar_names)]
15#![allow(clippy::too_many_lines)]
16#![allow(clippy::needless_pass_by_value)]
17#![allow(clippy::manual_let_else)]
18#![allow(clippy::match_wildcard_for_single_variants)]
19#![allow(clippy::trivially_copy_pass_by_ref)]
20
21//! Server side fedimint module traits
22
23extern crate fedimint_core;
24pub mod db;
25
26use std::collections::BTreeMap;
27use std::fs;
28use std::path::{Path, PathBuf};
29
30use config::ServerConfig;
31use config::io::{PLAINTEXT_PASSWORD, read_server_config};
32use fedimint_aead::random_salt;
33use fedimint_api_client::api::P2PConnectionStatus;
34use fedimint_core::PeerId;
35use fedimint_core::config::P2PMessage;
36use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
37use fedimint_core::epoch::ConsensusItem;
38use fedimint_core::net::peers::DynP2PConnections;
39use fedimint_core::task::TaskGroup;
40use fedimint_core::util::write_new;
41use fedimint_logging::{LOG_CONSENSUS, LOG_CORE};
42pub use fedimint_server_core as core;
43use fedimint_server_core::ServerModuleInitRegistry;
44use jsonrpsee::RpcModule;
45use net::api::ApiSecrets;
46use net::p2p_connector::IrohConnector;
47use tokio::sync::watch;
48use tracing::{info, warn};
49
50use crate::config::ConfigGenSettings;
51use crate::config::api::ConfigGenApi;
52use crate::config::io::{SALT_FILE, write_server_config};
53use crate::db::{ServerInfo, ServerInfoKey};
54use crate::fedimint_core::net::peers::IP2PConnections;
55use crate::metrics::initialize_gauge_metrics;
56use crate::net::api::announcement::start_api_announcement_service;
57use crate::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
58use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
59
60pub mod envs;
61pub mod metrics;
62
63/// The actual implementation of consensus
64pub mod consensus;
65
/// Networking for mint-to-mint and client-to-mint communication
67pub mod net;
68
69/// Fedimint toplevel config
70pub mod config;
71
72pub async fn run(
73    data_dir: PathBuf,
74    force_api_secrets: ApiSecrets,
75    settings: ConfigGenSettings,
76    db: Database,
77    code_version_str: String,
78    module_init_registry: &ServerModuleInitRegistry,
79    task_group: TaskGroup,
80) -> anyhow::Result<()> {
81    let (cfg, connections, p2p_status_receivers) = match get_config(&data_dir)? {
82        Some(cfg) => {
83            let connector = if cfg.consensus.iroh_endpoints.is_empty() {
84                TlsTcpConnector::new(
85                    cfg.tls_config(),
86                    settings.p2p_bind,
87                    cfg.local.p2p_endpoints.clone(),
88                    cfg.local.identity,
89                )
90                .await
91                .into_dyn()
92            } else {
93                IrohConnector::new(
94                    cfg.private.iroh_p2p_sk.clone().unwrap(),
95                    settings.p2p_bind,
96                    cfg.consensus
97                        .iroh_endpoints
98                        .iter()
99                        .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
100                        .collect(),
101                )
102                .await?
103                .into_dyn()
104            };
105
106            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
107
108            let connections = ReconnectP2PConnections::new(
109                cfg.local.identity,
110                connector,
111                &task_group,
112                p2p_status_senders,
113            )
114            .into_dyn();
115
116            (cfg, connections, p2p_status_receivers)
117        }
118        None => {
119            run_config_gen(
120                data_dir.clone(),
121                settings.clone(),
122                db.clone(),
123                &task_group,
124                code_version_str.clone(),
125                force_api_secrets.clone(),
126            )
127            .await?
128        }
129    };
130
131    let decoders = module_init_registry.decoders_strict(
132        cfg.consensus
133            .modules
134            .iter()
135            .map(|(id, config)| (*id, &config.kind)),
136    )?;
137
138    let db = db.with_decoders(decoders);
139
140    initialize_gauge_metrics(&db).await;
141
142    if cfg.consensus.iroh_endpoints.is_empty() {
143        start_api_announcement_service(&db, &task_group, &cfg, force_api_secrets.get_active())
144            .await?;
145    }
146
147    info!(target: LOG_CONSENSUS, "Starting consensus...");
148
149    Box::pin(consensus::run(
150        connections,
151        p2p_status_receivers,
152        settings.api_bind,
153        cfg,
154        db,
155        module_init_registry.clone(),
156        &task_group,
157        force_api_secrets,
158        data_dir,
159        code_version_str,
160    ))
161    .await?;
162
163    info!(target: LOG_CONSENSUS, "Shutting down tasks...");
164
165    task_group.shutdown();
166
167    Ok(())
168}
169
170async fn update_server_info_version_dbtx(
171    dbtx: &mut DatabaseTransaction<'_>,
172    code_version_str: &str,
173) {
174    let mut server_info = dbtx.get_value(&ServerInfoKey).await.unwrap_or(ServerInfo {
175        init_version: code_version_str.to_string(),
176        last_version: code_version_str.to_string(),
177    });
178    server_info.last_version = code_version_str.to_string();
179    dbtx.insert_entry(&ServerInfoKey, &server_info).await;
180}
181
182pub fn get_config(data_dir: &Path) -> anyhow::Result<Option<ServerConfig>> {
183    // Attempt get the config with local password, otherwise start config gen
184    let path = data_dir.join(PLAINTEXT_PASSWORD);
185    if let Ok(password_untrimmed) = fs::read_to_string(&path) {
186        // We definitely don't want leading/trailing newlines, and user
187        // editing the file manually will probably get a free newline added
188        // by the text editor.
189        let password = password_untrimmed.trim_matches('\n');
190        // In the future we also don't want to support any leading/trailing newlines
191        let password_fully_trimmed = password.trim();
192        if password_fully_trimmed != password {
193            warn!(
194                target: LOG_CORE,
195                path = %path.display(),
196                "Password in the password file contains leading/trailing whitespaces. This will an error in the future."
197            );
198        }
199        return Ok(Some(read_server_config(password, data_dir)?));
200    }
201
202    Ok(None)
203}
204
205pub async fn run_config_gen(
206    data_dir: PathBuf,
207    settings: ConfigGenSettings,
208    db: Database,
209    task_group: &TaskGroup,
210    code_version_str: String,
211    api_secrets: ApiSecrets,
212) -> anyhow::Result<(
213    ServerConfig,
214    DynP2PConnections<P2PMessage>,
215    BTreeMap<PeerId, watch::Receiver<P2PConnectionStatus>>,
216)> {
217    info!(target: LOG_CONSENSUS, "Starting config gen");
218
219    initialize_gauge_metrics(&db).await;
220
221    let (cgp_sender, mut cgp_receiver) = tokio::sync::mpsc::channel(1);
222
223    let config_gen = ConfigGenApi::new(settings.clone(), db.clone(), cgp_sender);
224
225    let mut rpc_module = RpcModule::new(config_gen);
226
227    net::api::attach_endpoints(&mut rpc_module, config::api::server_endpoints(), None);
228
229    let api_handler = net::api::spawn(
230        "config-gen",
231        settings.api_bind,
232        rpc_module,
233        10,
234        api_secrets.clone(),
235    )
236    .await;
237
238    let cg_params = cgp_receiver
239        .recv()
240        .await
241        .expect("Config gen params receiver closed unexpectedly");
242
243    api_handler
244        .stop()
245        .expect("Config api should still be running");
246
247    api_handler.stopped().await;
248
249    let connector = if cg_params.iroh_endpoints().is_empty() {
250        TlsTcpConnector::new(
251            cg_params.tls_config(),
252            settings.p2p_bind,
253            cg_params.p2p_urls(),
254            cg_params.identity,
255        )
256        .await
257        .into_dyn()
258    } else {
259        IrohConnector::new(
260            cg_params.iroh_p2p_sk.clone().unwrap(),
261            settings.p2p_bind,
262            cg_params
263                .iroh_endpoints()
264                .iter()
265                .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
266                .collect(),
267        )
268        .await?
269        .into_dyn()
270    };
271
272    let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
273
274    let connections = ReconnectP2PConnections::new(
275        cg_params.identity,
276        connector,
277        task_group,
278        p2p_status_senders,
279    )
280    .into_dyn();
281
282    let cfg = ServerConfig::distributed_gen(
283        &cg_params,
284        settings.registry.clone(),
285        code_version_str.clone(),
286        connections.clone(),
287        p2p_status_receivers.clone(),
288    )
289    .await?;
290
291    assert_ne!(
292        cfg.consensus.iroh_endpoints.is_empty(),
293        cfg.consensus.api_endpoints.is_empty(),
294    );
295
296    // TODO: Make writing password optional
297    write_new(data_dir.join(PLAINTEXT_PASSWORD), &cfg.private.api_auth.0)?;
298    write_new(data_dir.join(SALT_FILE), random_salt())?;
299    write_server_config(
300        &cfg,
301        &data_dir,
302        &cfg.private.api_auth.0,
303        &settings.registry,
304        api_secrets.get_active(),
305    )?;
306
307    Ok((cfg, connections, p2p_status_receivers))
308}