1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_core::NumPeers;
19use fedimint_core::config::P2PMessage;
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
22use fedimint_core::envs::is_running_in_test_env;
23use fedimint_core::epoch::ConsensusItem;
24use fedimint_core::module::registry::ModuleRegistry;
25use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
26use fedimint_core::net::peers::DynP2PConnections;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
30use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::net::TcpListener;
41use tokio::sync::{Semaphore, watch};
42use tracing::{info, warn};
43
44use crate::config::{ServerConfig, ServerConfigLocal};
45use crate::connection_limits::ConnectionLimits;
46use crate::consensus::api::{ConsensusApi, server_endpoints};
47use crate::consensus::engine::ConsensusEngine;
48use crate::db::verify_server_db_integrity_dbtx;
49use crate::net::api::announcement::get_api_urls;
50use crate::net::api::{ApiSecrets, HasApiContext};
51use crate::net::p2p::{P2PConnectionTypeReceivers, P2PStatusReceivers};
52use crate::net::p2p_connector::build_iroh_endpoint;
53use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
54
55const TRANSACTION_BUFFER: usize = 1000;
57
58#[allow(clippy::too_many_arguments)]
59pub async fn run(
60 connections: DynP2PConnections<P2PMessage>,
61 p2p_status_receivers: P2PStatusReceivers,
62 p2p_connection_type_receivers: P2PConnectionTypeReceivers,
63 api_bind: SocketAddr,
64 iroh_dns: Option<SafeUrl>,
65 iroh_relays: Vec<SafeUrl>,
66 cfg: ServerConfig,
67 db: Database,
68 module_init_registry: ServerModuleInitRegistry,
69 task_group: &TaskGroup,
70 force_api_secrets: ApiSecrets,
71 data_dir: PathBuf,
72 code_version_str: String,
73 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
74 ui_bind: SocketAddr,
75 dashboard_ui_router: DashboardUiRouter,
76 db_checkpoint_retention: u64,
77 iroh_api_limits: ConnectionLimits,
78) -> anyhow::Result<()> {
79 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
80
81 let mut global_dbtx = db.begin_transaction().await;
82 apply_migrations_server_dbtx(
83 &mut global_dbtx.to_ref_nc(),
84 Arc::new(ServerDbMigrationContext),
85 "fedimint-server".to_string(),
86 get_global_database_migrations(),
87 )
88 .await?;
89
90 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
91
92 if is_running_in_test_env() {
93 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
94 }
95 global_dbtx.commit_tx_result().await?;
96
97 let mut modules = BTreeMap::new();
98
99 let global_api = DynGlobalApi::from_endpoints(
101 cfg.consensus
102 .api_endpoints()
103 .iter()
104 .map(|(&peer_id, url)| (peer_id, url.url.clone())),
105 &None,
106 true,
107 true,
108 )
109 .await?;
110
111 let server_bitcoin_rpc_monitor = ServerBitcoinRpcMonitor::new(
112 dyn_server_bitcoin_rpc,
113 if is_running_in_test_env() {
114 Duration::from_millis(100)
115 } else {
116 Duration::from_secs(60)
117 },
118 task_group,
119 );
120
121 for (module_id, module_cfg) in &cfg.consensus.modules {
122 match module_init_registry.get(&module_cfg.kind) {
123 Some(module_init) => {
124 info!(target: LOG_CORE, "Initialise module {module_id}...");
125
126 let mut dbtx = db.begin_transaction().await;
127 apply_migrations_dbtx(
128 &mut dbtx.to_ref_nc(),
129 Arc::new(ServerDbMigrationContext) as Arc<_>,
130 module_init.module_kind().to_string(),
131 module_init.get_database_migrations(),
132 Some(*module_id),
133 None,
134 )
135 .await?;
136
137 if let Some(used_db_prefixes) = module_init.used_db_prefixes()
138 && is_running_in_test_env()
139 {
140 verify_module_db_integrity_dbtx(
141 &mut dbtx.to_ref_nc(),
142 *module_id,
143 module_init.module_kind(),
144 &used_db_prefixes,
145 )
146 .await;
147 }
148 dbtx.commit_tx_result().await?;
149
150 let module = module_init
151 .init(
152 NumPeers::from(cfg.consensus.api_endpoints().len()),
153 cfg.get_module_config(*module_id)?,
154 db.with_prefix_module_id(*module_id).0,
155 task_group,
156 cfg.local.identity,
157 global_api.with_module(*module_id),
158 server_bitcoin_rpc_monitor.clone(),
159 )
160 .await?;
161
162 modules.insert(*module_id, (module_cfg.kind.clone(), module));
163 }
164 None => bail!("Detected configuration for unsupported module id: {module_id}"),
165 }
166 }
167
168 let module_registry = ModuleRegistry::from(modules);
169
170 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
171
172 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
173 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
174 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
175
176 let mut ci_status_senders = BTreeMap::new();
177 let mut ci_status_receivers = BTreeMap::new();
178
179 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
180 let (ci_sender, ci_receiver) = watch::channel(None);
181
182 ci_status_senders.insert(peer, ci_sender);
183 ci_status_receivers.insert(peer, ci_receiver);
184 }
185
186 let consensus_api = ConsensusApi {
187 cfg: cfg.clone(),
188 cfg_dir: data_dir.clone(),
189 db: db.clone(),
190 modules: module_registry.clone(),
191 client_cfg: client_cfg.clone(),
192 submission_sender: submission_sender.clone(),
193 shutdown_sender,
194 shutdown_receiver: shutdown_receiver.clone(),
195 supported_api_versions: ServerConfig::supported_api_versions_summary(
196 &cfg.consensus.modules,
197 &module_init_registry,
198 ),
199 p2p_status_receivers,
200 p2p_connection_type_receivers,
201 ci_status_receivers,
202 ord_latency_receiver,
203 bitcoin_rpc_connection: server_bitcoin_rpc_monitor,
204 force_api_secret: force_api_secrets.get_active(),
205 code_version_str,
206 task_group: task_group.clone(),
207 };
208
209 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
210
211 let api_handler = start_consensus_api(
212 &cfg.local,
213 consensus_api.clone(),
214 force_api_secrets.clone(),
215 api_bind,
216 )
217 .await;
218
219 if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
220 && let Err(e) = Box::pin(start_iroh_api(
221 iroh_api_sk,
222 api_bind,
223 iroh_dns,
224 iroh_relays,
225 consensus_api.clone(),
226 task_group,
227 iroh_api_limits,
228 ))
229 .await
230 {
231 api_handler.stop().expect("Just started");
233 api_handler.stopped().await;
234 return Err(e);
235 }
236
237 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
238
239 for (module_id, kind, module) in module_registry.iter_modules() {
240 submit_module_ci_proposals(
241 task_group,
242 db.clone(),
243 module_id,
244 kind.clone(),
245 module.clone(),
246 submission_sender.clone(),
247 );
248 }
249
250 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
251
252 let api_urls = get_api_urls(&db, &cfg.consensus).await;
253
254 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
255
256 let ui_listener = TcpListener::bind(ui_bind)
257 .await
258 .expect("Failed to bind dashboard UI");
259
260 task_group.spawn("dashboard-ui", move |handle| async move {
261 axum::serve(ui_listener, ui_service)
262 .with_graceful_shutdown(handle.make_shutdown_rx())
263 .await
264 .expect("Failed to serve dashboard UI");
265 });
266
267 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
268
269 ConsensusEngine {
272 db,
273 federation_api: DynGlobalApi::from_endpoints(
274 api_urls,
275 &force_api_secrets.get_active(),
276 true,
277 true,
278 )
279 .await?,
280 cfg: cfg.clone(),
281 connections,
282 ord_latency_sender,
283 ci_status_senders,
284 submission_receiver,
285 shutdown_receiver,
286 modules: module_registry,
287 task_group: task_group.clone(),
288 data_dir,
289 db_checkpoint_retention,
290 }
291 .run()
292 .await?;
293
294 api_handler
295 .stop()
296 .expect("Consensus api should still be running");
297
298 api_handler.stopped().await;
299
300 Ok(())
301}
302
303async fn start_consensus_api(
304 cfg: &ServerConfigLocal,
305 api: ConsensusApi,
306 force_api_secrets: ApiSecrets,
307 api_bind: SocketAddr,
308) -> ServerHandle {
309 let mut rpc_module = RpcModule::new(api.clone());
310
311 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
312
313 for (id, _, module) in api.modules.iter_modules() {
314 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
315 }
316
317 net::api::spawn(
318 "consensus",
319 api_bind,
320 rpc_module,
321 cfg.max_connections,
322 force_api_secrets,
323 )
324 .await
325}
326
327const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
328
329fn submit_module_ci_proposals(
330 task_group: &TaskGroup,
331 db: Database,
332 module_id: ModuleInstanceId,
333 kind: ModuleKind,
334 module: DynServerModule,
335 submission_sender: Sender<ConsensusItem>,
336) {
337 let mut interval = tokio::time::interval(if is_running_in_test_env() {
338 Duration::from_millis(100)
339 } else {
340 Duration::from_secs(1)
341 });
342
343 task_group.spawn(
344 format!("citem_proposals_{module_id}"),
345 move |task_handle| async move {
346 while !task_handle.is_shutting_down() {
347 let module_consensus_items = tokio::time::timeout(
348 CONSENSUS_PROPOSAL_TIMEOUT,
349 module.consensus_proposal(
350 &mut db
351 .begin_transaction_nc()
352 .await
353 .to_ref_with_prefix_module_id(module_id)
354 .0
355 .into_nc(),
356 module_id,
357 ),
358 )
359 .await;
360
361 match module_consensus_items {
362 Ok(items) => {
363 for item in items {
364 if submission_sender
365 .send(ConsensusItem::Module(item))
366 .await
367 .is_err()
368 {
369 warn!(
370 target: LOG_CONSENSUS,
371 module_id,
372 "Unable to submit module consensus item proposal via channel"
373 );
374 }
375 }
376 }
377 Err(..) => {
378 warn!(
379 target: LOG_CONSENSUS,
380 module_id,
381 %kind,
382 "Module failed to propose consensus items on time"
383 );
384 }
385 }
386
387 interval.tick().await;
388 }
389 },
390 );
391}
392
393async fn start_iroh_api(
394 secret_key: iroh::SecretKey,
395 api_bind: SocketAddr,
396 iroh_dns: Option<SafeUrl>,
397 iroh_relays: Vec<SafeUrl>,
398 consensus_api: ConsensusApi,
399 task_group: &TaskGroup,
400 iroh_api_limits: ConnectionLimits,
401) -> anyhow::Result<()> {
402 let endpoint = build_iroh_endpoint(
403 secret_key,
404 api_bind,
405 iroh_dns,
406 iroh_relays,
407 FEDIMINT_API_ALPN,
408 )
409 .await?;
410 task_group.spawn_cancellable(
411 "iroh-api",
412 run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
413 );
414
415 Ok(())
416}
417
418async fn run_iroh_api(
419 consensus_api: ConsensusApi,
420 endpoint: Endpoint,
421 task_group: TaskGroup,
422 iroh_api_limits: ConnectionLimits,
423) {
424 let core_api = server_endpoints()
425 .into_iter()
426 .map(|endpoint| (endpoint.path.to_string(), endpoint))
427 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
428
429 let module_api = consensus_api
430 .modules
431 .iter_modules()
432 .map(|(id, _, module)| {
433 let api_endpoints = module
434 .api_endpoints()
435 .into_iter()
436 .map(|endpoint| (endpoint.path.to_string(), endpoint))
437 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
438
439 (id, api_endpoints)
440 })
441 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
442
443 let consensus_api = Arc::new(consensus_api);
444 let core_api = Arc::new(core_api);
445 let module_api = Arc::new(module_api);
446 let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
447
448 loop {
449 match endpoint.accept().await {
450 Some(incoming) => {
451 if parallel_connections_limit.available_permits() == 0 {
452 warn!(
453 target: LOG_NET_API,
454 limit = iroh_api_limits.max_connections,
455 "Iroh API connection limit reached, blocking new connections"
456 );
457 }
458 let permit = parallel_connections_limit
459 .clone()
460 .acquire_owned()
461 .await
462 .expect("semaphore should not be closed");
463 task_group.spawn_cancellable_silent(
464 "handle-iroh-connection",
465 handle_incoming(
466 consensus_api.clone(),
467 core_api.clone(),
468 module_api.clone(),
469 task_group.clone(),
470 incoming,
471 permit,
472 iroh_api_limits.max_requests_per_connection,
473 )
474 .then(|result| async {
475 if let Err(err) = result {
476 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
477 }
478 }),
479 );
480 }
481 None => return,
482 }
483 }
484}
485
486async fn handle_incoming(
487 consensus_api: Arc<ConsensusApi>,
488 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
489 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
490 task_group: TaskGroup,
491 incoming: Incoming,
492 _connection_permit: tokio::sync::OwnedSemaphorePermit,
493 iroh_api_max_requests_per_connection: usize,
494) -> anyhow::Result<()> {
495 let connection = incoming.accept()?.await?;
496 let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
497
498 loop {
499 let (send_stream, recv_stream) = connection.accept_bi().await?;
500
501 if parallel_requests_limit.available_permits() == 0 {
502 warn!(
503 target: LOG_NET_API,
504 limit = iroh_api_max_requests_per_connection,
505 "Iroh API request limit reached for connection, blocking new requests"
506 );
507 }
508 let permit = parallel_requests_limit
509 .clone()
510 .acquire_owned()
511 .await
512 .expect("semaphore should not be closed");
513 task_group.spawn_cancellable_silent(
514 "handle-iroh-request",
515 handle_request(
516 consensus_api.clone(),
517 core_api.clone(),
518 module_api.clone(),
519 send_stream,
520 recv_stream,
521 permit,
522 )
523 .then(|result| async {
524 if let Err(err) = result {
525 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
526 }
527 }),
528 );
529 }
530}
531
532async fn handle_request(
533 consensus_api: Arc<ConsensusApi>,
534 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
535 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
536 mut send_stream: SendStream,
537 mut recv_stream: RecvStream,
538 _request_permit: tokio::sync::OwnedSemaphorePermit,
539) -> anyhow::Result<()> {
540 let request = recv_stream.read_to_end(100_000).await?;
541
542 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
543
544 let response = await_response(consensus_api, core_api, module_api, request).await;
545
546 let response = serde_json::to_vec(&response)?;
547
548 send_stream.write_all(&response).await?;
549
550 send_stream.finish()?;
551
552 Ok(())
553}
554
555async fn await_response(
556 consensus_api: Arc<ConsensusApi>,
557 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
558 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
559 request: IrohApiRequest,
560) -> Result<Value, ApiError> {
561 match request.method {
562 ApiMethod::Core(method) => {
563 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
564
565 let (state, context) = consensus_api.context(&request.request, None).await;
566
567 (endpoint.handler)(state, context, request.request).await
568 }
569 ApiMethod::Module(module_id, method) => {
570 let endpoint = module_api
571 .get(&module_id)
572 .ok_or(ApiError::not_found(module_id.to_string()))?
573 .get(&method)
574 .ok_or(ApiError::not_found(method))?;
575
576 let (state, context) = consensus_api
577 .context(&request.request, Some(module_id))
578 .await;
579
580 (endpoint.handler)(state, context, request.request).await
581 }
582 }
583}