1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::metrics::{
52 IROH_API_CONNECTION_DURATION_SECONDS, IROH_API_CONNECTIONS_ACTIVE,
53 IROH_API_REQUEST_DURATION_SECONDS,
54};
55use crate::net::api::announcement::get_api_urls;
56use crate::net::api::{ApiSecrets, HasApiContext};
57use crate::net::p2p::P2PStatusReceivers;
58use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
59
60const TRANSACTION_BUFFER: usize = 1000;
62
63#[allow(clippy::too_many_arguments)]
64pub async fn run(
65 connectors: ConnectorRegistry,
66 connections: DynP2PConnections<P2PMessage>,
67 p2p_status_receivers: P2PStatusReceivers,
68 api_bind: SocketAddr,
69 iroh_dns: Option<SafeUrl>,
70 iroh_relays: Vec<SafeUrl>,
71 cfg: ServerConfig,
72 db: Database,
73 module_init_registry: ServerModuleInitRegistry,
74 task_group: &TaskGroup,
75 force_api_secrets: ApiSecrets,
76 data_dir: PathBuf,
77 code_version_str: String,
78 code_version_hash: String,
79 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
80 ui_bind: SocketAddr,
81 dashboard_ui_router: DashboardUiRouter,
82 db_checkpoint_retention: u64,
83 iroh_api_limits: ConnectionLimits,
84) -> anyhow::Result<()> {
85 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
86
87 let mut global_dbtx = db.begin_transaction().await;
88 apply_migrations_server_dbtx(
89 &mut global_dbtx.to_ref_nc(),
90 Arc::new(ServerDbMigrationContext),
91 "fedimint-server".to_string(),
92 get_global_database_migrations(),
93 )
94 .await?;
95
96 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
97
98 if is_running_in_test_env() {
99 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
100 }
101 global_dbtx.commit_tx_result().await?;
102
103 let mut modules = BTreeMap::new();
104
105 let global_api = DynGlobalApi::new(
107 connectors.clone(),
108 cfg.consensus
109 .api_endpoints()
110 .iter()
111 .map(|(&peer_id, url)| (peer_id, url.url.clone()))
112 .collect(),
113 None,
114 )?;
115
116 let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
117 dyn_server_bitcoin_rpc,
118 if is_running_in_test_env() {
119 Duration::from_millis(100)
120 } else {
121 Duration::from_mins(1)
122 },
123 task_group,
124 );
125
126 for (module_id, module_cfg) in &cfg.consensus.modules {
127 match module_init_registry.get(&module_cfg.kind) {
128 Some(module_init) => {
129 info!(target: LOG_CORE, "Initialise module {module_id}...");
130
131 let mut dbtx = db.begin_transaction().await;
132 apply_migrations_dbtx(
133 &mut dbtx.to_ref_nc(),
134 Arc::new(ServerDbMigrationContext) as Arc<_>,
135 module_init.module_kind().to_string(),
136 module_init.get_database_migrations(),
137 Some(*module_id),
138 None,
139 )
140 .await?;
141
142 if let Some(used_db_prefixes) = module_init.used_db_prefixes()
143 && is_running_in_test_env()
144 {
145 verify_module_db_integrity_dbtx(
146 &mut dbtx.to_ref_nc(),
147 *module_id,
148 module_init.module_kind(),
149 &used_db_prefixes,
150 )
151 .await;
152 }
153 dbtx.commit_tx_result().await?;
154
155 let module = module_init
156 .init(
157 NumPeers::from(cfg.consensus.api_endpoints().len()),
158 cfg.get_module_config(*module_id)?,
159 db.with_prefix_module_id(*module_id).0,
160 task_group,
161 cfg.local.identity,
162 global_api.with_module(*module_id),
163 bitcoin_rpc_connection.clone(),
164 )
165 .await?;
166
167 modules.insert(*module_id, (module_cfg.kind.clone(), module));
168 }
169 None => bail!("Detected configuration for unsupported module id: {module_id}"),
170 }
171 }
172
173 let module_registry = ModuleRegistry::from(modules);
174
175 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
176
177 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
178 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
179 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
180
181 let mut ci_status_senders = BTreeMap::new();
182 let mut ci_status_receivers = BTreeMap::new();
183
184 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
185 let (ci_sender, ci_receiver) = watch::channel(None);
186
187 ci_status_senders.insert(peer, ci_sender);
188 ci_status_receivers.insert(peer, ci_receiver);
189 }
190
191 let consensus_api = ConsensusApi {
192 cfg: cfg.clone(),
193 cfg_dir: data_dir.clone(),
194 db: db.clone(),
195 modules: module_registry.clone(),
196 client_cfg: client_cfg.clone(),
197 submission_sender: submission_sender.clone(),
198 shutdown_sender,
199 shutdown_receiver: shutdown_receiver.clone(),
200 supported_api_versions: ServerConfig::supported_api_versions_summary(
201 &cfg.consensus.modules,
202 &module_init_registry,
203 ),
204 p2p_status_receivers,
205 ci_status_receivers,
206 ord_latency_receiver,
207 bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
208 force_api_secret: force_api_secrets.get_active(),
209 code_version_str,
210 code_version_hash,
211 task_group: task_group.clone(),
212 };
213
214 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
215
216 let api_handler = start_consensus_api(
217 &cfg.local,
218 consensus_api.clone(),
219 force_api_secrets.clone(),
220 api_bind,
221 )
222 .await;
223
224 if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
225 && let Err(e) = Box::pin(start_iroh_api(
226 iroh_api_sk,
227 api_bind,
228 iroh_dns,
229 iroh_relays,
230 consensus_api.clone(),
231 task_group,
232 iroh_api_limits,
233 ))
234 .await
235 {
236 api_handler.stop().expect("Just started");
238 api_handler.stopped().await;
239 return Err(e);
240 }
241
242 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
243
244 for (module_id, kind, module) in module_registry.iter_modules() {
245 submit_module_ci_proposals(
246 task_group,
247 db.clone(),
248 module_id,
249 kind.clone(),
250 module.clone(),
251 submission_sender.clone(),
252 );
253 }
254
255 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
256
257 let ui_listener = TcpListener::bind(ui_bind)
258 .await
259 .expect("Failed to bind dashboard UI");
260
261 task_group.spawn("dashboard-ui", move |handle| async move {
262 axum::serve(ui_listener, ui_service)
263 .with_graceful_shutdown(handle.make_shutdown_rx())
264 .await
265 .expect("Failed to serve dashboard UI");
266 });
267
268 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
269
270 loop {
271 match bitcoin_rpc_connection.status() {
272 Some(status) => {
273 if let Some(progress) = status.sync_progress {
274 if progress >= 0.999 {
275 break;
276 }
277
278 info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
279 } else {
280 break;
281 }
282 }
283 None => {
284 info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
285 }
286 }
287
288 sleep(Duration::from_secs(1)).await;
289 }
290
291 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
292
293 let api_urls = get_api_urls(&db, &cfg.consensus).await;
294
295 ConsensusEngine {
298 db,
299 federation_api: DynGlobalApi::new(
300 connectors,
301 api_urls,
302 force_api_secrets.get_active().as_deref(),
303 )?,
304 cfg: cfg.clone(),
305 connections,
306 ord_latency_sender,
307 ci_status_senders,
308 submission_receiver,
309 shutdown_receiver,
310 modules: module_registry,
311 task_group: task_group.clone(),
312 data_dir,
313 db_checkpoint_retention,
314 }
315 .run()
316 .await?;
317
318 api_handler
319 .stop()
320 .expect("Consensus api should still be running");
321
322 api_handler.stopped().await;
323
324 Ok(())
325}
326
327async fn start_consensus_api(
328 cfg: &ServerConfigLocal,
329 api: ConsensusApi,
330 force_api_secrets: ApiSecrets,
331 api_bind: SocketAddr,
332) -> ServerHandle {
333 let mut rpc_module = RpcModule::new(api.clone());
334
335 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
336
337 for (id, _, module) in api.modules.iter_modules() {
338 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
339 }
340
341 net::api::spawn(
342 "consensus",
343 api_bind,
344 rpc_module,
345 cfg.max_connections,
346 force_api_secrets,
347 )
348 .await
349}
350
351const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
352
353fn submit_module_ci_proposals(
354 task_group: &TaskGroup,
355 db: Database,
356 module_id: ModuleInstanceId,
357 kind: ModuleKind,
358 module: DynServerModule,
359 submission_sender: Sender<ConsensusItem>,
360) {
361 let mut interval = tokio::time::interval(if is_running_in_test_env() {
362 Duration::from_millis(100)
363 } else {
364 Duration::from_secs(1)
365 });
366
367 task_group.spawn(
368 format!("citem_proposals_{module_id}"),
369 move |task_handle| async move {
370 while !task_handle.is_shutting_down() {
371 let module_consensus_items = tokio::time::timeout(
372 CONSENSUS_PROPOSAL_TIMEOUT,
373 module.consensus_proposal(
374 &mut db
375 .begin_transaction_nc()
376 .await
377 .to_ref_with_prefix_module_id(module_id)
378 .0
379 .into_nc(),
380 module_id,
381 ),
382 )
383 .await;
384
385 match module_consensus_items {
386 Ok(items) => {
387 for item in items {
388 if submission_sender
389 .send(ConsensusItem::Module(item))
390 .await
391 .is_err()
392 {
393 warn!(
394 target: LOG_CONSENSUS,
395 module_id,
396 "Unable to submit module consensus item proposal via channel"
397 );
398 }
399 }
400 }
401 Err(..) => {
402 warn!(
403 target: LOG_CONSENSUS,
404 module_id,
405 %kind,
406 "Module failed to propose consensus items on time"
407 );
408 }
409 }
410
411 interval.tick().await;
412 }
413 },
414 );
415}
416
417async fn start_iroh_api(
418 secret_key: iroh::SecretKey,
419 api_bind: SocketAddr,
420 iroh_dns: Option<SafeUrl>,
421 iroh_relays: Vec<SafeUrl>,
422 consensus_api: ConsensusApi,
423 task_group: &TaskGroup,
424 iroh_api_limits: ConnectionLimits,
425) -> anyhow::Result<()> {
426 let endpoint = build_iroh_endpoint(
427 secret_key,
428 api_bind,
429 iroh_dns,
430 iroh_relays,
431 FEDIMINT_API_ALPN,
432 )
433 .await?;
434 task_group.spawn_cancellable(
435 "iroh-api",
436 run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
437 );
438
439 Ok(())
440}
441
442async fn run_iroh_api(
443 consensus_api: ConsensusApi,
444 endpoint: Endpoint,
445 task_group: TaskGroup,
446 iroh_api_limits: ConnectionLimits,
447) {
448 let core_api = server_endpoints()
449 .into_iter()
450 .map(|endpoint| (endpoint.path.to_string(), endpoint))
451 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
452
453 let module_api = consensus_api
454 .modules
455 .iter_modules()
456 .map(|(id, _, module)| {
457 let api_endpoints = module
458 .api_endpoints()
459 .into_iter()
460 .map(|endpoint| (endpoint.path.to_string(), endpoint))
461 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
462
463 (id, api_endpoints)
464 })
465 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
466
467 let consensus_api = Arc::new(consensus_api);
468 let core_api = Arc::new(core_api);
469 let module_api = Arc::new(module_api);
470 let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
471
472 loop {
473 match endpoint.accept().await {
474 Some(incoming) => {
475 if parallel_connections_limit.available_permits() == 0 {
476 warn!(
477 target: LOG_NET_API,
478 limit = iroh_api_limits.max_connections,
479 "Iroh API connection limit reached, blocking new connections"
480 );
481 }
482 let permit = parallel_connections_limit
483 .clone()
484 .acquire_owned()
485 .await
486 .expect("semaphore should not be closed");
487 task_group.spawn_cancellable_silent(
488 "handle-iroh-connection",
489 handle_incoming(
490 consensus_api.clone(),
491 core_api.clone(),
492 module_api.clone(),
493 task_group.clone(),
494 incoming,
495 permit,
496 iroh_api_limits.max_requests_per_connection,
497 )
498 .then(|result| async {
499 if let Err(err) = result {
500 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
501 }
502 }),
503 );
504 }
505 None => return,
506 }
507 }
508}
509
510async fn handle_incoming(
511 consensus_api: Arc<ConsensusApi>,
512 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
513 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
514 task_group: TaskGroup,
515 incoming: Incoming,
516 _connection_permit: tokio::sync::OwnedSemaphorePermit,
517 iroh_api_max_requests_per_connection: usize,
518) -> anyhow::Result<()> {
519 let connection = incoming.accept()?.await?;
520 let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
521
522 IROH_API_CONNECTIONS_ACTIVE.inc();
523 let connection_timer = IROH_API_CONNECTION_DURATION_SECONDS.start_timer();
524 scopeguard::defer! {
525 IROH_API_CONNECTIONS_ACTIVE.dec();
526 connection_timer.observe_duration();
527 }
528
529 loop {
530 let (send_stream, recv_stream) = connection.accept_bi().await?;
531
532 if parallel_requests_limit.available_permits() == 0 {
533 warn!(
534 target: LOG_NET_API,
535 limit = iroh_api_max_requests_per_connection,
536 "Iroh API request limit reached for connection, blocking new requests"
537 );
538 }
539 let permit = parallel_requests_limit
540 .clone()
541 .acquire_owned()
542 .await
543 .expect("semaphore should not be closed");
544 task_group.spawn_cancellable_silent(
545 "handle-iroh-request",
546 handle_request(
547 consensus_api.clone(),
548 core_api.clone(),
549 module_api.clone(),
550 send_stream,
551 recv_stream,
552 permit,
553 )
554 .then(|result| async {
555 if let Err(err) = result {
556 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
557 }
558 }),
559 );
560 }
561}
562
563async fn handle_request(
564 consensus_api: Arc<ConsensusApi>,
565 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
566 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
567 mut send_stream: SendStream,
568 mut recv_stream: RecvStream,
569 _request_permit: tokio::sync::OwnedSemaphorePermit,
570) -> anyhow::Result<()> {
571 let request = recv_stream.read_to_end(100_000).await?;
572
573 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
574
575 let method = request.method.to_string();
576 let timer = IROH_API_REQUEST_DURATION_SECONDS
577 .with_label_values(&[&method])
578 .start_timer();
579
580 let response = await_response(consensus_api, core_api, module_api, request).await;
581
582 timer.observe_duration();
583
584 let response = serde_json::to_vec(&response)?;
585
586 send_stream.write_all(&response).await?;
587
588 send_stream.finish()?;
589
590 Ok(())
591}
592
593async fn await_response(
594 consensus_api: Arc<ConsensusApi>,
595 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
596 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
597 request: IrohApiRequest,
598) -> Result<Value, ApiError> {
599 match request.method {
600 ApiMethod::Core(method) => {
601 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
602
603 let (state, context) = consensus_api.context(&request.request, None).await;
604
605 (endpoint.handler)(state, context, request.request).await
606 }
607 ApiMethod::Module(module_id, method) => {
608 let endpoint = module_api
609 .get(&module_id)
610 .ok_or(ApiError::not_found(module_id.to_string()))?
611 .get(&method)
612 .ok_or(ApiError::not_found(method))?;
613
614 let (state, context) = consensus_api
615 .context(&request.request, Some(module_id))
616 .await;
617
618 (endpoint.handler)(state, context, request.request).await
619 }
620 }
621}