1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::metrics::{
52 IROH_API_CONNECTION_DURATION_SECONDS, IROH_API_CONNECTIONS_ACTIVE,
53 IROH_API_REQUEST_DURATION_SECONDS,
54};
55use crate::net::api::announcement::get_api_urls;
56use crate::net::api::{ApiSecrets, HasApiContext};
57use crate::net::p2p::P2PStatusReceivers;
58use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
59
/// Capacity of the bounded channel that carries consensus item submissions
/// (from the API and the per-module proposal tasks) to the consensus engine.
const TRANSACTION_BUFFER: usize = 1_000;
62
63#[allow(clippy::too_many_arguments)]
64pub async fn run(
65 connectors: ConnectorRegistry,
66 connections: DynP2PConnections<P2PMessage>,
67 p2p_status_receivers: P2PStatusReceivers,
68 api_bind: SocketAddr,
69 iroh_dns: Option<SafeUrl>,
70 iroh_relays: Vec<SafeUrl>,
71 cfg: ServerConfig,
72 db: Database,
73 module_init_registry: ServerModuleInitRegistry,
74 task_group: &TaskGroup,
75 force_api_secrets: ApiSecrets,
76 data_dir: PathBuf,
77 code_version_str: String,
78 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
79 ui_bind: SocketAddr,
80 dashboard_ui_router: DashboardUiRouter,
81 db_checkpoint_retention: u64,
82 iroh_api_limits: ConnectionLimits,
83) -> anyhow::Result<()> {
84 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
85
86 let mut global_dbtx = db.begin_transaction().await;
87 apply_migrations_server_dbtx(
88 &mut global_dbtx.to_ref_nc(),
89 Arc::new(ServerDbMigrationContext),
90 "fedimint-server".to_string(),
91 get_global_database_migrations(),
92 )
93 .await?;
94
95 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
96
97 if is_running_in_test_env() {
98 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
99 }
100 global_dbtx.commit_tx_result().await?;
101
102 let mut modules = BTreeMap::new();
103
104 let global_api = DynGlobalApi::new(
106 connectors.clone(),
107 cfg.consensus
108 .api_endpoints()
109 .iter()
110 .map(|(&peer_id, url)| (peer_id, url.url.clone()))
111 .collect(),
112 None,
113 )?;
114
115 let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
116 dyn_server_bitcoin_rpc,
117 if is_running_in_test_env() {
118 Duration::from_millis(100)
119 } else {
120 Duration::from_mins(1)
121 },
122 task_group,
123 );
124
125 for (module_id, module_cfg) in &cfg.consensus.modules {
126 match module_init_registry.get(&module_cfg.kind) {
127 Some(module_init) => {
128 info!(target: LOG_CORE, "Initialise module {module_id}...");
129
130 let mut dbtx = db.begin_transaction().await;
131 apply_migrations_dbtx(
132 &mut dbtx.to_ref_nc(),
133 Arc::new(ServerDbMigrationContext) as Arc<_>,
134 module_init.module_kind().to_string(),
135 module_init.get_database_migrations(),
136 Some(*module_id),
137 None,
138 )
139 .await?;
140
141 if let Some(used_db_prefixes) = module_init.used_db_prefixes()
142 && is_running_in_test_env()
143 {
144 verify_module_db_integrity_dbtx(
145 &mut dbtx.to_ref_nc(),
146 *module_id,
147 module_init.module_kind(),
148 &used_db_prefixes,
149 )
150 .await;
151 }
152 dbtx.commit_tx_result().await?;
153
154 let module = module_init
155 .init(
156 NumPeers::from(cfg.consensus.api_endpoints().len()),
157 cfg.get_module_config(*module_id)?,
158 db.with_prefix_module_id(*module_id).0,
159 task_group,
160 cfg.local.identity,
161 global_api.with_module(*module_id),
162 bitcoin_rpc_connection.clone(),
163 )
164 .await?;
165
166 modules.insert(*module_id, (module_cfg.kind.clone(), module));
167 }
168 None => bail!("Detected configuration for unsupported module id: {module_id}"),
169 }
170 }
171
172 let module_registry = ModuleRegistry::from(modules);
173
174 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
175
176 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
177 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
178 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
179
180 let mut ci_status_senders = BTreeMap::new();
181 let mut ci_status_receivers = BTreeMap::new();
182
183 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
184 let (ci_sender, ci_receiver) = watch::channel(None);
185
186 ci_status_senders.insert(peer, ci_sender);
187 ci_status_receivers.insert(peer, ci_receiver);
188 }
189
190 let consensus_api = ConsensusApi {
191 cfg: cfg.clone(),
192 cfg_dir: data_dir.clone(),
193 db: db.clone(),
194 modules: module_registry.clone(),
195 client_cfg: client_cfg.clone(),
196 submission_sender: submission_sender.clone(),
197 shutdown_sender,
198 shutdown_receiver: shutdown_receiver.clone(),
199 supported_api_versions: ServerConfig::supported_api_versions_summary(
200 &cfg.consensus.modules,
201 &module_init_registry,
202 ),
203 p2p_status_receivers,
204 ci_status_receivers,
205 ord_latency_receiver,
206 bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
207 force_api_secret: force_api_secrets.get_active(),
208 code_version_str,
209 task_group: task_group.clone(),
210 };
211
212 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
213
214 let api_handler = start_consensus_api(
215 &cfg.local,
216 consensus_api.clone(),
217 force_api_secrets.clone(),
218 api_bind,
219 )
220 .await;
221
222 if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
223 && let Err(e) = Box::pin(start_iroh_api(
224 iroh_api_sk,
225 api_bind,
226 iroh_dns,
227 iroh_relays,
228 consensus_api.clone(),
229 task_group,
230 iroh_api_limits,
231 ))
232 .await
233 {
234 api_handler.stop().expect("Just started");
236 api_handler.stopped().await;
237 return Err(e);
238 }
239
240 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
241
242 for (module_id, kind, module) in module_registry.iter_modules() {
243 submit_module_ci_proposals(
244 task_group,
245 db.clone(),
246 module_id,
247 kind.clone(),
248 module.clone(),
249 submission_sender.clone(),
250 );
251 }
252
253 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
254
255 let ui_listener = TcpListener::bind(ui_bind)
256 .await
257 .expect("Failed to bind dashboard UI");
258
259 task_group.spawn("dashboard-ui", move |handle| async move {
260 axum::serve(ui_listener, ui_service)
261 .with_graceful_shutdown(handle.make_shutdown_rx())
262 .await
263 .expect("Failed to serve dashboard UI");
264 });
265
266 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
267
268 loop {
269 match bitcoin_rpc_connection.status() {
270 Some(status) => {
271 if let Some(progress) = status.sync_progress {
272 if progress >= 0.999 {
273 break;
274 }
275
276 info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
277 } else {
278 break;
279 }
280 }
281 None => {
282 info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
283 }
284 }
285
286 sleep(Duration::from_secs(1)).await;
287 }
288
289 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
290
291 let api_urls = get_api_urls(&db, &cfg.consensus).await;
292
293 ConsensusEngine {
296 db,
297 federation_api: DynGlobalApi::new(
298 connectors,
299 api_urls,
300 force_api_secrets.get_active().as_deref(),
301 )?,
302 cfg: cfg.clone(),
303 connections,
304 ord_latency_sender,
305 ci_status_senders,
306 submission_receiver,
307 shutdown_receiver,
308 modules: module_registry,
309 task_group: task_group.clone(),
310 data_dir,
311 db_checkpoint_retention,
312 }
313 .run()
314 .await?;
315
316 api_handler
317 .stop()
318 .expect("Consensus api should still be running");
319
320 api_handler.stopped().await;
321
322 Ok(())
323}
324
325async fn start_consensus_api(
326 cfg: &ServerConfigLocal,
327 api: ConsensusApi,
328 force_api_secrets: ApiSecrets,
329 api_bind: SocketAddr,
330) -> ServerHandle {
331 let mut rpc_module = RpcModule::new(api.clone());
332
333 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
334
335 for (id, _, module) in api.modules.iter_modules() {
336 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
337 }
338
339 net::api::spawn(
340 "consensus",
341 api_bind,
342 rpc_module,
343 cfg.max_connections,
344 force_api_secrets,
345 )
346 .await
347}
348
349const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
350
351fn submit_module_ci_proposals(
352 task_group: &TaskGroup,
353 db: Database,
354 module_id: ModuleInstanceId,
355 kind: ModuleKind,
356 module: DynServerModule,
357 submission_sender: Sender<ConsensusItem>,
358) {
359 let mut interval = tokio::time::interval(if is_running_in_test_env() {
360 Duration::from_millis(100)
361 } else {
362 Duration::from_secs(1)
363 });
364
365 task_group.spawn(
366 format!("citem_proposals_{module_id}"),
367 move |task_handle| async move {
368 while !task_handle.is_shutting_down() {
369 let module_consensus_items = tokio::time::timeout(
370 CONSENSUS_PROPOSAL_TIMEOUT,
371 module.consensus_proposal(
372 &mut db
373 .begin_transaction_nc()
374 .await
375 .to_ref_with_prefix_module_id(module_id)
376 .0
377 .into_nc(),
378 module_id,
379 ),
380 )
381 .await;
382
383 match module_consensus_items {
384 Ok(items) => {
385 for item in items {
386 if submission_sender
387 .send(ConsensusItem::Module(item))
388 .await
389 .is_err()
390 {
391 warn!(
392 target: LOG_CONSENSUS,
393 module_id,
394 "Unable to submit module consensus item proposal via channel"
395 );
396 }
397 }
398 }
399 Err(..) => {
400 warn!(
401 target: LOG_CONSENSUS,
402 module_id,
403 %kind,
404 "Module failed to propose consensus items on time"
405 );
406 }
407 }
408
409 interval.tick().await;
410 }
411 },
412 );
413}
414
415async fn start_iroh_api(
416 secret_key: iroh::SecretKey,
417 api_bind: SocketAddr,
418 iroh_dns: Option<SafeUrl>,
419 iroh_relays: Vec<SafeUrl>,
420 consensus_api: ConsensusApi,
421 task_group: &TaskGroup,
422 iroh_api_limits: ConnectionLimits,
423) -> anyhow::Result<()> {
424 let endpoint = build_iroh_endpoint(
425 secret_key,
426 api_bind,
427 iroh_dns,
428 iroh_relays,
429 FEDIMINT_API_ALPN,
430 )
431 .await?;
432 task_group.spawn_cancellable(
433 "iroh-api",
434 run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
435 );
436
437 Ok(())
438}
439
440async fn run_iroh_api(
441 consensus_api: ConsensusApi,
442 endpoint: Endpoint,
443 task_group: TaskGroup,
444 iroh_api_limits: ConnectionLimits,
445) {
446 let core_api = server_endpoints()
447 .into_iter()
448 .map(|endpoint| (endpoint.path.to_string(), endpoint))
449 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
450
451 let module_api = consensus_api
452 .modules
453 .iter_modules()
454 .map(|(id, _, module)| {
455 let api_endpoints = module
456 .api_endpoints()
457 .into_iter()
458 .map(|endpoint| (endpoint.path.to_string(), endpoint))
459 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
460
461 (id, api_endpoints)
462 })
463 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
464
465 let consensus_api = Arc::new(consensus_api);
466 let core_api = Arc::new(core_api);
467 let module_api = Arc::new(module_api);
468 let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
469
470 loop {
471 match endpoint.accept().await {
472 Some(incoming) => {
473 if parallel_connections_limit.available_permits() == 0 {
474 warn!(
475 target: LOG_NET_API,
476 limit = iroh_api_limits.max_connections,
477 "Iroh API connection limit reached, blocking new connections"
478 );
479 }
480 let permit = parallel_connections_limit
481 .clone()
482 .acquire_owned()
483 .await
484 .expect("semaphore should not be closed");
485 task_group.spawn_cancellable_silent(
486 "handle-iroh-connection",
487 handle_incoming(
488 consensus_api.clone(),
489 core_api.clone(),
490 module_api.clone(),
491 task_group.clone(),
492 incoming,
493 permit,
494 iroh_api_limits.max_requests_per_connection,
495 )
496 .then(|result| async {
497 if let Err(err) = result {
498 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
499 }
500 }),
501 );
502 }
503 None => return,
504 }
505 }
506}
507
508async fn handle_incoming(
509 consensus_api: Arc<ConsensusApi>,
510 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
511 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
512 task_group: TaskGroup,
513 incoming: Incoming,
514 _connection_permit: tokio::sync::OwnedSemaphorePermit,
515 iroh_api_max_requests_per_connection: usize,
516) -> anyhow::Result<()> {
517 let connection = incoming.accept()?.await?;
518 let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
519
520 IROH_API_CONNECTIONS_ACTIVE.inc();
521 let connection_timer = IROH_API_CONNECTION_DURATION_SECONDS.start_timer();
522 scopeguard::defer! {
523 IROH_API_CONNECTIONS_ACTIVE.dec();
524 connection_timer.observe_duration();
525 }
526
527 loop {
528 let (send_stream, recv_stream) = connection.accept_bi().await?;
529
530 if parallel_requests_limit.available_permits() == 0 {
531 warn!(
532 target: LOG_NET_API,
533 limit = iroh_api_max_requests_per_connection,
534 "Iroh API request limit reached for connection, blocking new requests"
535 );
536 }
537 let permit = parallel_requests_limit
538 .clone()
539 .acquire_owned()
540 .await
541 .expect("semaphore should not be closed");
542 task_group.spawn_cancellable_silent(
543 "handle-iroh-request",
544 handle_request(
545 consensus_api.clone(),
546 core_api.clone(),
547 module_api.clone(),
548 send_stream,
549 recv_stream,
550 permit,
551 )
552 .then(|result| async {
553 if let Err(err) = result {
554 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
555 }
556 }),
557 );
558 }
559}
560
561async fn handle_request(
562 consensus_api: Arc<ConsensusApi>,
563 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
564 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
565 mut send_stream: SendStream,
566 mut recv_stream: RecvStream,
567 _request_permit: tokio::sync::OwnedSemaphorePermit,
568) -> anyhow::Result<()> {
569 let request = recv_stream.read_to_end(100_000).await?;
570
571 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
572
573 let method = request.method.to_string();
574 let timer = IROH_API_REQUEST_DURATION_SECONDS
575 .with_label_values(&[&method])
576 .start_timer();
577
578 let response = await_response(consensus_api, core_api, module_api, request).await;
579
580 timer.observe_duration();
581
582 let response = serde_json::to_vec(&response)?;
583
584 send_stream.write_all(&response).await?;
585
586 send_stream.finish()?;
587
588 Ok(())
589}
590
591async fn await_response(
592 consensus_api: Arc<ConsensusApi>,
593 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
594 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
595 request: IrohApiRequest,
596) -> Result<Value, ApiError> {
597 match request.method {
598 ApiMethod::Core(method) => {
599 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
600
601 let (state, context) = consensus_api.context(&request.request, None).await;
602
603 (endpoint.handler)(state, context, request.request).await
604 }
605 ApiMethod::Module(module_id, method) => {
606 let endpoint = module_api
607 .get(&module_id)
608 .ok_or(ApiError::not_found(module_id.to_string()))?
609 .get(&method)
610 .ok_or(ApiError::not_found(method))?;
611
612 let (state, context) = consensus_api
613 .context(&request.request, Some(module_id))
614 .await;
615
616 (endpoint.handler)(state, context, request.request).await
617 }
618 }
619}