1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::net::api::announcement::get_api_urls;
52use crate::net::api::{ApiSecrets, HasApiContext};
53use crate::net::p2p::{P2PConnectionTypeReceivers, P2PStatusReceivers};
54use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
55
56const TRANSACTION_BUFFER: usize = 1000;
58
59#[allow(clippy::too_many_arguments)]
60pub async fn run(
61 connectors: ConnectorRegistry,
62 connections: DynP2PConnections<P2PMessage>,
63 p2p_status_receivers: P2PStatusReceivers,
64 p2p_connection_type_receivers: P2PConnectionTypeReceivers,
65 api_bind: SocketAddr,
66 iroh_dns: Option<SafeUrl>,
67 iroh_relays: Vec<SafeUrl>,
68 cfg: ServerConfig,
69 db: Database,
70 module_init_registry: ServerModuleInitRegistry,
71 task_group: &TaskGroup,
72 force_api_secrets: ApiSecrets,
73 data_dir: PathBuf,
74 code_version_str: String,
75 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
76 ui_bind: SocketAddr,
77 dashboard_ui_router: DashboardUiRouter,
78 db_checkpoint_retention: u64,
79 iroh_api_limits: ConnectionLimits,
80) -> anyhow::Result<()> {
81 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
82
83 let mut global_dbtx = db.begin_transaction().await;
84 apply_migrations_server_dbtx(
85 &mut global_dbtx.to_ref_nc(),
86 Arc::new(ServerDbMigrationContext),
87 "fedimint-server".to_string(),
88 get_global_database_migrations(),
89 )
90 .await?;
91
92 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
93
94 if is_running_in_test_env() {
95 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
96 }
97 global_dbtx.commit_tx_result().await?;
98
99 let mut modules = BTreeMap::new();
100
101 let global_api = DynGlobalApi::new(
103 connectors.clone(),
104 cfg.consensus
105 .api_endpoints()
106 .iter()
107 .map(|(&peer_id, url)| (peer_id, url.url.clone()))
108 .collect(),
109 None,
110 )?;
111
112 let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
113 dyn_server_bitcoin_rpc,
114 if is_running_in_test_env() {
115 Duration::from_millis(100)
116 } else {
117 Duration::from_secs(60)
118 },
119 task_group,
120 );
121
122 for (module_id, module_cfg) in &cfg.consensus.modules {
123 match module_init_registry.get(&module_cfg.kind) {
124 Some(module_init) => {
125 info!(target: LOG_CORE, "Initialise module {module_id}...");
126
127 let mut dbtx = db.begin_transaction().await;
128 apply_migrations_dbtx(
129 &mut dbtx.to_ref_nc(),
130 Arc::new(ServerDbMigrationContext) as Arc<_>,
131 module_init.module_kind().to_string(),
132 module_init.get_database_migrations(),
133 Some(*module_id),
134 None,
135 )
136 .await?;
137
138 if let Some(used_db_prefixes) = module_init.used_db_prefixes()
139 && is_running_in_test_env()
140 {
141 verify_module_db_integrity_dbtx(
142 &mut dbtx.to_ref_nc(),
143 *module_id,
144 module_init.module_kind(),
145 &used_db_prefixes,
146 )
147 .await;
148 }
149 dbtx.commit_tx_result().await?;
150
151 let module = module_init
152 .init(
153 NumPeers::from(cfg.consensus.api_endpoints().len()),
154 cfg.get_module_config(*module_id)?,
155 db.with_prefix_module_id(*module_id).0,
156 task_group,
157 cfg.local.identity,
158 global_api.with_module(*module_id),
159 bitcoin_rpc_connection.clone(),
160 )
161 .await?;
162
163 modules.insert(*module_id, (module_cfg.kind.clone(), module));
164 }
165 None => bail!("Detected configuration for unsupported module id: {module_id}"),
166 }
167 }
168
169 let module_registry = ModuleRegistry::from(modules);
170
171 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
172
173 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
174 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
175 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
176
177 let mut ci_status_senders = BTreeMap::new();
178 let mut ci_status_receivers = BTreeMap::new();
179
180 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
181 let (ci_sender, ci_receiver) = watch::channel(None);
182
183 ci_status_senders.insert(peer, ci_sender);
184 ci_status_receivers.insert(peer, ci_receiver);
185 }
186
187 let consensus_api = ConsensusApi {
188 cfg: cfg.clone(),
189 cfg_dir: data_dir.clone(),
190 db: db.clone(),
191 modules: module_registry.clone(),
192 client_cfg: client_cfg.clone(),
193 submission_sender: submission_sender.clone(),
194 shutdown_sender,
195 shutdown_receiver: shutdown_receiver.clone(),
196 supported_api_versions: ServerConfig::supported_api_versions_summary(
197 &cfg.consensus.modules,
198 &module_init_registry,
199 ),
200 p2p_status_receivers,
201 p2p_connection_type_receivers,
202 ci_status_receivers,
203 ord_latency_receiver,
204 bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
205 force_api_secret: force_api_secrets.get_active(),
206 code_version_str,
207 task_group: task_group.clone(),
208 };
209
210 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
211
212 let api_handler = start_consensus_api(
213 &cfg.local,
214 consensus_api.clone(),
215 force_api_secrets.clone(),
216 api_bind,
217 )
218 .await;
219
220 if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
221 && let Err(e) = Box::pin(start_iroh_api(
222 iroh_api_sk,
223 api_bind,
224 iroh_dns,
225 iroh_relays,
226 consensus_api.clone(),
227 task_group,
228 iroh_api_limits,
229 ))
230 .await
231 {
232 api_handler.stop().expect("Just started");
234 api_handler.stopped().await;
235 return Err(e);
236 }
237
238 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
239
240 for (module_id, kind, module) in module_registry.iter_modules() {
241 submit_module_ci_proposals(
242 task_group,
243 db.clone(),
244 module_id,
245 kind.clone(),
246 module.clone(),
247 submission_sender.clone(),
248 );
249 }
250
251 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
252
253 let ui_listener = TcpListener::bind(ui_bind)
254 .await
255 .expect("Failed to bind dashboard UI");
256
257 task_group.spawn("dashboard-ui", move |handle| async move {
258 axum::serve(ui_listener, ui_service)
259 .with_graceful_shutdown(handle.make_shutdown_rx())
260 .await
261 .expect("Failed to serve dashboard UI");
262 });
263
264 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
265
266 loop {
267 match bitcoin_rpc_connection.status() {
268 Some(status) => {
269 if let Some(progress) = status.sync_progress {
270 if progress >= 0.999 {
271 break;
272 }
273
274 info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
275 } else {
276 break;
277 }
278 }
279 None => {
280 info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
281 }
282 }
283
284 sleep(Duration::from_secs(1)).await;
285 }
286
287 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
288
289 let api_urls = get_api_urls(&db, &cfg.consensus).await;
290
291 ConsensusEngine {
294 db,
295 federation_api: DynGlobalApi::new(
296 connectors,
297 api_urls,
298 force_api_secrets.get_active().as_deref(),
299 )?,
300 cfg: cfg.clone(),
301 connections,
302 ord_latency_sender,
303 ci_status_senders,
304 submission_receiver,
305 shutdown_receiver,
306 modules: module_registry,
307 task_group: task_group.clone(),
308 data_dir,
309 db_checkpoint_retention,
310 }
311 .run()
312 .await?;
313
314 api_handler
315 .stop()
316 .expect("Consensus api should still be running");
317
318 api_handler.stopped().await;
319
320 Ok(())
321}
322
323async fn start_consensus_api(
324 cfg: &ServerConfigLocal,
325 api: ConsensusApi,
326 force_api_secrets: ApiSecrets,
327 api_bind: SocketAddr,
328) -> ServerHandle {
329 let mut rpc_module = RpcModule::new(api.clone());
330
331 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
332
333 for (id, _, module) in api.modules.iter_modules() {
334 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
335 }
336
337 net::api::spawn(
338 "consensus",
339 api_bind,
340 rpc_module,
341 cfg.max_connections,
342 force_api_secrets,
343 )
344 .await
345}
346
347const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
348
349fn submit_module_ci_proposals(
350 task_group: &TaskGroup,
351 db: Database,
352 module_id: ModuleInstanceId,
353 kind: ModuleKind,
354 module: DynServerModule,
355 submission_sender: Sender<ConsensusItem>,
356) {
357 let mut interval = tokio::time::interval(if is_running_in_test_env() {
358 Duration::from_millis(100)
359 } else {
360 Duration::from_secs(1)
361 });
362
363 task_group.spawn(
364 format!("citem_proposals_{module_id}"),
365 move |task_handle| async move {
366 while !task_handle.is_shutting_down() {
367 let module_consensus_items = tokio::time::timeout(
368 CONSENSUS_PROPOSAL_TIMEOUT,
369 module.consensus_proposal(
370 &mut db
371 .begin_transaction_nc()
372 .await
373 .to_ref_with_prefix_module_id(module_id)
374 .0
375 .into_nc(),
376 module_id,
377 ),
378 )
379 .await;
380
381 match module_consensus_items {
382 Ok(items) => {
383 for item in items {
384 if submission_sender
385 .send(ConsensusItem::Module(item))
386 .await
387 .is_err()
388 {
389 warn!(
390 target: LOG_CONSENSUS,
391 module_id,
392 "Unable to submit module consensus item proposal via channel"
393 );
394 }
395 }
396 }
397 Err(..) => {
398 warn!(
399 target: LOG_CONSENSUS,
400 module_id,
401 %kind,
402 "Module failed to propose consensus items on time"
403 );
404 }
405 }
406
407 interval.tick().await;
408 }
409 },
410 );
411}
412
413async fn start_iroh_api(
414 secret_key: iroh::SecretKey,
415 api_bind: SocketAddr,
416 iroh_dns: Option<SafeUrl>,
417 iroh_relays: Vec<SafeUrl>,
418 consensus_api: ConsensusApi,
419 task_group: &TaskGroup,
420 iroh_api_limits: ConnectionLimits,
421) -> anyhow::Result<()> {
422 let endpoint = build_iroh_endpoint(
423 secret_key,
424 api_bind,
425 iroh_dns,
426 iroh_relays,
427 FEDIMINT_API_ALPN,
428 )
429 .await?;
430 task_group.spawn_cancellable(
431 "iroh-api",
432 run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
433 );
434
435 Ok(())
436}
437
438async fn run_iroh_api(
439 consensus_api: ConsensusApi,
440 endpoint: Endpoint,
441 task_group: TaskGroup,
442 iroh_api_limits: ConnectionLimits,
443) {
444 let core_api = server_endpoints()
445 .into_iter()
446 .map(|endpoint| (endpoint.path.to_string(), endpoint))
447 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
448
449 let module_api = consensus_api
450 .modules
451 .iter_modules()
452 .map(|(id, _, module)| {
453 let api_endpoints = module
454 .api_endpoints()
455 .into_iter()
456 .map(|endpoint| (endpoint.path.to_string(), endpoint))
457 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
458
459 (id, api_endpoints)
460 })
461 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
462
463 let consensus_api = Arc::new(consensus_api);
464 let core_api = Arc::new(core_api);
465 let module_api = Arc::new(module_api);
466 let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
467
468 loop {
469 match endpoint.accept().await {
470 Some(incoming) => {
471 if parallel_connections_limit.available_permits() == 0 {
472 warn!(
473 target: LOG_NET_API,
474 limit = iroh_api_limits.max_connections,
475 "Iroh API connection limit reached, blocking new connections"
476 );
477 }
478 let permit = parallel_connections_limit
479 .clone()
480 .acquire_owned()
481 .await
482 .expect("semaphore should not be closed");
483 task_group.spawn_cancellable_silent(
484 "handle-iroh-connection",
485 handle_incoming(
486 consensus_api.clone(),
487 core_api.clone(),
488 module_api.clone(),
489 task_group.clone(),
490 incoming,
491 permit,
492 iroh_api_limits.max_requests_per_connection,
493 )
494 .then(|result| async {
495 if let Err(err) = result {
496 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
497 }
498 }),
499 );
500 }
501 None => return,
502 }
503 }
504}
505
506async fn handle_incoming(
507 consensus_api: Arc<ConsensusApi>,
508 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
509 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
510 task_group: TaskGroup,
511 incoming: Incoming,
512 _connection_permit: tokio::sync::OwnedSemaphorePermit,
513 iroh_api_max_requests_per_connection: usize,
514) -> anyhow::Result<()> {
515 let connection = incoming.accept()?.await?;
516 let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
517
518 loop {
519 let (send_stream, recv_stream) = connection.accept_bi().await?;
520
521 if parallel_requests_limit.available_permits() == 0 {
522 warn!(
523 target: LOG_NET_API,
524 limit = iroh_api_max_requests_per_connection,
525 "Iroh API request limit reached for connection, blocking new requests"
526 );
527 }
528 let permit = parallel_requests_limit
529 .clone()
530 .acquire_owned()
531 .await
532 .expect("semaphore should not be closed");
533 task_group.spawn_cancellable_silent(
534 "handle-iroh-request",
535 handle_request(
536 consensus_api.clone(),
537 core_api.clone(),
538 module_api.clone(),
539 send_stream,
540 recv_stream,
541 permit,
542 )
543 .then(|result| async {
544 if let Err(err) = result {
545 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
546 }
547 }),
548 );
549 }
550}
551
552async fn handle_request(
553 consensus_api: Arc<ConsensusApi>,
554 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
555 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
556 mut send_stream: SendStream,
557 mut recv_stream: RecvStream,
558 _request_permit: tokio::sync::OwnedSemaphorePermit,
559) -> anyhow::Result<()> {
560 let request = recv_stream.read_to_end(100_000).await?;
561
562 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
563
564 let response = await_response(consensus_api, core_api, module_api, request).await;
565
566 let response = serde_json::to_vec(&response)?;
567
568 send_stream.write_all(&response).await?;
569
570 send_stream.finish()?;
571
572 Ok(())
573}
574
575async fn await_response(
576 consensus_api: Arc<ConsensusApi>,
577 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
578 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
579 request: IrohApiRequest,
580) -> Result<Value, ApiError> {
581 match request.method {
582 ApiMethod::Core(method) => {
583 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
584
585 let (state, context) = consensus_api.context(&request.request, None).await;
586
587 (endpoint.handler)(state, context, request.request).await
588 }
589 ApiMethod::Module(module_id, method) => {
590 let endpoint = module_api
591 .get(&module_id)
592 .ok_or(ApiError::not_found(module_id.to_string()))?
593 .get(&method)
594 .ok_or(ApiError::not_found(method))?;
595
596 let (state, context) = consensus_api
597 .context(&request.request, Some(module_id))
598 .await;
599
600 (endpoint.handler)(state, context, request.request).await
601 }
602 }
603}