1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_core::NumPeers;
19use fedimint_core::config::P2PMessage;
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
22use fedimint_core::envs::is_running_in_test_env;
23use fedimint_core::epoch::ConsensusItem;
24use fedimint_core::module::registry::ModuleRegistry;
25use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
26use fedimint_core::net::peers::DynP2PConnections;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
30use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::net::TcpListener;
41use tokio::sync::{Semaphore, watch};
42use tracing::{info, warn};
43
44use crate::config::{ServerConfig, ServerConfigLocal};
45use crate::connection_limits::ConnectionLimits;
46use crate::consensus::api::{ConsensusApi, server_endpoints};
47use crate::consensus::engine::ConsensusEngine;
48use crate::db::verify_server_db_integrity_dbtx;
49use crate::net::api::announcement::get_api_urls;
50use crate::net::api::{ApiSecrets, HasApiContext};
51use crate::net::p2p::P2PStatusReceivers;
52use crate::net::p2p_connector::build_iroh_endpoint;
53use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
54
/// Capacity of the bounded channel created in `run` that carries consensus
/// item submissions to the consensus engine.
const TRANSACTION_BUFFER: usize = 1_000;
57
58#[allow(clippy::too_many_arguments)]
59pub async fn run(
60 connections: DynP2PConnections<P2PMessage>,
61 p2p_status_receivers: P2PStatusReceivers,
62 api_bind: SocketAddr,
63 iroh_dns: Option<SafeUrl>,
64 iroh_relays: Vec<SafeUrl>,
65 cfg: ServerConfig,
66 db: Database,
67 module_init_registry: ServerModuleInitRegistry,
68 task_group: &TaskGroup,
69 force_api_secrets: ApiSecrets,
70 data_dir: PathBuf,
71 code_version_str: String,
72 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
73 ui_bind: SocketAddr,
74 dashboard_ui_router: DashboardUiRouter,
75 db_checkpoint_retention: u64,
76 iroh_api_limits: ConnectionLimits,
77) -> anyhow::Result<()> {
78 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
79
80 let mut global_dbtx = db.begin_transaction().await;
81 apply_migrations_server_dbtx(
82 &mut global_dbtx.to_ref_nc(),
83 Arc::new(ServerDbMigrationContext),
84 "fedimint-server".to_string(),
85 get_global_database_migrations(),
86 )
87 .await?;
88
89 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
90
91 if is_running_in_test_env() {
92 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
93 }
94 global_dbtx.commit_tx_result().await?;
95
96 let mut modules = BTreeMap::new();
97
98 let global_api = DynGlobalApi::from_endpoints(
100 cfg.consensus
101 .api_endpoints()
102 .iter()
103 .map(|(&peer_id, url)| (peer_id, url.url.clone())),
104 &None,
105 )
106 .await?;
107
108 let server_bitcoin_rpc_monitor = ServerBitcoinRpcMonitor::new(
109 dyn_server_bitcoin_rpc,
110 if is_running_in_test_env() {
111 Duration::from_millis(100)
112 } else {
113 Duration::from_secs(60)
114 },
115 task_group,
116 );
117
118 for (module_id, module_cfg) in &cfg.consensus.modules {
119 match module_init_registry.get(&module_cfg.kind) {
120 Some(module_init) => {
121 info!(target: LOG_CORE, "Initialise module {module_id}...");
122
123 let mut dbtx = db.begin_transaction().await;
124 apply_migrations_dbtx(
125 &mut dbtx.to_ref_nc(),
126 Arc::new(ServerDbMigrationContext) as Arc<_>,
127 module_init.module_kind().to_string(),
128 module_init.get_database_migrations(),
129 Some(*module_id),
130 None,
131 )
132 .await?;
133
134 if let Some(used_db_prefixes) = module_init.used_db_prefixes() {
135 if is_running_in_test_env() {
136 verify_module_db_integrity_dbtx(
137 &mut dbtx.to_ref_nc(),
138 *module_id,
139 module_init.module_kind(),
140 &used_db_prefixes,
141 )
142 .await;
143 }
144 }
145 dbtx.commit_tx_result().await?;
146
147 let module = module_init
148 .init(
149 NumPeers::from(cfg.consensus.api_endpoints().len()),
150 cfg.get_module_config(*module_id)?,
151 db.with_prefix_module_id(*module_id).0,
152 task_group,
153 cfg.local.identity,
154 global_api.with_module(*module_id),
155 server_bitcoin_rpc_monitor.clone(),
156 )
157 .await?;
158
159 modules.insert(*module_id, (module_cfg.kind.clone(), module));
160 }
161 None => bail!("Detected configuration for unsupported module id: {module_id}"),
162 }
163 }
164
165 let module_registry = ModuleRegistry::from(modules);
166
167 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
168
169 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
170 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
171 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
172
173 let mut ci_status_senders = BTreeMap::new();
174 let mut ci_status_receivers = BTreeMap::new();
175
176 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
177 let (ci_sender, ci_receiver) = watch::channel(None);
178
179 ci_status_senders.insert(peer, ci_sender);
180 ci_status_receivers.insert(peer, ci_receiver);
181 }
182
183 let consensus_api = ConsensusApi {
184 cfg: cfg.clone(),
185 db: db.clone(),
186 modules: module_registry.clone(),
187 client_cfg: client_cfg.clone(),
188 submission_sender: submission_sender.clone(),
189 shutdown_sender,
190 shutdown_receiver: shutdown_receiver.clone(),
191 supported_api_versions: ServerConfig::supported_api_versions_summary(
192 &cfg.consensus.modules,
193 &module_init_registry,
194 ),
195 p2p_status_receivers,
196 ci_status_receivers,
197 ord_latency_receiver,
198 bitcoin_rpc_connection: server_bitcoin_rpc_monitor,
199 force_api_secret: force_api_secrets.get_active(),
200 code_version_str,
201 };
202
203 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
204
205 let api_handler = if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone() {
206 Box::pin(start_iroh_api(
207 iroh_api_sk,
208 api_bind,
209 iroh_dns,
210 iroh_relays,
211 consensus_api.clone(),
212 task_group,
213 iroh_api_limits,
214 ))
215 .await?;
216
217 None
218 } else {
219 let handler = start_consensus_api(
220 &cfg.local,
221 consensus_api.clone(),
222 force_api_secrets.clone(),
223 api_bind,
224 )
225 .await;
226
227 Some(handler)
228 };
229
230 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
231
232 for (module_id, kind, module) in module_registry.iter_modules() {
233 submit_module_ci_proposals(
234 task_group,
235 db.clone(),
236 module_id,
237 kind.clone(),
238 module.clone(),
239 submission_sender.clone(),
240 );
241 }
242
243 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
244
245 let api_urls = get_api_urls(&db, &cfg.consensus).await;
246
247 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
248
249 let ui_listener = TcpListener::bind(ui_bind)
250 .await
251 .expect("Failed to bind dashboard UI");
252
253 task_group.spawn("dashboard-ui", move |handle| async move {
254 axum::serve(ui_listener, ui_service)
255 .with_graceful_shutdown(handle.make_shutdown_rx())
256 .await
257 .expect("Failed to serve dashboard UI");
258 });
259
260 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
261
262 ConsensusEngine {
265 db,
266 federation_api: DynGlobalApi::from_endpoints(api_urls, &force_api_secrets.get_active())
267 .await?,
268 cfg: cfg.clone(),
269 connections,
270 ord_latency_sender,
271 ci_status_senders,
272 submission_receiver,
273 shutdown_receiver,
274 modules: module_registry,
275 task_group: task_group.clone(),
276 data_dir,
277 db_checkpoint_retention,
278 }
279 .run()
280 .await?;
281
282 if let Some(api_handler) = api_handler {
283 api_handler
284 .stop()
285 .expect("Consensus api should still be running");
286
287 api_handler.stopped().await;
288 }
289
290 Ok(())
291}
292
293async fn start_consensus_api(
294 cfg: &ServerConfigLocal,
295 api: ConsensusApi,
296 force_api_secrets: ApiSecrets,
297 api_bind: SocketAddr,
298) -> ServerHandle {
299 let mut rpc_module = RpcModule::new(api.clone());
300
301 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
302
303 for (id, _, module) in api.modules.iter_modules() {
304 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
305 }
306
307 net::api::spawn(
308 "consensus",
309 api_bind,
310 rpc_module,
311 cfg.max_connections,
312 force_api_secrets,
313 )
314 .await
315}
316
/// Upper bound on how long a single `consensus_proposal` call of a module may
/// take before the proposal round is abandoned and a warning is logged.
const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
318
319fn submit_module_ci_proposals(
320 task_group: &TaskGroup,
321 db: Database,
322 module_id: ModuleInstanceId,
323 kind: ModuleKind,
324 module: DynServerModule,
325 submission_sender: Sender<ConsensusItem>,
326) {
327 let mut interval = tokio::time::interval(if is_running_in_test_env() {
328 Duration::from_millis(100)
329 } else {
330 Duration::from_secs(1)
331 });
332
333 task_group.spawn(
334 format!("citem_proposals_{module_id}"),
335 move |task_handle| async move {
336 while !task_handle.is_shutting_down() {
337 let module_consensus_items = tokio::time::timeout(
338 CONSENSUS_PROPOSAL_TIMEOUT,
339 module.consensus_proposal(
340 &mut db
341 .begin_transaction_nc()
342 .await
343 .to_ref_with_prefix_module_id(module_id)
344 .0
345 .into_nc(),
346 module_id,
347 ),
348 )
349 .await;
350
351 match module_consensus_items {
352 Ok(items) => {
353 for item in items {
354 if submission_sender
355 .send(ConsensusItem::Module(item))
356 .await
357 .is_err()
358 {
359 warn!(
360 target: LOG_CONSENSUS,
361 module_id,
362 "Unable to submit module consensus item proposal via channel"
363 );
364 }
365 }
366 }
367 Err(..) => {
368 warn!(
369 target: LOG_CONSENSUS,
370 module_id,
371 %kind,
372 "Module failed to propose consensus items on time"
373 );
374 }
375 }
376
377 interval.tick().await;
378 }
379 },
380 );
381}
382
383async fn start_iroh_api(
384 secret_key: iroh::SecretKey,
385 api_bind: SocketAddr,
386 iroh_dns: Option<SafeUrl>,
387 iroh_relays: Vec<SafeUrl>,
388 consensus_api: ConsensusApi,
389 task_group: &TaskGroup,
390 iroh_api_limits: ConnectionLimits,
391) -> anyhow::Result<()> {
392 let endpoint = build_iroh_endpoint(
393 secret_key,
394 api_bind,
395 iroh_dns,
396 iroh_relays,
397 FEDIMINT_API_ALPN,
398 )
399 .await?;
400 task_group.spawn_cancellable(
401 "iroh-api",
402 run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
403 );
404
405 Ok(())
406}
407
408async fn run_iroh_api(
409 consensus_api: ConsensusApi,
410 endpoint: Endpoint,
411 task_group: TaskGroup,
412 iroh_api_limits: ConnectionLimits,
413) {
414 let core_api = server_endpoints()
415 .into_iter()
416 .map(|endpoint| (endpoint.path.to_string(), endpoint))
417 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
418
419 let module_api = consensus_api
420 .modules
421 .iter_modules()
422 .map(|(id, _, module)| {
423 let api_endpoints = module
424 .api_endpoints()
425 .into_iter()
426 .map(|endpoint| (endpoint.path.to_string(), endpoint))
427 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
428
429 (id, api_endpoints)
430 })
431 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
432
433 let consensus_api = Arc::new(consensus_api);
434 let core_api = Arc::new(core_api);
435 let module_api = Arc::new(module_api);
436 let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
437
438 loop {
439 match endpoint.accept().await {
440 Some(incoming) => {
441 if parallel_connections_limit.available_permits() == 0 {
442 warn!(
443 target: LOG_NET_API,
444 limit = iroh_api_limits.max_connections,
445 "Iroh API connection limit reached, blocking new connections"
446 );
447 }
448 let permit = parallel_connections_limit
449 .clone()
450 .acquire_owned()
451 .await
452 .expect("semaphore should not be closed");
453 task_group.spawn_cancellable_silent(
454 "handle-iroh-connection",
455 handle_incoming(
456 consensus_api.clone(),
457 core_api.clone(),
458 module_api.clone(),
459 task_group.clone(),
460 incoming,
461 permit,
462 iroh_api_limits.max_requests_per_connection,
463 )
464 .then(|result| async {
465 if let Err(err) = result {
466 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
467 }
468 }),
469 );
470 }
471 None => return,
472 }
473 }
474}
475
476async fn handle_incoming(
477 consensus_api: Arc<ConsensusApi>,
478 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
479 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
480 task_group: TaskGroup,
481 incoming: Incoming,
482 _connection_permit: tokio::sync::OwnedSemaphorePermit,
483 iroh_api_max_requests_per_connection: usize,
484) -> anyhow::Result<()> {
485 let connection = incoming.accept()?.await?;
486 let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
487
488 loop {
489 let (send_stream, recv_stream) = connection.accept_bi().await?;
490
491 if parallel_requests_limit.available_permits() == 0 {
492 warn!(
493 target: LOG_NET_API,
494 limit = iroh_api_max_requests_per_connection,
495 "Iroh API request limit reached for connection, blocking new requests"
496 );
497 }
498 let permit = parallel_requests_limit
499 .clone()
500 .acquire_owned()
501 .await
502 .expect("semaphore should not be closed");
503 task_group.spawn_cancellable_silent(
504 "handle-iroh-request",
505 handle_request(
506 consensus_api.clone(),
507 core_api.clone(),
508 module_api.clone(),
509 send_stream,
510 recv_stream,
511 permit,
512 )
513 .then(|result| async {
514 if let Err(err) = result {
515 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
516 }
517 }),
518 );
519 }
520}
521
522async fn handle_request(
523 consensus_api: Arc<ConsensusApi>,
524 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
525 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
526 mut send_stream: SendStream,
527 mut recv_stream: RecvStream,
528 _request_permit: tokio::sync::OwnedSemaphorePermit,
529) -> anyhow::Result<()> {
530 let request = recv_stream.read_to_end(100_000).await?;
531
532 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
533
534 let response = await_response(consensus_api, core_api, module_api, request).await;
535
536 let response = serde_json::to_vec(&response)?;
537
538 send_stream.write_all(&response).await?;
539
540 send_stream.finish()?;
541
542 Ok(())
543}
544
545async fn await_response(
546 consensus_api: Arc<ConsensusApi>,
547 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
548 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
549 request: IrohApiRequest,
550) -> Result<Value, ApiError> {
551 match request.method {
552 ApiMethod::Core(method) => {
553 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
554
555 let (state, context) = consensus_api.context(&request.request, None).await;
556
557 (endpoint.handler)(state, context, request.request).await
558 }
559 ApiMethod::Module(module_id, method) => {
560 let endpoint = module_api
561 .get(&module_id)
562 .ok_or(ApiError::not_found(module_id.to_string()))?
563 .get(&method)
564 .ok_or(ApiError::not_found(method))?;
565
566 let (state, context) = consensus_api
567 .context(&request.request, Some(module_id))
568 .await;
569
570 (endpoint.handler)(state, context, request.request).await
571 }
572 }
573}