Skip to main content

fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::metrics::{
52    IROH_API_CONNECTION_DURATION_SECONDS, IROH_API_CONNECTIONS_ACTIVE,
53    IROH_API_REQUEST_DURATION_SECONDS,
54};
55use crate::net::api::announcement::get_api_urls;
56use crate::net::api::{ApiSecrets, HasApiContext};
57use crate::net::p2p::P2PStatusReceivers;
58use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
59
/// How many txs can be stored in memory before blocking the API.
///
/// Used as the capacity of the bounded channel through which consensus items
/// are submitted to the consensus engine.
const TRANSACTION_BUFFER: usize = 1000;
62
63#[allow(clippy::too_many_arguments)]
64pub async fn run(
65    connectors: ConnectorRegistry,
66    connections: DynP2PConnections<P2PMessage>,
67    p2p_status_receivers: P2PStatusReceivers,
68    api_bind: SocketAddr,
69    iroh_dns: Option<SafeUrl>,
70    iroh_relays: Vec<SafeUrl>,
71    cfg: ServerConfig,
72    db: Database,
73    module_init_registry: ServerModuleInitRegistry,
74    task_group: &TaskGroup,
75    force_api_secrets: ApiSecrets,
76    data_dir: PathBuf,
77    code_version_str: String,
78    code_version_hash: String,
79    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
80    ui_bind: SocketAddr,
81    dashboard_ui_router: DashboardUiRouter,
82    db_checkpoint_retention: u64,
83    iroh_api_limits: ConnectionLimits,
84) -> anyhow::Result<()> {
85    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
86
87    let mut global_dbtx = db.begin_transaction().await;
88    apply_migrations_server_dbtx(
89        &mut global_dbtx.to_ref_nc(),
90        Arc::new(ServerDbMigrationContext),
91        "fedimint-server".to_string(),
92        get_global_database_migrations(),
93    )
94    .await?;
95
96    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
97
98    if is_running_in_test_env() {
99        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
100    }
101    global_dbtx.commit_tx_result().await?;
102
103    let mut modules = BTreeMap::new();
104
105    // TODO: make it work with all transports and federation secrets
106    let global_api = DynGlobalApi::new(
107        connectors.clone(),
108        cfg.consensus
109            .api_endpoints()
110            .iter()
111            .map(|(&peer_id, url)| (peer_id, url.url.clone()))
112            .collect(),
113        None,
114    )?;
115
116    let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
117        dyn_server_bitcoin_rpc,
118        if is_running_in_test_env() {
119            Duration::from_millis(100)
120        } else {
121            Duration::from_mins(1)
122        },
123        task_group,
124    );
125
126    for (module_id, module_cfg) in &cfg.consensus.modules {
127        match module_init_registry.get(&module_cfg.kind) {
128            Some(module_init) => {
129                info!(target: LOG_CORE, "Initialise module {module_id}...");
130
131                let mut dbtx = db.begin_transaction().await;
132                apply_migrations_dbtx(
133                    &mut dbtx.to_ref_nc(),
134                    Arc::new(ServerDbMigrationContext) as Arc<_>,
135                    module_init.module_kind().to_string(),
136                    module_init.get_database_migrations(),
137                    Some(*module_id),
138                    None,
139                )
140                .await?;
141
142                if let Some(used_db_prefixes) = module_init.used_db_prefixes()
143                    && is_running_in_test_env()
144                {
145                    verify_module_db_integrity_dbtx(
146                        &mut dbtx.to_ref_nc(),
147                        *module_id,
148                        module_init.module_kind(),
149                        &used_db_prefixes,
150                    )
151                    .await;
152                }
153                dbtx.commit_tx_result().await?;
154
155                let module = module_init
156                    .init(
157                        NumPeers::from(cfg.consensus.api_endpoints().len()),
158                        cfg.get_module_config(*module_id)?,
159                        db.with_prefix_module_id(*module_id).0,
160                        task_group,
161                        cfg.local.identity,
162                        global_api.with_module(*module_id),
163                        bitcoin_rpc_connection.clone(),
164                    )
165                    .await?;
166
167                modules.insert(*module_id, (module_cfg.kind.clone(), module));
168            }
169            None => bail!("Detected configuration for unsupported module id: {module_id}"),
170        }
171    }
172
173    let module_registry = ModuleRegistry::from(modules);
174
175    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
176
177    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
178    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
179    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
180
181    let mut ci_status_senders = BTreeMap::new();
182    let mut ci_status_receivers = BTreeMap::new();
183
184    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
185        let (ci_sender, ci_receiver) = watch::channel(None);
186
187        ci_status_senders.insert(peer, ci_sender);
188        ci_status_receivers.insert(peer, ci_receiver);
189    }
190
191    let consensus_api = ConsensusApi {
192        cfg: cfg.clone(),
193        cfg_dir: data_dir.clone(),
194        db: db.clone(),
195        modules: module_registry.clone(),
196        client_cfg: client_cfg.clone(),
197        submission_sender: submission_sender.clone(),
198        shutdown_sender,
199        shutdown_receiver: shutdown_receiver.clone(),
200        supported_api_versions: ServerConfig::supported_api_versions_summary(
201            &cfg.consensus.modules,
202            &module_init_registry,
203        ),
204        p2p_status_receivers,
205        ci_status_receivers,
206        ord_latency_receiver,
207        bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
208        force_api_secret: force_api_secrets.get_active(),
209        code_version_str,
210        code_version_hash,
211        task_group: task_group.clone(),
212    };
213
214    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
215
216    let api_handler = start_consensus_api(
217        &cfg.local,
218        consensus_api.clone(),
219        force_api_secrets.clone(),
220        api_bind,
221    )
222    .await;
223
224    if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
225        && let Err(e) = Box::pin(start_iroh_api(
226            iroh_api_sk,
227            api_bind,
228            iroh_dns,
229            iroh_relays,
230            consensus_api.clone(),
231            task_group,
232            iroh_api_limits,
233        ))
234        .await
235    {
236        // clean up ws api before propagating error
237        api_handler.stop().expect("Just started");
238        api_handler.stopped().await;
239        return Err(e);
240    }
241
242    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
243
244    for (module_id, kind, module) in module_registry.iter_modules() {
245        submit_module_ci_proposals(
246            task_group,
247            db.clone(),
248            module_id,
249            kind.clone(),
250            module.clone(),
251            submission_sender.clone(),
252        );
253    }
254
255    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
256
257    let ui_listener = TcpListener::bind(ui_bind)
258        .await
259        .expect("Failed to bind dashboard UI");
260
261    task_group.spawn("dashboard-ui", move |handle| async move {
262        axum::serve(ui_listener, ui_service)
263            .with_graceful_shutdown(handle.make_shutdown_rx())
264            .await
265            .expect("Failed to serve dashboard UI");
266    });
267
268    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
269
270    loop {
271        match bitcoin_rpc_connection.status() {
272            Some(status) => {
273                if let Some(progress) = status.sync_progress {
274                    if progress >= 0.999 {
275                        break;
276                    }
277
278                    info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
279                } else {
280                    break;
281                }
282            }
283            None => {
284                info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
285            }
286        }
287
288        sleep(Duration::from_secs(1)).await;
289    }
290
291    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
292
293    let api_urls = get_api_urls(&db, &cfg.consensus).await;
294
295    // FIXME: (@leonardo) How should this be handled ?
296    // Using the `Connector::default()` for now!
297    ConsensusEngine {
298        db,
299        federation_api: DynGlobalApi::new(
300            connectors,
301            api_urls,
302            force_api_secrets.get_active().as_deref(),
303        )?,
304        cfg: cfg.clone(),
305        connections,
306        ord_latency_sender,
307        ci_status_senders,
308        submission_receiver,
309        shutdown_receiver,
310        modules: module_registry,
311        task_group: task_group.clone(),
312        data_dir,
313        db_checkpoint_retention,
314    }
315    .run()
316    .await?;
317
318    api_handler
319        .stop()
320        .expect("Consensus api should still be running");
321
322    api_handler.stopped().await;
323
324    Ok(())
325}
326
327async fn start_consensus_api(
328    cfg: &ServerConfigLocal,
329    api: ConsensusApi,
330    force_api_secrets: ApiSecrets,
331    api_bind: SocketAddr,
332) -> ServerHandle {
333    let mut rpc_module = RpcModule::new(api.clone());
334
335    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
336
337    for (id, _, module) in api.modules.iter_modules() {
338        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
339    }
340
341    net::api::spawn(
342        "consensus",
343        api_bind,
344        rpc_module,
345        cfg.max_connections,
346        force_api_secrets,
347    )
348    .await
349}
350
/// Upper bound on how long a single `consensus_proposal` call of a module may
/// take before the proposal task gives up on it and logs a warning.
const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
352
353fn submit_module_ci_proposals(
354    task_group: &TaskGroup,
355    db: Database,
356    module_id: ModuleInstanceId,
357    kind: ModuleKind,
358    module: DynServerModule,
359    submission_sender: Sender<ConsensusItem>,
360) {
361    let mut interval = tokio::time::interval(if is_running_in_test_env() {
362        Duration::from_millis(100)
363    } else {
364        Duration::from_secs(1)
365    });
366
367    task_group.spawn(
368        format!("citem_proposals_{module_id}"),
369        move |task_handle| async move {
370            while !task_handle.is_shutting_down() {
371                let module_consensus_items = tokio::time::timeout(
372                    CONSENSUS_PROPOSAL_TIMEOUT,
373                    module.consensus_proposal(
374                        &mut db
375                            .begin_transaction_nc()
376                            .await
377                            .to_ref_with_prefix_module_id(module_id)
378                            .0
379                            .into_nc(),
380                        module_id,
381                    ),
382                )
383                .await;
384
385                match module_consensus_items {
386                    Ok(items) => {
387                        for item in items {
388                            if submission_sender
389                                .send(ConsensusItem::Module(item))
390                                .await
391                                .is_err()
392                            {
393                                warn!(
394                                    target: LOG_CONSENSUS,
395                                    module_id,
396                                    "Unable to submit module consensus item proposal via channel"
397                                );
398                            }
399                        }
400                    }
401                    Err(..) => {
402                        warn!(
403                            target: LOG_CONSENSUS,
404                            module_id,
405                            %kind,
406                            "Module failed to propose consensus items on time"
407                        );
408                    }
409                }
410
411                interval.tick().await;
412            }
413        },
414    );
415}
416
417async fn start_iroh_api(
418    secret_key: iroh::SecretKey,
419    api_bind: SocketAddr,
420    iroh_dns: Option<SafeUrl>,
421    iroh_relays: Vec<SafeUrl>,
422    consensus_api: ConsensusApi,
423    task_group: &TaskGroup,
424    iroh_api_limits: ConnectionLimits,
425) -> anyhow::Result<()> {
426    let endpoint = build_iroh_endpoint(
427        secret_key,
428        api_bind,
429        iroh_dns,
430        iroh_relays,
431        FEDIMINT_API_ALPN,
432    )
433    .await?;
434    task_group.spawn_cancellable(
435        "iroh-api",
436        run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
437    );
438
439    Ok(())
440}
441
442async fn run_iroh_api(
443    consensus_api: ConsensusApi,
444    endpoint: Endpoint,
445    task_group: TaskGroup,
446    iroh_api_limits: ConnectionLimits,
447) {
448    let core_api = server_endpoints()
449        .into_iter()
450        .map(|endpoint| (endpoint.path.to_string(), endpoint))
451        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
452
453    let module_api = consensus_api
454        .modules
455        .iter_modules()
456        .map(|(id, _, module)| {
457            let api_endpoints = module
458                .api_endpoints()
459                .into_iter()
460                .map(|endpoint| (endpoint.path.to_string(), endpoint))
461                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
462
463            (id, api_endpoints)
464        })
465        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
466
467    let consensus_api = Arc::new(consensus_api);
468    let core_api = Arc::new(core_api);
469    let module_api = Arc::new(module_api);
470    let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
471
472    loop {
473        match endpoint.accept().await {
474            Some(incoming) => {
475                if parallel_connections_limit.available_permits() == 0 {
476                    warn!(
477                        target: LOG_NET_API,
478                        limit = iroh_api_limits.max_connections,
479                        "Iroh API connection limit reached, blocking new connections"
480                    );
481                }
482                let permit = parallel_connections_limit
483                    .clone()
484                    .acquire_owned()
485                    .await
486                    .expect("semaphore should not be closed");
487                task_group.spawn_cancellable_silent(
488                    "handle-iroh-connection",
489                    handle_incoming(
490                        consensus_api.clone(),
491                        core_api.clone(),
492                        module_api.clone(),
493                        task_group.clone(),
494                        incoming,
495                        permit,
496                        iroh_api_limits.max_requests_per_connection,
497                    )
498                    .then(|result| async {
499                        if let Err(err) = result {
500                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
501                        }
502                    }),
503                );
504            }
505            None => return,
506        }
507    }
508}
509
510async fn handle_incoming(
511    consensus_api: Arc<ConsensusApi>,
512    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
513    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
514    task_group: TaskGroup,
515    incoming: Incoming,
516    _connection_permit: tokio::sync::OwnedSemaphorePermit,
517    iroh_api_max_requests_per_connection: usize,
518) -> anyhow::Result<()> {
519    let connection = incoming.accept()?.await?;
520    let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
521
522    IROH_API_CONNECTIONS_ACTIVE.inc();
523    let connection_timer = IROH_API_CONNECTION_DURATION_SECONDS.start_timer();
524    scopeguard::defer! {
525        IROH_API_CONNECTIONS_ACTIVE.dec();
526        connection_timer.observe_duration();
527    }
528
529    loop {
530        let (send_stream, recv_stream) = connection.accept_bi().await?;
531
532        if parallel_requests_limit.available_permits() == 0 {
533            warn!(
534                target: LOG_NET_API,
535                limit = iroh_api_max_requests_per_connection,
536                "Iroh API request limit reached for connection, blocking new requests"
537            );
538        }
539        let permit = parallel_requests_limit
540            .clone()
541            .acquire_owned()
542            .await
543            .expect("semaphore should not be closed");
544        task_group.spawn_cancellable_silent(
545            "handle-iroh-request",
546            handle_request(
547                consensus_api.clone(),
548                core_api.clone(),
549                module_api.clone(),
550                send_stream,
551                recv_stream,
552                permit,
553            )
554            .then(|result| async {
555                if let Err(err) = result {
556                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
557                }
558            }),
559        );
560    }
561}
562
563async fn handle_request(
564    consensus_api: Arc<ConsensusApi>,
565    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
566    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
567    mut send_stream: SendStream,
568    mut recv_stream: RecvStream,
569    _request_permit: tokio::sync::OwnedSemaphorePermit,
570) -> anyhow::Result<()> {
571    let request = recv_stream.read_to_end(100_000).await?;
572
573    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
574
575    let method = request.method.to_string();
576    let timer = IROH_API_REQUEST_DURATION_SECONDS
577        .with_label_values(&[&method])
578        .start_timer();
579
580    let response = await_response(consensus_api, core_api, module_api, request).await;
581
582    timer.observe_duration();
583
584    let response = serde_json::to_vec(&response)?;
585
586    send_stream.write_all(&response).await?;
587
588    send_stream.finish()?;
589
590    Ok(())
591}
592
593async fn await_response(
594    consensus_api: Arc<ConsensusApi>,
595    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
596    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
597    request: IrohApiRequest,
598) -> Result<Value, ApiError> {
599    match request.method {
600        ApiMethod::Core(method) => {
601            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
602
603            let (state, context) = consensus_api.context(&request.request, None).await;
604
605            (endpoint.handler)(state, context, request.request).await
606        }
607        ApiMethod::Module(module_id, method) => {
608            let endpoint = module_api
609                .get(&module_id)
610                .ok_or(ApiError::not_found(module_id.to_string()))?
611                .get(&method)
612                .ok_or(ApiError::not_found(method))?;
613
614            let (state, context) = consensus_api
615                .context(&request.request, Some(module_id))
616                .await;
617
618            (endpoint.handler)(state, context, request.request).await
619        }
620    }
621}