1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_core::NumPeers;
19use fedimint_core::config::P2PMessage;
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
22use fedimint_core::envs::is_running_in_test_env;
23use fedimint_core::epoch::ConsensusItem;
24use fedimint_core::module::registry::ModuleRegistry;
25use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
26use fedimint_core::net::peers::DynP2PConnections;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
30use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::net::TcpListener;
41use tokio::sync::watch;
42use tracing::{info, warn};
43
44use crate::config::{ServerConfig, ServerConfigLocal};
45use crate::consensus::api::{ConsensusApi, server_endpoints};
46use crate::consensus::engine::ConsensusEngine;
47use crate::db::verify_server_db_integrity_dbtx;
48use crate::net::api::announcement::get_api_urls;
49use crate::net::api::{ApiSecrets, HasApiContext};
50use crate::net::p2p::P2PStatusReceivers;
51use crate::net::p2p_connector::build_iroh_endpoint;
52use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
53
54const TRANSACTION_BUFFER: usize = 1000;
56
57#[allow(clippy::too_many_arguments)]
58pub async fn run(
59 connections: DynP2PConnections<P2PMessage>,
60 p2p_status_receivers: P2PStatusReceivers,
61 api_bind: SocketAddr,
62 iroh_dns: Option<SafeUrl>,
63 iroh_relays: Vec<SafeUrl>,
64 cfg: ServerConfig,
65 db: Database,
66 module_init_registry: ServerModuleInitRegistry,
67 task_group: &TaskGroup,
68 force_api_secrets: ApiSecrets,
69 data_dir: PathBuf,
70 code_version_str: String,
71 dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
72 ui_bind: SocketAddr,
73 dashboard_ui_router: DashboardUiRouter,
74 db_checkpoint_retention: u64,
75) -> anyhow::Result<()> {
76 cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
77
78 let mut global_dbtx = db.begin_transaction().await;
79 apply_migrations_server_dbtx(
80 &mut global_dbtx.to_ref_nc(),
81 Arc::new(ServerDbMigrationContext),
82 "fedimint-server".to_string(),
83 get_global_database_migrations(),
84 )
85 .await?;
86
87 update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
88
89 if is_running_in_test_env() {
90 verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
91 }
92 global_dbtx.commit_tx_result().await?;
93
94 let mut modules = BTreeMap::new();
95
96 let global_api = DynGlobalApi::from_endpoints(
98 cfg.consensus
99 .api_endpoints()
100 .iter()
101 .map(|(&peer_id, url)| (peer_id, url.url.clone())),
102 &None,
103 )
104 .await?;
105
106 let server_bitcoin_rpc_monitor = ServerBitcoinRpcMonitor::new(
107 dyn_server_bitcoin_rpc,
108 if is_running_in_test_env() {
109 Duration::from_millis(100)
110 } else {
111 Duration::from_secs(60)
112 },
113 task_group,
114 );
115
116 for (module_id, module_cfg) in &cfg.consensus.modules {
117 match module_init_registry.get(&module_cfg.kind) {
118 Some(module_init) => {
119 info!(target: LOG_CORE, "Initialise module {module_id}...");
120
121 let mut dbtx = db.begin_transaction().await;
122 apply_migrations_dbtx(
123 &mut dbtx.to_ref_nc(),
124 Arc::new(ServerDbMigrationContext) as Arc<_>,
125 module_init.module_kind().to_string(),
126 module_init.get_database_migrations(),
127 Some(*module_id),
128 None,
129 )
130 .await?;
131
132 if let Some(used_db_prefixes) = module_init.used_db_prefixes() {
133 if is_running_in_test_env() {
134 verify_module_db_integrity_dbtx(
135 &mut dbtx.to_ref_nc(),
136 *module_id,
137 module_init.module_kind(),
138 &used_db_prefixes,
139 )
140 .await;
141 }
142 }
143 dbtx.commit_tx_result().await?;
144
145 let module = module_init
146 .init(
147 NumPeers::from(cfg.consensus.api_endpoints().len()),
148 cfg.get_module_config(*module_id)?,
149 db.with_prefix_module_id(*module_id).0,
150 task_group,
151 cfg.local.identity,
152 global_api.with_module(*module_id),
153 server_bitcoin_rpc_monitor.clone(),
154 )
155 .await?;
156
157 modules.insert(*module_id, (module_cfg.kind.clone(), module));
158 }
159 None => bail!("Detected configuration for unsupported module id: {module_id}"),
160 }
161 }
162
163 let module_registry = ModuleRegistry::from(modules);
164
165 let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
166
167 let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
168 let (shutdown_sender, shutdown_receiver) = watch::channel(None);
169 let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
170
171 let mut ci_status_senders = BTreeMap::new();
172 let mut ci_status_receivers = BTreeMap::new();
173
174 for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
175 let (ci_sender, ci_receiver) = watch::channel(None);
176
177 ci_status_senders.insert(peer, ci_sender);
178 ci_status_receivers.insert(peer, ci_receiver);
179 }
180
181 let consensus_api = ConsensusApi {
182 cfg: cfg.clone(),
183 db: db.clone(),
184 modules: module_registry.clone(),
185 client_cfg: client_cfg.clone(),
186 submission_sender: submission_sender.clone(),
187 shutdown_sender,
188 shutdown_receiver: shutdown_receiver.clone(),
189 supported_api_versions: ServerConfig::supported_api_versions_summary(
190 &cfg.consensus.modules,
191 &module_init_registry,
192 ),
193 p2p_status_receivers,
194 ci_status_receivers,
195 ord_latency_receiver,
196 bitcoin_rpc_connection: server_bitcoin_rpc_monitor,
197 force_api_secret: force_api_secrets.get_active(),
198 code_version_str,
199 };
200
201 info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
202
203 let api_handler = if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone() {
204 Box::pin(start_iroh_api(
205 iroh_api_sk,
206 api_bind,
207 iroh_dns,
208 iroh_relays,
209 consensus_api.clone(),
210 task_group,
211 ))
212 .await?;
213
214 None
215 } else {
216 let handler = start_consensus_api(
217 &cfg.local,
218 consensus_api.clone(),
219 force_api_secrets.clone(),
220 api_bind,
221 )
222 .await;
223
224 Some(handler)
225 };
226
227 info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
228
229 for (module_id, kind, module) in module_registry.iter_modules() {
230 submit_module_ci_proposals(
231 task_group,
232 db.clone(),
233 module_id,
234 kind.clone(),
235 module.clone(),
236 submission_sender.clone(),
237 );
238 }
239
240 info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
241
242 let api_urls = get_api_urls(&db, &cfg.consensus).await;
243
244 let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
245
246 let ui_listener = TcpListener::bind(ui_bind)
247 .await
248 .expect("Failed to bind dashboard UI");
249
250 task_group.spawn("dashboard-ui", move |handle| async move {
251 axum::serve(ui_listener, ui_service)
252 .with_graceful_shutdown(handle.make_shutdown_rx())
253 .await
254 .expect("Failed to serve dashboard UI");
255 });
256
257 info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
258
259 ConsensusEngine {
262 db,
263 federation_api: DynGlobalApi::from_endpoints(api_urls, &force_api_secrets.get_active())
264 .await?,
265 cfg: cfg.clone(),
266 connections,
267 ord_latency_sender,
268 ci_status_senders,
269 submission_receiver,
270 shutdown_receiver,
271 modules: module_registry,
272 task_group: task_group.clone(),
273 data_dir,
274 db_checkpoint_retention,
275 }
276 .run()
277 .await?;
278
279 if let Some(api_handler) = api_handler {
280 api_handler
281 .stop()
282 .expect("Consensus api should still be running");
283
284 api_handler.stopped().await;
285 }
286
287 Ok(())
288}
289
290async fn start_consensus_api(
291 cfg: &ServerConfigLocal,
292 api: ConsensusApi,
293 force_api_secrets: ApiSecrets,
294 api_bind: SocketAddr,
295) -> ServerHandle {
296 let mut rpc_module = RpcModule::new(api.clone());
297
298 net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
299
300 for (id, _, module) in api.modules.iter_modules() {
301 net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
302 }
303
304 net::api::spawn(
305 "consensus",
306 api_bind,
307 rpc_module,
308 cfg.max_connections,
309 force_api_secrets,
310 )
311 .await
312}
313
314const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
315
316fn submit_module_ci_proposals(
317 task_group: &TaskGroup,
318 db: Database,
319 module_id: ModuleInstanceId,
320 kind: ModuleKind,
321 module: DynServerModule,
322 submission_sender: Sender<ConsensusItem>,
323) {
324 let mut interval = tokio::time::interval(if is_running_in_test_env() {
325 Duration::from_millis(100)
326 } else {
327 Duration::from_secs(1)
328 });
329
330 task_group.spawn(
331 format!("citem_proposals_{module_id}"),
332 move |task_handle| async move {
333 while !task_handle.is_shutting_down() {
334 let module_consensus_items = tokio::time::timeout(
335 CONSENSUS_PROPOSAL_TIMEOUT,
336 module.consensus_proposal(
337 &mut db
338 .begin_transaction_nc()
339 .await
340 .to_ref_with_prefix_module_id(module_id)
341 .0
342 .into_nc(),
343 module_id,
344 ),
345 )
346 .await;
347
348 match module_consensus_items {
349 Ok(items) => {
350 for item in items {
351 if submission_sender
352 .send(ConsensusItem::Module(item))
353 .await
354 .is_err()
355 {
356 warn!(
357 target: LOG_CONSENSUS,
358 module_id,
359 "Unable to submit module consensus item proposal via channel"
360 );
361 }
362 }
363 }
364 Err(..) => {
365 warn!(
366 target: LOG_CONSENSUS,
367 module_id,
368 %kind,
369 "Module failed to propose consensus items on time"
370 );
371 }
372 }
373
374 interval.tick().await;
375 }
376 },
377 );
378}
379
380async fn start_iroh_api(
381 secret_key: iroh::SecretKey,
382 api_bind: SocketAddr,
383 iroh_dns: Option<SafeUrl>,
384 iroh_relays: Vec<SafeUrl>,
385 consensus_api: ConsensusApi,
386 task_group: &TaskGroup,
387) -> anyhow::Result<()> {
388 let endpoint = build_iroh_endpoint(
389 secret_key,
390 api_bind,
391 iroh_dns,
392 iroh_relays,
393 FEDIMINT_API_ALPN,
394 )
395 .await?;
396 task_group.spawn_cancellable(
397 "iroh-api",
398 run_iroh_api(consensus_api, endpoint, task_group.clone()),
399 );
400
401 Ok(())
402}
403
404async fn run_iroh_api(consensus_api: ConsensusApi, endpoint: Endpoint, task_group: TaskGroup) {
405 let core_api = server_endpoints()
406 .into_iter()
407 .map(|endpoint| (endpoint.path.to_string(), endpoint))
408 .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
409
410 let module_api = consensus_api
411 .modules
412 .iter_modules()
413 .map(|(id, _, module)| {
414 let api_endpoints = module
415 .api_endpoints()
416 .into_iter()
417 .map(|endpoint| (endpoint.path.to_string(), endpoint))
418 .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
419
420 (id, api_endpoints)
421 })
422 .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
423
424 let consensus_api = Arc::new(consensus_api);
425 let core_api = Arc::new(core_api);
426 let module_api = Arc::new(module_api);
427
428 loop {
429 match endpoint.accept().await {
430 Some(incoming) => {
431 task_group.spawn_cancellable_silent(
432 "handle-iroh-connection",
433 handle_incoming(
434 consensus_api.clone(),
435 core_api.clone(),
436 module_api.clone(),
437 task_group.clone(),
438 incoming,
439 )
440 .then(|result| async {
441 if let Err(err) = result {
442 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
443 }
444 }),
445 );
446 }
447 None => return,
448 }
449 }
450}
451
452async fn handle_incoming(
453 consensus_api: Arc<ConsensusApi>,
454 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
455 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
456 task_group: TaskGroup,
457 incoming: Incoming,
458) -> anyhow::Result<()> {
459 let connection = incoming.accept()?.await?;
460
461 loop {
462 let (send_stream, recv_stream) = connection.accept_bi().await?;
463
464 task_group.spawn_cancellable_silent(
465 "handle-iroh-request",
466 handle_request(
467 consensus_api.clone(),
468 core_api.clone(),
469 module_api.clone(),
470 send_stream,
471 recv_stream,
472 )
473 .then(|result| async {
474 if let Err(err) = result {
475 warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
476 }
477 }),
478 );
479 }
480}
481
482async fn handle_request(
483 consensus_api: Arc<ConsensusApi>,
484 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
485 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
486 mut send_stream: SendStream,
487 mut recv_stream: RecvStream,
488) -> anyhow::Result<()> {
489 let request = recv_stream.read_to_end(100_000).await?;
490
491 let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
492
493 let response = await_response(consensus_api, core_api, module_api, request).await;
494
495 let response = serde_json::to_vec(&response)?;
496
497 send_stream.write_all(&response).await?;
498
499 send_stream.finish()?;
500
501 Ok(())
502}
503
504async fn await_response(
505 consensus_api: Arc<ConsensusApi>,
506 core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
507 module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
508 request: IrohApiRequest,
509) -> Result<Value, ApiError> {
510 match request.method {
511 ApiMethod::Core(method) => {
512 let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
513
514 let (state, context) = consensus_api.context(&request.request, None).await;
515
516 (endpoint.handler)(state, context, request.request).await
517 }
518 ApiMethod::Module(module_id, method) => {
519 let endpoint = module_api
520 .get(&module_id)
521 .ok_or(ApiError::not_found(module_id.to_string()))?
522 .get(&method)
523 .ok_or(ApiError::not_found(method))?;
524
525 let (state, context) = consensus_api
526 .context(&request.request, Some(module_id))
527 .await;
528
529 (endpoint.handler)(state, context, request.request).await
530 }
531 }
532}