1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::net::api::announcement::get_api_urls;
52use crate::net::api::{ApiSecrets, HasApiContext};
53use crate::net::p2p::P2PStatusReceivers;
54use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
55
/// Capacity of the bounded submission channel that feeds consensus items to
/// the consensus engine; once it is full, submitters wait for free space.
const TRANSACTION_BUFFER: usize = 1_000;
58
59#[allow(clippy::too_many_arguments)]
60pub async fn run(
61 connectors: ConnectorRegistry,
62 connections: DynP2PConnections<P2PMessage>,
63 p2p_status_receivers: P2PStatusReceivers,
64 api_bind: SocketAddr,
65 iroh_dns: Option<SafeUrl>,
66 iroh_relays: Vec<SafeUrl>,
67 cfg: ServerConfig,
68 db: Database,
69 module_init_registry: ServerModuleInitRegistry,
70 task_group: &TaskGroup,
71 force_api_secrets: ApiSecrets,
72 data_dir: PathBuf,
73 code_version_str: String,
74 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
75 ui_bind: SocketAddr,
76 dashboard_ui_router: DashboardUiRouter,
77 db_checkpoint_retention: u64,
78 iroh_api_limits: ConnectionLimits,
79) -> anyhow::Result<()> {
80 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
81
82 let mut global_dbtx = db.begin_transaction().await;
83 apply_migrations_server_dbtx(
84 &mut global_dbtx.to_ref_nc(),
85 Arc::new(ServerDbMigrationContext),
86 "fedimint-server".to_string(),
87 get_global_database_migrations(),
88 )
89 .await?;
90
91 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
92
93 if is_running_in_test_env() {
94 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
95 }
96 global_dbtx.commit_tx_result().await?;
97
98 let mut modules = BTreeMap::new();
99
100 let global_api = DynGlobalApi::new(
102 connectors.clone(),
103 cfg.consensus
104 .api_endpoints()
105 .iter()
106 .map(|(&peer_id, url)| (peer_id, url.url.clone()))
107 .collect(),
108 None,
109 )?;
110
111 let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
112 dyn_server_bitcoin_rpc,
113 if is_running_in_test_env() {
114 Duration::from_millis(100)
115 } else {
116 Duration::from_secs(60)
117 },
118 task_group,
119 );
120
121 for (module_id, module_cfg) in &cfg.consensus.modules {
122 match module_init_registry.get(&module_cfg.kind) {
123 Some(module_init) => {
124 info!(target: LOG_CORE, "Initialise module {module_id}...");
125
126 let mut dbtx = db.begin_transaction().await;
127 apply_migrations_dbtx(
128 &mut dbtx.to_ref_nc(),
129 Arc::new(ServerDbMigrationContext) as Arc<_>,
130 module_init.module_kind().to_string(),
131 module_init.get_database_migrations(),
132 Some(*module_id),
133 None,
134 )
135 .await?;
136
137 if let Some(used_db_prefixes) = module_init.used_db_prefixes()
138 && is_running_in_test_env()
139 {
140 verify_module_db_integrity_dbtx(
141 &mut dbtx.to_ref_nc(),
142 *module_id,
143 module_init.module_kind(),
144 &used_db_prefixes,
145 )
146 .await;
147 }
148 dbtx.commit_tx_result().await?;
149
150 let module = module_init
151 .init(
152 NumPeers::from(cfg.consensus.api_endpoints().len()),
153 cfg.get_module_config(*module_id)?,
154 db.with_prefix_module_id(*module_id).0,
155 task_group,
156 cfg.local.identity,
157 global_api.with_module(*module_id),
158 bitcoin_rpc_connection.clone(),
159 )
160 .await?;
161
162 modules.insert(*module_id, (module_cfg.kind.clone(), module));
163 }
164 None => bail!("Detected configuration for unsupported module id: {module_id}"),
165 }
166 }
167
168 let module_registry = ModuleRegistry::from(modules);
169
170 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
171
172 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
173 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
174 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
175
176 let mut ci_status_senders = BTreeMap::new();
177 let mut ci_status_receivers = BTreeMap::new();
178
179 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
180 let (ci_sender, ci_receiver) = watch::channel(None);
181
182 ci_status_senders.insert(peer, ci_sender);
183 ci_status_receivers.insert(peer, ci_receiver);
184 }
185
186 let consensus_api = ConsensusApi {
187 cfg: cfg.clone(),
188 cfg_dir: data_dir.clone(),
189 db: db.clone(),
190 modules: module_registry.clone(),
191 client_cfg: client_cfg.clone(),
192 submission_sender: submission_sender.clone(),
193 shutdown_sender,
194 shutdown_receiver: shutdown_receiver.clone(),
195 supported_api_versions: ServerConfig::supported_api_versions_summary(
196 &cfg.consensus.modules,
197 &module_init_registry,
198 ),
199 p2p_status_receivers,
200 ci_status_receivers,
201 ord_latency_receiver,
202 bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
203 force_api_secret: force_api_secrets.get_active(),
204 code_version_str,
205 task_group: task_group.clone(),
206 };
207
208 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
209
210 let api_handler = start_consensus_api(
211 &cfg.local,
212 consensus_api.clone(),
213 force_api_secrets.clone(),
214 api_bind,
215 )
216 .await;
217
218 if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
219 && let Err(e) = Box::pin(start_iroh_api(
220 iroh_api_sk,
221 api_bind,
222 iroh_dns,
223 iroh_relays,
224 consensus_api.clone(),
225 task_group,
226 iroh_api_limits,
227 ))
228 .await
229 {
230 api_handler.stop().expect("Just started");
232 api_handler.stopped().await;
233 return Err(e);
234 }
235
236 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
237
238 for (module_id, kind, module) in module_registry.iter_modules() {
239 submit_module_ci_proposals(
240 task_group,
241 db.clone(),
242 module_id,
243 kind.clone(),
244 module.clone(),
245 submission_sender.clone(),
246 );
247 }
248
249 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
250
251 let ui_listener = TcpListener::bind(ui_bind)
252 .await
253 .expect("Failed to bind dashboard UI");
254
255 task_group.spawn("dashboard-ui", move |handle| async move {
256 axum::serve(ui_listener, ui_service)
257 .with_graceful_shutdown(handle.make_shutdown_rx())
258 .await
259 .expect("Failed to serve dashboard UI");
260 });
261
262 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
263
264 loop {
265 match bitcoin_rpc_connection.status() {
266 Some(status) => {
267 if let Some(progress) = status.sync_progress {
268 if progress >= 0.999 {
269 break;
270 }
271
272 info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
273 } else {
274 break;
275 }
276 }
277 None => {
278 info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
279 }
280 }
281
282 sleep(Duration::from_secs(1)).await;
283 }
284
285 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
286
287 let api_urls = get_api_urls(&db, &cfg.consensus).await;
288
289 ConsensusEngine {
292 db,
293 federation_api: DynGlobalApi::new(
294 connectors,
295 api_urls,
296 force_api_secrets.get_active().as_deref(),
297 )?,
298 cfg: cfg.clone(),
299 connections,
300 ord_latency_sender,
301 ci_status_senders,
302 submission_receiver,
303 shutdown_receiver,
304 modules: module_registry,
305 task_group: task_group.clone(),
306 data_dir,
307 db_checkpoint_retention,
308 }
309 .run()
310 .await?;
311
312 api_handler
313 .stop()
314 .expect("Consensus api should still be running");
315
316 api_handler.stopped().await;
317
318 Ok(())
319}
320
321async fn start_consensus_api(
322 cfg: &ServerConfigLocal,
323 api: ConsensusApi,
324 force_api_secrets: ApiSecrets,
325 api_bind: SocketAddr,
326) -> ServerHandle {
327 let mut rpc_module = RpcModule::new(api.clone());
328
329 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
330
331 for (id, _, module) in api.modules.iter_modules() {
332 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
333 }
334
335 net::api::spawn(
336 "consensus",
337 api_bind,
338 rpc_module,
339 cfg.max_connections,
340 force_api_secrets,
341 )
342 .await
343}
344
345const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
346
347fn submit_module_ci_proposals(
348 task_group: &TaskGroup,
349 db: Database,
350 module_id: ModuleInstanceId,
351 kind: ModuleKind,
352 module: DynServerModule,
353 submission_sender: Sender<ConsensusItem>,
354) {
355 let mut interval = tokio::time::interval(if is_running_in_test_env() {
356 Duration::from_millis(100)
357 } else {
358 Duration::from_secs(1)
359 });
360
361 task_group.spawn(
362 format!("citem_proposals_{module_id}"),
363 move |task_handle| async move {
364 while !task_handle.is_shutting_down() {
365 let module_consensus_items = tokio::time::timeout(
366 CONSENSUS_PROPOSAL_TIMEOUT,
367 module.consensus_proposal(
368 &mut db
369 .begin_transaction_nc()
370 .await
371 .to_ref_with_prefix_module_id(module_id)
372 .0
373 .into_nc(),
374 module_id,
375 ),
376 )
377 .await;
378
379 match module_consensus_items {
380 Ok(items) => {
381 for item in items {
382 if submission_sender
383 .send(ConsensusItem::Module(item))
384 .await
385 .is_err()
386 {
387 warn!(
388 target: LOG_CONSENSUS,
389 module_id,
390 "Unable to submit module consensus item proposal via channel"
391 );
392 }
393 }
394 }
395 Err(..) => {
396 warn!(
397 target: LOG_CONSENSUS,
398 module_id,
399 %kind,
400 "Module failed to propose consensus items on time"
401 );
402 }
403 }
404
405 interval.tick().await;
406 }
407 },
408 );
409}
410
411async fn start_iroh_api(
412 secret_key: iroh::SecretKey,
413 api_bind: SocketAddr,
414 iroh_dns: Option<SafeUrl>,
415 iroh_relays: Vec<SafeUrl>,
416 consensus_api: ConsensusApi,
417 task_group: &TaskGroup,
418 iroh_api_limits: ConnectionLimits,
419) -> anyhow::Result<()> {
420 let endpoint = build_iroh_endpoint(
421 secret_key,
422 api_bind,
423 iroh_dns,
424 iroh_relays,
425 FEDIMINT_API_ALPN,
426 )
427 .await?;
428 task_group.spawn_cancellable(
429 "iroh-api",
430 run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
431 );
432
433 Ok(())
434}
435
436async fn run_iroh_api(
437 consensus_api: ConsensusApi,
438 endpoint: Endpoint,
439 task_group: TaskGroup,
440 iroh_api_limits: ConnectionLimits,
441) {
442 let core_api = server_endpoints()
443 .into_iter()
444 .map(|endpoint| (endpoint.path.to_string(), endpoint))
445 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
446
447 let module_api = consensus_api
448 .modules
449 .iter_modules()
450 .map(|(id, _, module)| {
451 let api_endpoints = module
452 .api_endpoints()
453 .into_iter()
454 .map(|endpoint| (endpoint.path.to_string(), endpoint))
455 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
456
457 (id, api_endpoints)
458 })
459 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
460
461 let consensus_api = Arc::new(consensus_api);
462 let core_api = Arc::new(core_api);
463 let module_api = Arc::new(module_api);
464 let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
465
466 loop {
467 match endpoint.accept().await {
468 Some(incoming) => {
469 if parallel_connections_limit.available_permits() == 0 {
470 warn!(
471 target: LOG_NET_API,
472 limit = iroh_api_limits.max_connections,
473 "Iroh API connection limit reached, blocking new connections"
474 );
475 }
476 let permit = parallel_connections_limit
477 .clone()
478 .acquire_owned()
479 .await
480 .expect("semaphore should not be closed");
481 task_group.spawn_cancellable_silent(
482 "handle-iroh-connection",
483 handle_incoming(
484 consensus_api.clone(),
485 core_api.clone(),
486 module_api.clone(),
487 task_group.clone(),
488 incoming,
489 permit,
490 iroh_api_limits.max_requests_per_connection,
491 )
492 .then(|result| async {
493 if let Err(err) = result {
494 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
495 }
496 }),
497 );
498 }
499 None => return,
500 }
501 }
502}
503
504async fn handle_incoming(
505 consensus_api: Arc<ConsensusApi>,
506 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
507 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
508 task_group: TaskGroup,
509 incoming: Incoming,
510 _connection_permit: tokio::sync::OwnedSemaphorePermit,
511 iroh_api_max_requests_per_connection: usize,
512) -> anyhow::Result<()> {
513 let connection = incoming.accept()?.await?;
514 let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
515
516 loop {
517 let (send_stream, recv_stream) = connection.accept_bi().await?;
518
519 if parallel_requests_limit.available_permits() == 0 {
520 warn!(
521 target: LOG_NET_API,
522 limit = iroh_api_max_requests_per_connection,
523 "Iroh API request limit reached for connection, blocking new requests"
524 );
525 }
526 let permit = parallel_requests_limit
527 .clone()
528 .acquire_owned()
529 .await
530 .expect("semaphore should not be closed");
531 task_group.spawn_cancellable_silent(
532 "handle-iroh-request",
533 handle_request(
534 consensus_api.clone(),
535 core_api.clone(),
536 module_api.clone(),
537 send_stream,
538 recv_stream,
539 permit,
540 )
541 .then(|result| async {
542 if let Err(err) = result {
543 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
544 }
545 }),
546 );
547 }
548}
549
550async fn handle_request(
551 consensus_api: Arc<ConsensusApi>,
552 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
553 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
554 mut send_stream: SendStream,
555 mut recv_stream: RecvStream,
556 _request_permit: tokio::sync::OwnedSemaphorePermit,
557) -> anyhow::Result<()> {
558 let request = recv_stream.read_to_end(100_000).await?;
559
560 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
561
562 let response = await_response(consensus_api, core_api, module_api, request).await;
563
564 let response = serde_json::to_vec(&response)?;
565
566 send_stream.write_all(&response).await?;
567
568 send_stream.finish()?;
569
570 Ok(())
571}
572
573async fn await_response(
574 consensus_api: Arc<ConsensusApi>,
575 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
576 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
577 request: IrohApiRequest,
578) -> Result<Value, ApiError> {
579 match request.method {
580 ApiMethod::Core(method) => {
581 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
582
583 let (state, context) = consensus_api.context(&request.request, None).await;
584
585 (endpoint.handler)(state, context, request.request).await
586 }
587 ApiMethod::Module(module_id, method) => {
588 let endpoint = module_api
589 .get(&module_id)
590 .ok_or(ApiError::not_found(module_id.to_string()))?
591 .get(&method)
592 .ok_or(ApiError::not_found(method))?;
593
594 let (state, context) = consensus_api
595 .context(&request.request, Some(module_id))
596 .await;
597
598 (endpoint.handler)(state, context, request.request).await
599 }
600 }
601}