fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_core::NumPeers;
19use fedimint_core::config::P2PMessage;
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
22use fedimint_core::envs::is_running_in_test_env;
23use fedimint_core::epoch::ConsensusItem;
24use fedimint_core::module::registry::ModuleRegistry;
25use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
26use fedimint_core::net::peers::DynP2PConnections;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
30use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::net::TcpListener;
41use tokio::sync::{Semaphore, watch};
42use tracing::{info, warn};
43
44use crate::config::{ServerConfig, ServerConfigLocal};
45use crate::connection_limits::ConnectionLimits;
46use crate::consensus::api::{ConsensusApi, server_endpoints};
47use crate::consensus::engine::ConsensusEngine;
48use crate::db::verify_server_db_integrity_dbtx;
49use crate::net::api::announcement::get_api_urls;
50use crate::net::api::{ApiSecrets, HasApiContext};
51use crate::net::p2p::{P2PConnectionTypeReceivers, P2PStatusReceivers};
52use crate::net::p2p_connector::build_iroh_endpoint;
53use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
54
/// Capacity of the bounded channel that carries submitted consensus items (e.g. transactions
/// from the API) to the consensus engine; once it is full, further submissions wait until the
/// engine drains it.
const TRANSACTION_BUFFER: usize = 1_000;
57
58#[allow(clippy::too_many_arguments)]
59pub async fn run(
60    connections: DynP2PConnections<P2PMessage>,
61    p2p_status_receivers: P2PStatusReceivers,
62    p2p_connection_type_receivers: P2PConnectionTypeReceivers,
63    api_bind: SocketAddr,
64    iroh_dns: Option<SafeUrl>,
65    iroh_relays: Vec<SafeUrl>,
66    cfg: ServerConfig,
67    db: Database,
68    module_init_registry: ServerModuleInitRegistry,
69    task_group: &TaskGroup,
70    force_api_secrets: ApiSecrets,
71    data_dir: PathBuf,
72    code_version_str: String,
73    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
74    ui_bind: SocketAddr,
75    dashboard_ui_router: DashboardUiRouter,
76    db_checkpoint_retention: u64,
77    iroh_api_limits: ConnectionLimits,
78) -> anyhow::Result<()> {
79    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
80
81    let mut global_dbtx = db.begin_transaction().await;
82    apply_migrations_server_dbtx(
83        &mut global_dbtx.to_ref_nc(),
84        Arc::new(ServerDbMigrationContext),
85        "fedimint-server".to_string(),
86        get_global_database_migrations(),
87    )
88    .await?;
89
90    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
91
92    if is_running_in_test_env() {
93        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
94    }
95    global_dbtx.commit_tx_result().await?;
96
97    let mut modules = BTreeMap::new();
98
99    // TODO: make it work with all transports and federation secrets
100    let global_api = DynGlobalApi::from_endpoints(
101        cfg.consensus
102            .api_endpoints()
103            .iter()
104            .map(|(&peer_id, url)| (peer_id, url.url.clone())),
105        &None,
106        true,
107        true,
108    )
109    .await?;
110
111    let server_bitcoin_rpc_monitor = ServerBitcoinRpcMonitor::new(
112        dyn_server_bitcoin_rpc,
113        if is_running_in_test_env() {
114            Duration::from_millis(100)
115        } else {
116            Duration::from_secs(60)
117        },
118        task_group,
119    );
120
121    for (module_id, module_cfg) in &cfg.consensus.modules {
122        match module_init_registry.get(&module_cfg.kind) {
123            Some(module_init) => {
124                info!(target: LOG_CORE, "Initialise module {module_id}...");
125
126                let mut dbtx = db.begin_transaction().await;
127                apply_migrations_dbtx(
128                    &mut dbtx.to_ref_nc(),
129                    Arc::new(ServerDbMigrationContext) as Arc<_>,
130                    module_init.module_kind().to_string(),
131                    module_init.get_database_migrations(),
132                    Some(*module_id),
133                    None,
134                )
135                .await?;
136
137                if let Some(used_db_prefixes) = module_init.used_db_prefixes()
138                    && is_running_in_test_env()
139                {
140                    verify_module_db_integrity_dbtx(
141                        &mut dbtx.to_ref_nc(),
142                        *module_id,
143                        module_init.module_kind(),
144                        &used_db_prefixes,
145                    )
146                    .await;
147                }
148                dbtx.commit_tx_result().await?;
149
150                let module = module_init
151                    .init(
152                        NumPeers::from(cfg.consensus.api_endpoints().len()),
153                        cfg.get_module_config(*module_id)?,
154                        db.with_prefix_module_id(*module_id).0,
155                        task_group,
156                        cfg.local.identity,
157                        global_api.with_module(*module_id),
158                        server_bitcoin_rpc_monitor.clone(),
159                    )
160                    .await?;
161
162                modules.insert(*module_id, (module_cfg.kind.clone(), module));
163            }
164            None => bail!("Detected configuration for unsupported module id: {module_id}"),
165        }
166    }
167
168    let module_registry = ModuleRegistry::from(modules);
169
170    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
171
172    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
173    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
174    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
175
176    let mut ci_status_senders = BTreeMap::new();
177    let mut ci_status_receivers = BTreeMap::new();
178
179    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
180        let (ci_sender, ci_receiver) = watch::channel(None);
181
182        ci_status_senders.insert(peer, ci_sender);
183        ci_status_receivers.insert(peer, ci_receiver);
184    }
185
186    let consensus_api = ConsensusApi {
187        cfg: cfg.clone(),
188        cfg_dir: data_dir.clone(),
189        db: db.clone(),
190        modules: module_registry.clone(),
191        client_cfg: client_cfg.clone(),
192        submission_sender: submission_sender.clone(),
193        shutdown_sender,
194        shutdown_receiver: shutdown_receiver.clone(),
195        supported_api_versions: ServerConfig::supported_api_versions_summary(
196            &cfg.consensus.modules,
197            &module_init_registry,
198        ),
199        p2p_status_receivers,
200        p2p_connection_type_receivers,
201        ci_status_receivers,
202        ord_latency_receiver,
203        bitcoin_rpc_connection: server_bitcoin_rpc_monitor,
204        force_api_secret: force_api_secrets.get_active(),
205        code_version_str,
206        task_group: task_group.clone(),
207    };
208
209    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
210
211    let api_handler = start_consensus_api(
212        &cfg.local,
213        consensus_api.clone(),
214        force_api_secrets.clone(),
215        api_bind,
216    )
217    .await;
218
219    if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
220        && let Err(e) = Box::pin(start_iroh_api(
221            iroh_api_sk,
222            api_bind,
223            iroh_dns,
224            iroh_relays,
225            consensus_api.clone(),
226            task_group,
227            iroh_api_limits,
228        ))
229        .await
230    {
231        // clean up ws api before propagating error
232        api_handler.stop().expect("Just started");
233        api_handler.stopped().await;
234        return Err(e);
235    }
236
237    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
238
239    for (module_id, kind, module) in module_registry.iter_modules() {
240        submit_module_ci_proposals(
241            task_group,
242            db.clone(),
243            module_id,
244            kind.clone(),
245            module.clone(),
246            submission_sender.clone(),
247        );
248    }
249
250    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
251
252    let api_urls = get_api_urls(&db, &cfg.consensus).await;
253
254    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
255
256    let ui_listener = TcpListener::bind(ui_bind)
257        .await
258        .expect("Failed to bind dashboard UI");
259
260    task_group.spawn("dashboard-ui", move |handle| async move {
261        axum::serve(ui_listener, ui_service)
262            .with_graceful_shutdown(handle.make_shutdown_rx())
263            .await
264            .expect("Failed to serve dashboard UI");
265    });
266
267    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
268
269    // FIXME: (@leonardo) How should this be handled ?
270    // Using the `Connector::default()` for now!
271    ConsensusEngine {
272        db,
273        federation_api: DynGlobalApi::from_endpoints(
274            api_urls,
275            &force_api_secrets.get_active(),
276            true,
277            true,
278        )
279        .await?,
280        cfg: cfg.clone(),
281        connections,
282        ord_latency_sender,
283        ci_status_senders,
284        submission_receiver,
285        shutdown_receiver,
286        modules: module_registry,
287        task_group: task_group.clone(),
288        data_dir,
289        db_checkpoint_retention,
290    }
291    .run()
292    .await?;
293
294    api_handler
295        .stop()
296        .expect("Consensus api should still be running");
297
298    api_handler.stopped().await;
299
300    Ok(())
301}
302
303async fn start_consensus_api(
304    cfg: &ServerConfigLocal,
305    api: ConsensusApi,
306    force_api_secrets: ApiSecrets,
307    api_bind: SocketAddr,
308) -> ServerHandle {
309    let mut rpc_module = RpcModule::new(api.clone());
310
311    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
312
313    for (id, _, module) in api.modules.iter_modules() {
314        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
315    }
316
317    net::api::spawn(
318        "consensus",
319        api_bind,
320        rpc_module,
321        cfg.max_connections,
322        force_api_secrets,
323    )
324    .await
325}
326
327const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
328
329fn submit_module_ci_proposals(
330    task_group: &TaskGroup,
331    db: Database,
332    module_id: ModuleInstanceId,
333    kind: ModuleKind,
334    module: DynServerModule,
335    submission_sender: Sender<ConsensusItem>,
336) {
337    let mut interval = tokio::time::interval(if is_running_in_test_env() {
338        Duration::from_millis(100)
339    } else {
340        Duration::from_secs(1)
341    });
342
343    task_group.spawn(
344        format!("citem_proposals_{module_id}"),
345        move |task_handle| async move {
346            while !task_handle.is_shutting_down() {
347                let module_consensus_items = tokio::time::timeout(
348                    CONSENSUS_PROPOSAL_TIMEOUT,
349                    module.consensus_proposal(
350                        &mut db
351                            .begin_transaction_nc()
352                            .await
353                            .to_ref_with_prefix_module_id(module_id)
354                            .0
355                            .into_nc(),
356                        module_id,
357                    ),
358                )
359                .await;
360
361                match module_consensus_items {
362                    Ok(items) => {
363                        for item in items {
364                            if submission_sender
365                                .send(ConsensusItem::Module(item))
366                                .await
367                                .is_err()
368                            {
369                                warn!(
370                                    target: LOG_CONSENSUS,
371                                    module_id,
372                                    "Unable to submit module consensus item proposal via channel"
373                                );
374                            }
375                        }
376                    }
377                    Err(..) => {
378                        warn!(
379                            target: LOG_CONSENSUS,
380                            module_id,
381                            %kind,
382                            "Module failed to propose consensus items on time"
383                        );
384                    }
385                }
386
387                interval.tick().await;
388            }
389        },
390    );
391}
392
393async fn start_iroh_api(
394    secret_key: iroh::SecretKey,
395    api_bind: SocketAddr,
396    iroh_dns: Option<SafeUrl>,
397    iroh_relays: Vec<SafeUrl>,
398    consensus_api: ConsensusApi,
399    task_group: &TaskGroup,
400    iroh_api_limits: ConnectionLimits,
401) -> anyhow::Result<()> {
402    let endpoint = build_iroh_endpoint(
403        secret_key,
404        api_bind,
405        iroh_dns,
406        iroh_relays,
407        FEDIMINT_API_ALPN,
408    )
409    .await?;
410    task_group.spawn_cancellable(
411        "iroh-api",
412        run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
413    );
414
415    Ok(())
416}
417
418async fn run_iroh_api(
419    consensus_api: ConsensusApi,
420    endpoint: Endpoint,
421    task_group: TaskGroup,
422    iroh_api_limits: ConnectionLimits,
423) {
424    let core_api = server_endpoints()
425        .into_iter()
426        .map(|endpoint| (endpoint.path.to_string(), endpoint))
427        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
428
429    let module_api = consensus_api
430        .modules
431        .iter_modules()
432        .map(|(id, _, module)| {
433            let api_endpoints = module
434                .api_endpoints()
435                .into_iter()
436                .map(|endpoint| (endpoint.path.to_string(), endpoint))
437                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
438
439            (id, api_endpoints)
440        })
441        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
442
443    let consensus_api = Arc::new(consensus_api);
444    let core_api = Arc::new(core_api);
445    let module_api = Arc::new(module_api);
446    let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
447
448    loop {
449        match endpoint.accept().await {
450            Some(incoming) => {
451                if parallel_connections_limit.available_permits() == 0 {
452                    warn!(
453                        target: LOG_NET_API,
454                        limit = iroh_api_limits.max_connections,
455                        "Iroh API connection limit reached, blocking new connections"
456                    );
457                }
458                let permit = parallel_connections_limit
459                    .clone()
460                    .acquire_owned()
461                    .await
462                    .expect("semaphore should not be closed");
463                task_group.spawn_cancellable_silent(
464                    "handle-iroh-connection",
465                    handle_incoming(
466                        consensus_api.clone(),
467                        core_api.clone(),
468                        module_api.clone(),
469                        task_group.clone(),
470                        incoming,
471                        permit,
472                        iroh_api_limits.max_requests_per_connection,
473                    )
474                    .then(|result| async {
475                        if let Err(err) = result {
476                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
477                        }
478                    }),
479                );
480            }
481            None => return,
482        }
483    }
484}
485
486async fn handle_incoming(
487    consensus_api: Arc<ConsensusApi>,
488    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
489    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
490    task_group: TaskGroup,
491    incoming: Incoming,
492    _connection_permit: tokio::sync::OwnedSemaphorePermit,
493    iroh_api_max_requests_per_connection: usize,
494) -> anyhow::Result<()> {
495    let connection = incoming.accept()?.await?;
496    let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
497
498    loop {
499        let (send_stream, recv_stream) = connection.accept_bi().await?;
500
501        if parallel_requests_limit.available_permits() == 0 {
502            warn!(
503                target: LOG_NET_API,
504                limit = iroh_api_max_requests_per_connection,
505                "Iroh API request limit reached for connection, blocking new requests"
506            );
507        }
508        let permit = parallel_requests_limit
509            .clone()
510            .acquire_owned()
511            .await
512            .expect("semaphore should not be closed");
513        task_group.spawn_cancellable_silent(
514            "handle-iroh-request",
515            handle_request(
516                consensus_api.clone(),
517                core_api.clone(),
518                module_api.clone(),
519                send_stream,
520                recv_stream,
521                permit,
522            )
523            .then(|result| async {
524                if let Err(err) = result {
525                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
526                }
527            }),
528        );
529    }
530}
531
532async fn handle_request(
533    consensus_api: Arc<ConsensusApi>,
534    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
535    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
536    mut send_stream: SendStream,
537    mut recv_stream: RecvStream,
538    _request_permit: tokio::sync::OwnedSemaphorePermit,
539) -> anyhow::Result<()> {
540    let request = recv_stream.read_to_end(100_000).await?;
541
542    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
543
544    let response = await_response(consensus_api, core_api, module_api, request).await;
545
546    let response = serde_json::to_vec(&response)?;
547
548    send_stream.write_all(&response).await?;
549
550    send_stream.finish()?;
551
552    Ok(())
553}
554
555async fn await_response(
556    consensus_api: Arc<ConsensusApi>,
557    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
558    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
559    request: IrohApiRequest,
560) -> Result<Value, ApiError> {
561    match request.method {
562        ApiMethod::Core(method) => {
563            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
564
565            let (state, context) = consensus_api.context(&request.request, None).await;
566
567            (endpoint.handler)(state, context, request.request).await
568        }
569        ApiMethod::Module(module_id, method) => {
570            let endpoint = module_api
571                .get(&module_id)
572                .ok_or(ApiError::not_found(module_id.to_string()))?
573                .get(&method)
574                .ok_or(ApiError::not_found(method))?;
575
576            let (state, context) = consensus_api
577                .context(&request.request, Some(module_id))
578                .await;
579
580            (endpoint.handler)(state, context, request.request).await
581        }
582    }
583}