1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::env;
10use std::net::SocketAddr;
11use std::path::PathBuf;
12use std::sync::{Arc, RwLock};
13use std::time::Duration;
14
15use anyhow::bail;
16use async_channel::Sender;
17use db::{ServerDbMigrationContext, get_global_database_migrations};
18use fedimint_api_client::api::DynGlobalApi;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::peers::DynP2PConnections;
28use fedimint_core::task::TaskGroup;
29use fedimint_core::util::FmtCompactAnyhow as _;
30use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API, LOG_NET_IROH};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::sync::watch;
41use tracing::{info, warn};
42
43use crate::config::{ServerConfig, ServerConfigLocal};
44use crate::consensus::api::{ConsensusApi, server_endpoints};
45use crate::consensus::engine::ConsensusEngine;
46use crate::db::verify_server_db_integrity_dbtx;
47use crate::envs::{FM_DB_CHECKPOINT_RETENTION_DEFAULT, FM_DB_CHECKPOINT_RETENTION_ENV};
48use crate::net::api::announcement::get_api_urls;
49use crate::net::api::{ApiSecrets, HasApiContext};
50use crate::net::p2p::P2PStatusReceivers;
51use crate::{net, update_server_info_version_dbtx};
52
/// Capacity of the bounded `async_channel` through which consensus items are
/// submitted to the consensus engine.
const TRANSACTION_BUFFER: usize = 1_000;
55
56#[allow(clippy::too_many_arguments)]
57pub async fn run(
58 connections: DynP2PConnections<P2PMessage>,
59 p2p_status_receivers: P2PStatusReceivers,
60 ws_api_bind_addr: SocketAddr,
61 iroh_api_bind_addr: SocketAddr,
62 cfg: ServerConfig,
63 db: Database,
64 module_init_registry: ServerModuleInitRegistry,
65 task_group: &TaskGroup,
66 force_api_secrets: ApiSecrets,
67 data_dir: PathBuf,
68 code_version_str: String,
69 ui_bind_addr: SocketAddr,
70 dashboard_ui_handler: Option<crate::DashboardUiHandler>,
71) -> anyhow::Result<()> {
72 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
73
74 let mut global_dbtx = db.begin_transaction().await;
75 apply_migrations_server_dbtx(
76 &mut global_dbtx.to_ref_nc(),
77 Arc::new(ServerDbMigrationContext),
78 "fedimint-server".to_string(),
79 get_global_database_migrations(),
80 )
81 .await?;
82
83 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
84
85 if is_running_in_test_env() {
86 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
87 }
88 global_dbtx.commit_tx_result().await?;
89
90 let mut modules = BTreeMap::new();
91
92 let global_api = DynGlobalApi::from_endpoints(
94 cfg.consensus
95 .api_endpoints()
96 .iter()
97 .map(|(&peer_id, url)| (peer_id, url.url.clone())),
98 &None,
99 )
100 .await?;
101
102 let shared_anymap = Arc::new(RwLock::new(BTreeMap::default()));
103
104 for (module_id, module_cfg) in &cfg.consensus.modules {
105 match module_init_registry.get(&module_cfg.kind) {
106 Some(module_init) => {
107 info!(target: LOG_CORE, "Initialise module {module_id}...");
108
109 let mut dbtx = db.begin_transaction().await;
110 apply_migrations_dbtx(
111 &mut dbtx.to_ref_nc(),
112 Arc::new(ServerDbMigrationContext) as Arc<_>,
113 module_init.module_kind().to_string(),
114 module_init.get_database_migrations(),
115 Some(*module_id),
116 None,
117 )
118 .await?;
119
120 if let Some(used_db_prefixes) = module_init.used_db_prefixes() {
121 if is_running_in_test_env() {
122 verify_module_db_integrity_dbtx(
123 &mut dbtx.to_ref_nc(),
124 *module_id,
125 module_init.module_kind(),
126 &used_db_prefixes,
127 )
128 .await;
129 }
130 }
131 dbtx.commit_tx_result().await?;
132
133 let module = module_init
134 .init(
135 NumPeers::from(cfg.consensus.api_endpoints().len()),
136 cfg.get_module_config(*module_id)?,
137 db.with_prefix_module_id(*module_id).0,
138 task_group,
139 cfg.local.identity,
140 global_api.with_module(*module_id),
141 shared_anymap.clone(),
142 )
143 .await?;
144
145 modules.insert(*module_id, (module_cfg.kind.clone(), module));
146 }
147 None => bail!("Detected configuration for unsupported module id: {module_id}"),
148 };
149 }
150
151 let module_registry = ModuleRegistry::from(modules);
152
153 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
154
155 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
156 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
157 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
158
159 let mut ci_status_senders = BTreeMap::new();
160 let mut ci_status_receivers = BTreeMap::new();
161
162 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
163 let (ci_sender, ci_receiver) = watch::channel(None);
164
165 ci_status_senders.insert(peer, ci_sender);
166 ci_status_receivers.insert(peer, ci_receiver);
167 }
168
169 let consensus_api = ConsensusApi {
170 cfg: cfg.clone(),
171 db: db.clone(),
172 modules: module_registry.clone(),
173 client_cfg: client_cfg.clone(),
174 submission_sender: submission_sender.clone(),
175 shutdown_sender,
176 shutdown_receiver: shutdown_receiver.clone(),
177 supported_api_versions: ServerConfig::supported_api_versions_summary(
178 &cfg.consensus.modules,
179 &module_init_registry,
180 ),
181 p2p_status_receivers,
182 ci_status_receivers,
183 ord_latency_receiver,
184 force_api_secret: force_api_secrets.get_active(),
185 code_version_str,
186 };
187
188 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
189
190 let api_handler = start_consensus_api(
191 &cfg.local,
192 consensus_api.clone(),
193 force_api_secrets.clone(),
194 ws_api_bind_addr,
195 )
196 .await;
197
198 if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone() {
199 Box::pin(start_iroh_api(
200 iroh_api_sk,
201 iroh_api_bind_addr,
202 consensus_api.clone(),
203 task_group,
204 ))
205 .await;
206 }
207
208 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
209
210 for (module_id, kind, module) in module_registry.iter_modules() {
211 submit_module_ci_proposals(
212 task_group,
213 db.clone(),
214 module_id,
215 kind.clone(),
216 module.clone(),
217 submission_sender.clone(),
218 );
219 }
220
221 let checkpoint_retention: String = env::var(FM_DB_CHECKPOINT_RETENTION_ENV)
222 .unwrap_or(FM_DB_CHECKPOINT_RETENTION_DEFAULT.to_string());
223 let checkpoint_retention = checkpoint_retention.parse().unwrap_or_else(|_| {
224 panic!("FM_DB_CHECKPOINT_RETENTION_ENV var is invalid: {checkpoint_retention}")
225 });
226
227 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
228
229 let api_urls = get_api_urls(&db, &cfg.consensus).await;
230
231 if let Some(dashboard_ui_handler) = dashboard_ui_handler {
232 task_group.spawn("dashboard-ui", move |handle| {
233 dashboard_ui_handler(consensus_api.clone().into_dyn(), ui_bind_addr, handle)
234 });
235
236 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind_addr} 🚀");
237 }
238
239 ConsensusEngine {
242 db,
243 federation_api: DynGlobalApi::from_endpoints(api_urls, &force_api_secrets.get_active())
244 .await?,
245 cfg: cfg.clone(),
246 connections,
247 ord_latency_sender,
248 ci_status_senders,
249 submission_receiver,
250 shutdown_receiver,
251 modules: module_registry,
252 task_group: task_group.clone(),
253 data_dir,
254 checkpoint_retention,
255 }
256 .run()
257 .await?;
258
259 api_handler
260 .stop()
261 .expect("Consensus api should still be running");
262
263 api_handler.stopped().await;
264
265 Ok(())
266}
267
268async fn start_consensus_api(
269 cfg: &ServerConfigLocal,
270 api: ConsensusApi,
271 force_api_secrets: ApiSecrets,
272 api_bind: SocketAddr,
273) -> ServerHandle {
274 let mut rpc_module = RpcModule::new(api.clone());
275
276 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
277
278 for (id, _, module) in api.modules.iter_modules() {
279 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
280 }
281
282 net::api::spawn(
283 "consensus",
284 api_bind,
285 rpc_module,
286 cfg.max_connections,
287 force_api_secrets,
288 )
289 .await
290}
291
292const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
293
294fn submit_module_ci_proposals(
295 task_group: &TaskGroup,
296 db: Database,
297 module_id: ModuleInstanceId,
298 kind: ModuleKind,
299 module: DynServerModule,
300 submission_sender: Sender<ConsensusItem>,
301) {
302 let mut interval = tokio::time::interval(if is_running_in_test_env() {
303 Duration::from_millis(100)
304 } else {
305 Duration::from_secs(1)
306 });
307
308 task_group.spawn(
309 format!("citem_proposals_{module_id}"),
310 move |task_handle| async move {
311 while !task_handle.is_shutting_down() {
312 let module_consensus_items = tokio::time::timeout(
313 CONSENSUS_PROPOSAL_TIMEOUT,
314 module.consensus_proposal(
315 &mut db
316 .begin_transaction_nc()
317 .await
318 .to_ref_with_prefix_module_id(module_id)
319 .0
320 .into_nc(),
321 module_id,
322 ),
323 )
324 .await;
325
326 match module_consensus_items {
327 Ok(items) => {
328 for item in items {
329 if submission_sender
330 .send(ConsensusItem::Module(item))
331 .await
332 .is_err()
333 {
334 warn!(
335 target: LOG_CONSENSUS,
336 module_id,
337 "Unable to submit module consensus item proposal via channel"
338 );
339 }
340 }
341 }
342 Err(..) => {
343 warn!(
344 target: LOG_CONSENSUS,
345 module_id,
346 %kind,
347 "Module failed to propose consensus items on time"
348 );
349 }
350 }
351
352 interval.tick().await;
353 }
354 },
355 );
356}
357
358async fn start_iroh_api(
359 secret_key: iroh::SecretKey,
360 bind_addr: SocketAddr,
361 consensus_api: ConsensusApi,
362 task_group: &TaskGroup,
363) {
364 let builder = Endpoint::builder()
365 .discovery_n0()
366 .discovery_dht()
367 .secret_key(secret_key)
368 .alpns(vec![FEDIMINT_API_ALPN.to_vec()]);
369
370 let builder = match bind_addr {
371 SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
372 SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
373 };
374
375 let endpoint = builder.bind().await.expect("Failed to bind iroh api");
376
377 info!(
378 target: LOG_NET_IROH,
379 %bind_addr,
380 node_id = %endpoint.node_id(),
381 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
382 "Iroh api server endpoint"
383 );
384
385 task_group.spawn_cancellable(
386 "iroh-api",
387 run_iroh_api(consensus_api, endpoint, task_group.clone()),
388 );
389}
390
391async fn run_iroh_api(consensus_api: ConsensusApi, endpoint: Endpoint, task_group: TaskGroup) {
392 let core_api = server_endpoints()
393 .into_iter()
394 .map(|endpoint| (endpoint.path.to_string(), endpoint))
395 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
396
397 let module_api = consensus_api
398 .modules
399 .iter_modules()
400 .map(|(id, _, module)| {
401 let api_endpoints = module
402 .api_endpoints()
403 .into_iter()
404 .map(|endpoint| (endpoint.path.to_string(), endpoint))
405 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
406
407 (id, api_endpoints)
408 })
409 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
410
411 let consensus_api = Arc::new(consensus_api);
412 let core_api = Arc::new(core_api);
413 let module_api = Arc::new(module_api);
414
415 loop {
416 match endpoint.accept().await {
417 Some(incoming) => {
418 task_group.spawn_cancellable_silent(
419 "handle-iroh-connection",
420 handle_incoming(
421 consensus_api.clone(),
422 core_api.clone(),
423 module_api.clone(),
424 task_group.clone(),
425 incoming,
426 )
427 .then(|result| async {
428 if let Err(err) = result {
429 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
430 }
431 }),
432 );
433 }
434 None => return,
435 }
436 }
437}
438
439async fn handle_incoming(
440 consensus_api: Arc<ConsensusApi>,
441 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
442 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
443 task_group: TaskGroup,
444 incoming: Incoming,
445) -> anyhow::Result<()> {
446 let connection = incoming.accept()?.await?;
447
448 loop {
449 let (send_stream, recv_stream) = connection.accept_bi().await?;
450
451 task_group.spawn_cancellable_silent(
452 "handle-iroh-request",
453 handle_request(
454 consensus_api.clone(),
455 core_api.clone(),
456 module_api.clone(),
457 send_stream,
458 recv_stream,
459 )
460 .then(|result| async {
461 if let Err(err) = result {
462 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
463 }
464 }),
465 );
466 }
467}
468
469async fn handle_request(
470 consensus_api: Arc<ConsensusApi>,
471 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
472 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
473 mut send_stream: SendStream,
474 mut recv_stream: RecvStream,
475) -> anyhow::Result<()> {
476 let request = recv_stream.read_to_end(100_000).await?;
477
478 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
479
480 let response = await_response(consensus_api, core_api, module_api, request).await;
481
482 let response = serde_json::to_vec(&response)?;
483
484 send_stream.write_all(&response).await?;
485
486 send_stream.finish()?;
487
488 Ok(())
489}
490
491async fn await_response(
492 consensus_api: Arc<ConsensusApi>,
493 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
494 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
495 request: IrohApiRequest,
496) -> Result<Value, ApiError> {
497 match request.method {
498 ApiMethod::Core(method) => {
499 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
500
501 let (state, context) = consensus_api.context(&request.request, None).await;
502
503 (endpoint.handler)(state, context, request.request).await
504 }
505 ApiMethod::Module(module_id, method) => {
506 let endpoint = module_api
507 .get(&module_id)
508 .ok_or(ApiError::not_found(module_id.to_string()))?
509 .get(&method)
510 .ok_or(ApiError::not_found(method))?;
511
512 let (state, context) = consensus_api
513 .context(&request.request, Some(module_id))
514 .await;
515
516 (endpoint.handler)(state, context, request.request).await
517 }
518 }
519}