fedimint_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::cast_precision_loss)]
5#![allow(clippy::cast_sign_loss)]
6#![allow(clippy::doc_markdown)]
7#![allow(clippy::missing_errors_doc)]
8#![allow(clippy::missing_panics_doc)]
9#![allow(clippy::module_name_repetitions)]
10#![allow(clippy::must_use_candidate)]
11#![allow(clippy::needless_lifetimes)]
12#![allow(clippy::ref_option)]
13#![allow(clippy::return_self_not_must_use)]
14#![allow(clippy::similar_names)]
15#![allow(clippy::too_many_lines)]
16#![allow(clippy::needless_pass_by_value)]
17#![allow(clippy::manual_let_else)]
18#![allow(clippy::match_wildcard_for_single_variants)]
19#![allow(clippy::trivially_copy_pass_by_ref)]
20
21//! Server side fedimint module traits
22
23extern crate fedimint_core;
24pub mod connection_limits;
25pub mod db;
26
27use std::fs;
28use std::path::{Path, PathBuf};
29use std::time::Duration;
30
31use anyhow::Context;
32use config::ServerConfig;
33use config::io::{PLAINTEXT_PASSWORD, read_server_config};
34pub use connection_limits::ConnectionLimits;
35use fedimint_aead::random_salt;
36use fedimint_connectors::ConnectorRegistry;
37use fedimint_core::config::P2PMessage;
38use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped as _};
39use fedimint_core::epoch::ConsensusItem;
40use fedimint_core::net::peers::DynP2PConnections;
41use fedimint_core::task::{TaskGroup, sleep};
42use fedimint_core::util::write_new;
43use fedimint_logging::LOG_CONSENSUS;
44pub use fedimint_server_core as core;
45use fedimint_server_core::ServerModuleInitRegistry;
46use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
47use fedimint_server_core::dashboard_ui::DynDashboardApi;
48use fedimint_server_core::setup_ui::{DynSetupApi, ISetupApi};
49use jsonrpsee::RpcModule;
50use net::api::ApiSecrets;
51use net::p2p::P2PStatusReceivers;
52use net::p2p_connector::IrohConnector;
53use tokio::net::TcpListener;
54use tracing::info;
55
56use crate::config::ConfigGenSettings;
57use crate::config::io::{
58    SALT_FILE, finalize_password_change, recover_interrupted_password_change, trim_password,
59    write_server_config,
60};
61use crate::config::setup::SetupApi;
62use crate::db::{ServerInfo, ServerInfoKey};
63use crate::fedimint_core::net::peers::IP2PConnections;
64use crate::metrics::initialize_gauge_metrics;
65use crate::net::api::announcement::start_api_announcement_service;
66use crate::net::p2p::{
67    P2PConnectionTypeReceivers, ReconnectP2PConnections, p2p_connection_type_channels,
68    p2p_status_channels,
69};
70use crate::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
71
72pub mod metrics;
73
74/// The actual implementation of consensus
75pub mod consensus;
76
/// Networking for mint-to-mint and client-to-mint communication
78pub mod net;
79
80/// Fedimint toplevel config
81pub mod config;
82
83/// A function/closure type for handling dashboard UI
84pub type DashboardUiRouter = Box<dyn Fn(DynDashboardApi) -> axum::Router + Send>;
85
86/// A function/closure type for handling setup UI
87pub type SetupUiRouter = Box<dyn Fn(DynSetupApi) -> axum::Router + Send>;
88
89#[allow(clippy::too_many_arguments)]
90pub async fn run(
91    data_dir: PathBuf,
92    force_api_secrets: ApiSecrets,
93    settings: ConfigGenSettings,
94    db: Database,
95    code_version_str: String,
96    module_init_registry: ServerModuleInitRegistry,
97    task_group: TaskGroup,
98    bitcoin_rpc: DynServerBitcoinRpc,
99    setup_ui_router: SetupUiRouter,
100    dashboard_ui_router: DashboardUiRouter,
101    db_checkpoint_retention: u64,
102    iroh_api_limits: ConnectionLimits,
103) -> anyhow::Result<()> {
104    let (cfg, connections, p2p_status_receivers, p2p_connection_type_receivers) =
105        match get_config(&data_dir)? {
106            Some(cfg) => {
107                let connector = if cfg.consensus.iroh_endpoints.is_empty() {
108                    TlsTcpConnector::new(
109                        cfg.tls_config(),
110                        settings.p2p_bind,
111                        cfg.local.p2p_endpoints.clone(),
112                        cfg.local.identity,
113                    )
114                    .await
115                    .into_dyn()
116                } else {
117                    IrohConnector::new(
118                        cfg.private.iroh_p2p_sk.clone().unwrap(),
119                        settings.p2p_bind,
120                        settings.iroh_dns.clone(),
121                        settings.iroh_relays.clone(),
122                        cfg.consensus
123                            .iroh_endpoints
124                            .iter()
125                            .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
126                            .collect(),
127                    )
128                    .await?
129                    .into_dyn()
130                };
131
132                let (p2p_status_senders, p2p_status_receivers) =
133                    p2p_status_channels(connector.peers());
134                let (p2p_connection_type_senders, p2p_connection_type_receivers) =
135                    p2p_connection_type_channels(connector.peers());
136
137                let connections = ReconnectP2PConnections::new(
138                    cfg.local.identity,
139                    connector,
140                    &task_group,
141                    p2p_status_senders,
142                    p2p_connection_type_senders,
143                )
144                .into_dyn();
145
146                (
147                    cfg,
148                    connections,
149                    p2p_status_receivers,
150                    p2p_connection_type_receivers,
151                )
152            }
153            None => {
154                Box::pin(run_config_gen(
155                    data_dir.clone(),
156                    settings.clone(),
157                    db.clone(),
158                    &task_group,
159                    code_version_str.clone(),
160                    force_api_secrets.clone(),
161                    setup_ui_router,
162                ))
163                .await?
164            }
165        };
166
167    let decoders = module_init_registry.decoders_strict(
168        cfg.consensus
169            .modules
170            .iter()
171            .map(|(id, config)| (*id, &config.kind)),
172    )?;
173
174    let db = db.with_decoders(decoders);
175
176    initialize_gauge_metrics(&task_group, &db).await;
177
178    start_api_announcement_service(&db, &task_group, &cfg, force_api_secrets.get_active()).await?;
179
180    info!(target: LOG_CONSENSUS, "Starting consensus...");
181
182    let connectors = ConnectorRegistry::build_from_server_defaults()
183        .bind()
184        .await?;
185
186    Box::pin(consensus::run(
187        connectors,
188        connections,
189        p2p_status_receivers,
190        p2p_connection_type_receivers,
191        settings.api_bind,
192        settings.iroh_dns,
193        settings.iroh_relays,
194        cfg,
195        db,
196        module_init_registry.clone(),
197        &task_group,
198        force_api_secrets,
199        data_dir,
200        code_version_str,
201        bitcoin_rpc,
202        settings.ui_bind,
203        dashboard_ui_router,
204        db_checkpoint_retention,
205        iroh_api_limits,
206    ))
207    .await?;
208
209    info!(target: LOG_CONSENSUS, "Shutting down tasks...");
210
211    task_group.shutdown();
212
213    Ok(())
214}
215
216async fn update_server_info_version_dbtx(
217    dbtx: &mut DatabaseTransaction<'_>,
218    code_version_str: &str,
219) {
220    let mut server_info = dbtx.get_value(&ServerInfoKey).await.unwrap_or(ServerInfo {
221        init_version: code_version_str.to_string(),
222        last_version: code_version_str.to_string(),
223    });
224    server_info.last_version = code_version_str.to_string();
225    dbtx.insert_entry(&ServerInfoKey, &server_info).await;
226}
227
228pub fn get_config(data_dir: &Path) -> anyhow::Result<Option<ServerConfig>> {
229    recover_interrupted_password_change(data_dir)?;
230
231    // Attempt get the config with local password, otherwise start config gen
232    let path = data_dir.join(PLAINTEXT_PASSWORD);
233    if let Ok(password_untrimmed) = fs::read_to_string(&path) {
234        let password = trim_password(&password_untrimmed);
235        let cfg = read_server_config(password, data_dir)?;
236        finalize_password_change(data_dir)?;
237        return Ok(Some(cfg));
238    }
239
240    Ok(None)
241}
242
243pub async fn run_config_gen(
244    data_dir: PathBuf,
245    settings: ConfigGenSettings,
246    db: Database,
247    task_group: &TaskGroup,
248    code_version_str: String,
249    api_secrets: ApiSecrets,
250    setup_ui_handler: SetupUiRouter,
251) -> anyhow::Result<(
252    ServerConfig,
253    DynP2PConnections<P2PMessage>,
254    P2PStatusReceivers,
255    P2PConnectionTypeReceivers,
256)> {
257    info!(target: LOG_CONSENSUS, "Starting config gen");
258
259    initialize_gauge_metrics(task_group, &db).await;
260
261    let (cgp_sender, mut cgp_receiver) = tokio::sync::mpsc::channel(1);
262
263    let setup_api = SetupApi::new(settings.clone(), db.clone(), cgp_sender);
264
265    let mut rpc_module = RpcModule::new(setup_api.clone());
266
267    net::api::attach_endpoints(&mut rpc_module, config::setup::server_endpoints(), None);
268
269    let api_handler = net::api::spawn(
270        "setup",
271        // config gen always uses ws api
272        settings.api_bind,
273        rpc_module,
274        10,
275        api_secrets.clone(),
276    )
277    .await;
278
279    let ui_task_group = TaskGroup::new();
280
281    let ui_service = setup_ui_handler(setup_api.clone().into_dyn()).into_make_service();
282
283    let ui_listener = TcpListener::bind(settings.ui_bind)
284        .await
285        .expect("Failed to bind setup UI");
286
287    ui_task_group.spawn("setup-ui", move |handle| async move {
288        axum::serve(ui_listener, ui_service)
289            .with_graceful_shutdown(handle.make_shutdown_rx())
290            .await
291            .expect("Failed to serve setup UI");
292    });
293
294    info!(target: LOG_CONSENSUS, "Setup UI running at http://{} 🚀", settings.ui_bind);
295
296    let cg_params = cgp_receiver
297        .recv()
298        .await
299        .expect("Config gen params receiver closed unexpectedly");
300
301    // HACK: The `start-dkg` API call needs to have some time to finish
302    // before we shut down api handling. There's no easy and good way to do
303    // that other than just giving it some grace period.
304    sleep(Duration::from_millis(100)).await;
305
306    api_handler
307        .stop()
308        .expect("Config api should still be running");
309
310    api_handler.stopped().await;
311
312    ui_task_group
313        .shutdown_join_all(None)
314        .await
315        .context("Failed to shutdown UI server after config gen")?;
316
317    let connector = if cg_params.iroh_endpoints().is_empty() {
318        TlsTcpConnector::new(
319            cg_params.tls_config(),
320            settings.p2p_bind,
321            cg_params.p2p_urls(),
322            cg_params.identity,
323        )
324        .await
325        .into_dyn()
326    } else {
327        IrohConnector::new(
328            cg_params.iroh_p2p_sk.clone().unwrap(),
329            settings.p2p_bind,
330            settings.iroh_dns,
331            settings.iroh_relays,
332            cg_params
333                .iroh_endpoints()
334                .iter()
335                .map(|(peer, endpoints)| (*peer, endpoints.p2p_pk))
336                .collect(),
337        )
338        .await?
339        .into_dyn()
340    };
341
342    let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
343    let (p2p_connection_type_senders, p2p_connection_type_receivers) =
344        p2p_connection_type_channels(connector.peers());
345
346    let connections = ReconnectP2PConnections::new(
347        cg_params.identity,
348        connector,
349        task_group,
350        p2p_status_senders,
351        p2p_connection_type_senders,
352    )
353    .into_dyn();
354
355    let cfg = ServerConfig::distributed_gen(
356        settings.modules,
357        &cg_params,
358        settings.registry.clone(),
359        code_version_str.clone(),
360        connections.clone(),
361        p2p_status_receivers.clone(),
362    )
363    .await?;
364
365    assert_ne!(
366        cfg.consensus.iroh_endpoints.is_empty(),
367        cfg.consensus.api_endpoints.is_empty(),
368    );
369
370    // TODO: Make writing password optional
371    write_new(data_dir.join(PLAINTEXT_PASSWORD), &cfg.private.api_auth.0)?;
372    write_new(data_dir.join(SALT_FILE), random_salt())?;
373    write_server_config(
374        &cfg,
375        &data_dir,
376        &cfg.private.api_auth.0,
377        &settings.registry,
378        api_secrets.get_active(),
379    )?;
380
381    Ok((
382        cfg,
383        connections,
384        p2p_status_receivers,
385        p2p_connection_type_receivers,
386    ))
387}