Skip to main content

fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::metrics::{
52    IROH_API_CONNECTION_DURATION_SECONDS, IROH_API_CONNECTIONS_ACTIVE,
53    IROH_API_REQUEST_DURATION_SECONDS,
54};
55use crate::net::api::announcement::get_api_urls;
56use crate::net::api::{ApiSecrets, HasApiContext};
57use crate::net::p2p::P2PStatusReceivers;
58use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
59
60/// How many txs can be stored in memory before blocking the API
61const TRANSACTION_BUFFER: usize = 1000;
62
63#[allow(clippy::too_many_arguments)]
64pub async fn run(
65    connectors: ConnectorRegistry,
66    connections: DynP2PConnections<P2PMessage>,
67    p2p_status_receivers: P2PStatusReceivers,
68    api_bind: SocketAddr,
69    iroh_dns: Option<SafeUrl>,
70    iroh_relays: Vec<SafeUrl>,
71    cfg: ServerConfig,
72    db: Database,
73    module_init_registry: ServerModuleInitRegistry,
74    task_group: &TaskGroup,
75    force_api_secrets: ApiSecrets,
76    data_dir: PathBuf,
77    code_version_str: String,
78    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
79    ui_bind: SocketAddr,
80    dashboard_ui_router: DashboardUiRouter,
81    db_checkpoint_retention: u64,
82    iroh_api_limits: ConnectionLimits,
83) -> anyhow::Result<()> {
84    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
85
86    let mut global_dbtx = db.begin_transaction().await;
87    apply_migrations_server_dbtx(
88        &mut global_dbtx.to_ref_nc(),
89        Arc::new(ServerDbMigrationContext),
90        "fedimint-server".to_string(),
91        get_global_database_migrations(),
92    )
93    .await?;
94
95    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
96
97    if is_running_in_test_env() {
98        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
99    }
100    global_dbtx.commit_tx_result().await?;
101
102    let mut modules = BTreeMap::new();
103
104    // TODO: make it work with all transports and federation secrets
105    let global_api = DynGlobalApi::new(
106        connectors.clone(),
107        cfg.consensus
108            .api_endpoints()
109            .iter()
110            .map(|(&peer_id, url)| (peer_id, url.url.clone()))
111            .collect(),
112        None,
113    )?;
114
115    let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
116        dyn_server_bitcoin_rpc,
117        if is_running_in_test_env() {
118            Duration::from_millis(100)
119        } else {
120            Duration::from_mins(1)
121        },
122        task_group,
123    );
124
125    for (module_id, module_cfg) in &cfg.consensus.modules {
126        match module_init_registry.get(&module_cfg.kind) {
127            Some(module_init) => {
128                info!(target: LOG_CORE, "Initialise module {module_id}...");
129
130                let mut dbtx = db.begin_transaction().await;
131                apply_migrations_dbtx(
132                    &mut dbtx.to_ref_nc(),
133                    Arc::new(ServerDbMigrationContext) as Arc<_>,
134                    module_init.module_kind().to_string(),
135                    module_init.get_database_migrations(),
136                    Some(*module_id),
137                    None,
138                )
139                .await?;
140
141                if let Some(used_db_prefixes) = module_init.used_db_prefixes()
142                    && is_running_in_test_env()
143                {
144                    verify_module_db_integrity_dbtx(
145                        &mut dbtx.to_ref_nc(),
146                        *module_id,
147                        module_init.module_kind(),
148                        &used_db_prefixes,
149                    )
150                    .await;
151                }
152                dbtx.commit_tx_result().await?;
153
154                let module = module_init
155                    .init(
156                        NumPeers::from(cfg.consensus.api_endpoints().len()),
157                        cfg.get_module_config(*module_id)?,
158                        db.with_prefix_module_id(*module_id).0,
159                        task_group,
160                        cfg.local.identity,
161                        global_api.with_module(*module_id),
162                        bitcoin_rpc_connection.clone(),
163                    )
164                    .await?;
165
166                modules.insert(*module_id, (module_cfg.kind.clone(), module));
167            }
168            None => bail!("Detected configuration for unsupported module id: {module_id}"),
169        }
170    }
171
172    let module_registry = ModuleRegistry::from(modules);
173
174    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
175
176    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
177    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
178    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
179
180    let mut ci_status_senders = BTreeMap::new();
181    let mut ci_status_receivers = BTreeMap::new();
182
183    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
184        let (ci_sender, ci_receiver) = watch::channel(None);
185
186        ci_status_senders.insert(peer, ci_sender);
187        ci_status_receivers.insert(peer, ci_receiver);
188    }
189
190    let consensus_api = ConsensusApi {
191        cfg: cfg.clone(),
192        cfg_dir: data_dir.clone(),
193        db: db.clone(),
194        modules: module_registry.clone(),
195        client_cfg: client_cfg.clone(),
196        submission_sender: submission_sender.clone(),
197        shutdown_sender,
198        shutdown_receiver: shutdown_receiver.clone(),
199        supported_api_versions: ServerConfig::supported_api_versions_summary(
200            &cfg.consensus.modules,
201            &module_init_registry,
202        ),
203        p2p_status_receivers,
204        ci_status_receivers,
205        ord_latency_receiver,
206        bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
207        force_api_secret: force_api_secrets.get_active(),
208        code_version_str,
209        task_group: task_group.clone(),
210    };
211
212    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
213
214    let api_handler = start_consensus_api(
215        &cfg.local,
216        consensus_api.clone(),
217        force_api_secrets.clone(),
218        api_bind,
219    )
220    .await;
221
222    if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
223        && let Err(e) = Box::pin(start_iroh_api(
224            iroh_api_sk,
225            api_bind,
226            iroh_dns,
227            iroh_relays,
228            consensus_api.clone(),
229            task_group,
230            iroh_api_limits,
231        ))
232        .await
233    {
234        // clean up ws api before propagating error
235        api_handler.stop().expect("Just started");
236        api_handler.stopped().await;
237        return Err(e);
238    }
239
240    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
241
242    for (module_id, kind, module) in module_registry.iter_modules() {
243        submit_module_ci_proposals(
244            task_group,
245            db.clone(),
246            module_id,
247            kind.clone(),
248            module.clone(),
249            submission_sender.clone(),
250        );
251    }
252
253    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
254
255    let ui_listener = TcpListener::bind(ui_bind)
256        .await
257        .expect("Failed to bind dashboard UI");
258
259    task_group.spawn("dashboard-ui", move |handle| async move {
260        axum::serve(ui_listener, ui_service)
261            .with_graceful_shutdown(handle.make_shutdown_rx())
262            .await
263            .expect("Failed to serve dashboard UI");
264    });
265
266    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
267
268    loop {
269        match bitcoin_rpc_connection.status() {
270            Some(status) => {
271                if let Some(progress) = status.sync_progress {
272                    if progress >= 0.999 {
273                        break;
274                    }
275
276                    info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
277                } else {
278                    break;
279                }
280            }
281            None => {
282                info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
283            }
284        }
285
286        sleep(Duration::from_secs(1)).await;
287    }
288
289    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
290
291    let api_urls = get_api_urls(&db, &cfg.consensus).await;
292
293    // FIXME: (@leonardo) How should this be handled ?
294    // Using the `Connector::default()` for now!
295    ConsensusEngine {
296        db,
297        federation_api: DynGlobalApi::new(
298            connectors,
299            api_urls,
300            force_api_secrets.get_active().as_deref(),
301        )?,
302        cfg: cfg.clone(),
303        connections,
304        ord_latency_sender,
305        ci_status_senders,
306        submission_receiver,
307        shutdown_receiver,
308        modules: module_registry,
309        task_group: task_group.clone(),
310        data_dir,
311        db_checkpoint_retention,
312    }
313    .run()
314    .await?;
315
316    api_handler
317        .stop()
318        .expect("Consensus api should still be running");
319
320    api_handler.stopped().await;
321
322    Ok(())
323}
324
325async fn start_consensus_api(
326    cfg: &ServerConfigLocal,
327    api: ConsensusApi,
328    force_api_secrets: ApiSecrets,
329    api_bind: SocketAddr,
330) -> ServerHandle {
331    let mut rpc_module = RpcModule::new(api.clone());
332
333    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
334
335    for (id, _, module) in api.modules.iter_modules() {
336        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
337    }
338
339    net::api::spawn(
340        "consensus",
341        api_bind,
342        rpc_module,
343        cfg.max_connections,
344        force_api_secrets,
345    )
346    .await
347}
348
349const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
350
351fn submit_module_ci_proposals(
352    task_group: &TaskGroup,
353    db: Database,
354    module_id: ModuleInstanceId,
355    kind: ModuleKind,
356    module: DynServerModule,
357    submission_sender: Sender<ConsensusItem>,
358) {
359    let mut interval = tokio::time::interval(if is_running_in_test_env() {
360        Duration::from_millis(100)
361    } else {
362        Duration::from_secs(1)
363    });
364
365    task_group.spawn(
366        format!("citem_proposals_{module_id}"),
367        move |task_handle| async move {
368            while !task_handle.is_shutting_down() {
369                let module_consensus_items = tokio::time::timeout(
370                    CONSENSUS_PROPOSAL_TIMEOUT,
371                    module.consensus_proposal(
372                        &mut db
373                            .begin_transaction_nc()
374                            .await
375                            .to_ref_with_prefix_module_id(module_id)
376                            .0
377                            .into_nc(),
378                        module_id,
379                    ),
380                )
381                .await;
382
383                match module_consensus_items {
384                    Ok(items) => {
385                        for item in items {
386                            if submission_sender
387                                .send(ConsensusItem::Module(item))
388                                .await
389                                .is_err()
390                            {
391                                warn!(
392                                    target: LOG_CONSENSUS,
393                                    module_id,
394                                    "Unable to submit module consensus item proposal via channel"
395                                );
396                            }
397                        }
398                    }
399                    Err(..) => {
400                        warn!(
401                            target: LOG_CONSENSUS,
402                            module_id,
403                            %kind,
404                            "Module failed to propose consensus items on time"
405                        );
406                    }
407                }
408
409                interval.tick().await;
410            }
411        },
412    );
413}
414
415async fn start_iroh_api(
416    secret_key: iroh::SecretKey,
417    api_bind: SocketAddr,
418    iroh_dns: Option<SafeUrl>,
419    iroh_relays: Vec<SafeUrl>,
420    consensus_api: ConsensusApi,
421    task_group: &TaskGroup,
422    iroh_api_limits: ConnectionLimits,
423) -> anyhow::Result<()> {
424    let endpoint = build_iroh_endpoint(
425        secret_key,
426        api_bind,
427        iroh_dns,
428        iroh_relays,
429        FEDIMINT_API_ALPN,
430    )
431    .await?;
432    task_group.spawn_cancellable(
433        "iroh-api",
434        run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
435    );
436
437    Ok(())
438}
439
440async fn run_iroh_api(
441    consensus_api: ConsensusApi,
442    endpoint: Endpoint,
443    task_group: TaskGroup,
444    iroh_api_limits: ConnectionLimits,
445) {
446    let core_api = server_endpoints()
447        .into_iter()
448        .map(|endpoint| (endpoint.path.to_string(), endpoint))
449        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
450
451    let module_api = consensus_api
452        .modules
453        .iter_modules()
454        .map(|(id, _, module)| {
455            let api_endpoints = module
456                .api_endpoints()
457                .into_iter()
458                .map(|endpoint| (endpoint.path.to_string(), endpoint))
459                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
460
461            (id, api_endpoints)
462        })
463        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
464
465    let consensus_api = Arc::new(consensus_api);
466    let core_api = Arc::new(core_api);
467    let module_api = Arc::new(module_api);
468    let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
469
470    loop {
471        match endpoint.accept().await {
472            Some(incoming) => {
473                if parallel_connections_limit.available_permits() == 0 {
474                    warn!(
475                        target: LOG_NET_API,
476                        limit = iroh_api_limits.max_connections,
477                        "Iroh API connection limit reached, blocking new connections"
478                    );
479                }
480                let permit = parallel_connections_limit
481                    .clone()
482                    .acquire_owned()
483                    .await
484                    .expect("semaphore should not be closed");
485                task_group.spawn_cancellable_silent(
486                    "handle-iroh-connection",
487                    handle_incoming(
488                        consensus_api.clone(),
489                        core_api.clone(),
490                        module_api.clone(),
491                        task_group.clone(),
492                        incoming,
493                        permit,
494                        iroh_api_limits.max_requests_per_connection,
495                    )
496                    .then(|result| async {
497                        if let Err(err) = result {
498                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
499                        }
500                    }),
501                );
502            }
503            None => return,
504        }
505    }
506}
507
508async fn handle_incoming(
509    consensus_api: Arc<ConsensusApi>,
510    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
511    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
512    task_group: TaskGroup,
513    incoming: Incoming,
514    _connection_permit: tokio::sync::OwnedSemaphorePermit,
515    iroh_api_max_requests_per_connection: usize,
516) -> anyhow::Result<()> {
517    let connection = incoming.accept()?.await?;
518    let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
519
520    IROH_API_CONNECTIONS_ACTIVE.inc();
521    let connection_timer = IROH_API_CONNECTION_DURATION_SECONDS.start_timer();
522    scopeguard::defer! {
523        IROH_API_CONNECTIONS_ACTIVE.dec();
524        connection_timer.observe_duration();
525    }
526
527    loop {
528        let (send_stream, recv_stream) = connection.accept_bi().await?;
529
530        if parallel_requests_limit.available_permits() == 0 {
531            warn!(
532                target: LOG_NET_API,
533                limit = iroh_api_max_requests_per_connection,
534                "Iroh API request limit reached for connection, blocking new requests"
535            );
536        }
537        let permit = parallel_requests_limit
538            .clone()
539            .acquire_owned()
540            .await
541            .expect("semaphore should not be closed");
542        task_group.spawn_cancellable_silent(
543            "handle-iroh-request",
544            handle_request(
545                consensus_api.clone(),
546                core_api.clone(),
547                module_api.clone(),
548                send_stream,
549                recv_stream,
550                permit,
551            )
552            .then(|result| async {
553                if let Err(err) = result {
554                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
555                }
556            }),
557        );
558    }
559}
560
561async fn handle_request(
562    consensus_api: Arc<ConsensusApi>,
563    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
564    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
565    mut send_stream: SendStream,
566    mut recv_stream: RecvStream,
567    _request_permit: tokio::sync::OwnedSemaphorePermit,
568) -> anyhow::Result<()> {
569    let request = recv_stream.read_to_end(100_000).await?;
570
571    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
572
573    let method = request.method.to_string();
574    let timer = IROH_API_REQUEST_DURATION_SECONDS
575        .with_label_values(&[&method])
576        .start_timer();
577
578    let response = await_response(consensus_api, core_api, module_api, request).await;
579
580    timer.observe_duration();
581
582    let response = serde_json::to_vec(&response)?;
583
584    send_stream.write_all(&response).await?;
585
586    send_stream.finish()?;
587
588    Ok(())
589}
590
591async fn await_response(
592    consensus_api: Arc<ConsensusApi>,
593    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
594    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
595    request: IrohApiRequest,
596) -> Result<Value, ApiError> {
597    match request.method {
598        ApiMethod::Core(method) => {
599            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
600
601            let (state, context) = consensus_api.context(&request.request, None).await;
602
603            (endpoint.handler)(state, context, request.request).await
604        }
605        ApiMethod::Module(module_id, method) => {
606            let endpoint = module_api
607                .get(&module_id)
608                .ok_or(ApiError::not_found(module_id.to_string()))?
609                .get(&method)
610                .ok_or(ApiError::not_found(method))?;
611
612            let (state, context) = consensus_api
613                .context(&request.request, Some(module_id))
614                .await;
615
616            (endpoint.handler)(state, context, request.request).await
617        }
618    }
619}