fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::net::api::announcement::get_api_urls;
52use crate::net::api::{ApiSecrets, HasApiContext};
53use crate::net::p2p::P2PStatusReceivers;
54use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
55
56/// How many txs can be stored in memory before blocking the API
57const TRANSACTION_BUFFER: usize = 1000;
58
59#[allow(clippy::too_many_arguments)]
60pub async fn run(
61    connectors: ConnectorRegistry,
62    connections: DynP2PConnections<P2PMessage>,
63    p2p_status_receivers: P2PStatusReceivers,
64    api_bind: SocketAddr,
65    iroh_dns: Option<SafeUrl>,
66    iroh_relays: Vec<SafeUrl>,
67    cfg: ServerConfig,
68    db: Database,
69    module_init_registry: ServerModuleInitRegistry,
70    task_group: &TaskGroup,
71    force_api_secrets: ApiSecrets,
72    data_dir: PathBuf,
73    code_version_str: String,
74    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
75    ui_bind: SocketAddr,
76    dashboard_ui_router: DashboardUiRouter,
77    db_checkpoint_retention: u64,
78    iroh_api_limits: ConnectionLimits,
79) -> anyhow::Result<()> {
80    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
81
82    let mut global_dbtx = db.begin_transaction().await;
83    apply_migrations_server_dbtx(
84        &mut global_dbtx.to_ref_nc(),
85        Arc::new(ServerDbMigrationContext),
86        "fedimint-server".to_string(),
87        get_global_database_migrations(),
88    )
89    .await?;
90
91    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
92
93    if is_running_in_test_env() {
94        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
95    }
96    global_dbtx.commit_tx_result().await?;
97
98    let mut modules = BTreeMap::new();
99
100    // TODO: make it work with all transports and federation secrets
101    let global_api = DynGlobalApi::new(
102        connectors.clone(),
103        cfg.consensus
104            .api_endpoints()
105            .iter()
106            .map(|(&peer_id, url)| (peer_id, url.url.clone()))
107            .collect(),
108        None,
109    )?;
110
111    let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
112        dyn_server_bitcoin_rpc,
113        if is_running_in_test_env() {
114            Duration::from_millis(100)
115        } else {
116            Duration::from_secs(60)
117        },
118        task_group,
119    );
120
121    for (module_id, module_cfg) in &cfg.consensus.modules {
122        match module_init_registry.get(&module_cfg.kind) {
123            Some(module_init) => {
124                info!(target: LOG_CORE, "Initialise module {module_id}...");
125
126                let mut dbtx = db.begin_transaction().await;
127                apply_migrations_dbtx(
128                    &mut dbtx.to_ref_nc(),
129                    Arc::new(ServerDbMigrationContext) as Arc<_>,
130                    module_init.module_kind().to_string(),
131                    module_init.get_database_migrations(),
132                    Some(*module_id),
133                    None,
134                )
135                .await?;
136
137                if let Some(used_db_prefixes) = module_init.used_db_prefixes()
138                    && is_running_in_test_env()
139                {
140                    verify_module_db_integrity_dbtx(
141                        &mut dbtx.to_ref_nc(),
142                        *module_id,
143                        module_init.module_kind(),
144                        &used_db_prefixes,
145                    )
146                    .await;
147                }
148                dbtx.commit_tx_result().await?;
149
150                let module = module_init
151                    .init(
152                        NumPeers::from(cfg.consensus.api_endpoints().len()),
153                        cfg.get_module_config(*module_id)?,
154                        db.with_prefix_module_id(*module_id).0,
155                        task_group,
156                        cfg.local.identity,
157                        global_api.with_module(*module_id),
158                        bitcoin_rpc_connection.clone(),
159                    )
160                    .await?;
161
162                modules.insert(*module_id, (module_cfg.kind.clone(), module));
163            }
164            None => bail!("Detected configuration for unsupported module id: {module_id}"),
165        }
166    }
167
168    let module_registry = ModuleRegistry::from(modules);
169
170    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
171
172    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
173    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
174    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
175
176    let mut ci_status_senders = BTreeMap::new();
177    let mut ci_status_receivers = BTreeMap::new();
178
179    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
180        let (ci_sender, ci_receiver) = watch::channel(None);
181
182        ci_status_senders.insert(peer, ci_sender);
183        ci_status_receivers.insert(peer, ci_receiver);
184    }
185
186    let consensus_api = ConsensusApi {
187        cfg: cfg.clone(),
188        cfg_dir: data_dir.clone(),
189        db: db.clone(),
190        modules: module_registry.clone(),
191        client_cfg: client_cfg.clone(),
192        submission_sender: submission_sender.clone(),
193        shutdown_sender,
194        shutdown_receiver: shutdown_receiver.clone(),
195        supported_api_versions: ServerConfig::supported_api_versions_summary(
196            &cfg.consensus.modules,
197            &module_init_registry,
198        ),
199        p2p_status_receivers,
200        ci_status_receivers,
201        ord_latency_receiver,
202        bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
203        force_api_secret: force_api_secrets.get_active(),
204        code_version_str,
205        task_group: task_group.clone(),
206    };
207
208    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
209
210    let api_handler = start_consensus_api(
211        &cfg.local,
212        consensus_api.clone(),
213        force_api_secrets.clone(),
214        api_bind,
215    )
216    .await;
217
218    if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
219        && let Err(e) = Box::pin(start_iroh_api(
220            iroh_api_sk,
221            api_bind,
222            iroh_dns,
223            iroh_relays,
224            consensus_api.clone(),
225            task_group,
226            iroh_api_limits,
227        ))
228        .await
229    {
230        // clean up ws api before propagating error
231        api_handler.stop().expect("Just started");
232        api_handler.stopped().await;
233        return Err(e);
234    }
235
236    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
237
238    for (module_id, kind, module) in module_registry.iter_modules() {
239        submit_module_ci_proposals(
240            task_group,
241            db.clone(),
242            module_id,
243            kind.clone(),
244            module.clone(),
245            submission_sender.clone(),
246        );
247    }
248
249    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
250
251    let ui_listener = TcpListener::bind(ui_bind)
252        .await
253        .expect("Failed to bind dashboard UI");
254
255    task_group.spawn("dashboard-ui", move |handle| async move {
256        axum::serve(ui_listener, ui_service)
257            .with_graceful_shutdown(handle.make_shutdown_rx())
258            .await
259            .expect("Failed to serve dashboard UI");
260    });
261
262    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
263
264    loop {
265        match bitcoin_rpc_connection.status() {
266            Some(status) => {
267                if let Some(progress) = status.sync_progress {
268                    if progress >= 0.999 {
269                        break;
270                    }
271
272                    info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
273                } else {
274                    break;
275                }
276            }
277            None => {
278                info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
279            }
280        }
281
282        sleep(Duration::from_secs(1)).await;
283    }
284
285    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
286
287    let api_urls = get_api_urls(&db, &cfg.consensus).await;
288
289    // FIXME: (@leonardo) How should this be handled ?
290    // Using the `Connector::default()` for now!
291    ConsensusEngine {
292        db,
293        federation_api: DynGlobalApi::new(
294            connectors,
295            api_urls,
296            force_api_secrets.get_active().as_deref(),
297        )?,
298        cfg: cfg.clone(),
299        connections,
300        ord_latency_sender,
301        ci_status_senders,
302        submission_receiver,
303        shutdown_receiver,
304        modules: module_registry,
305        task_group: task_group.clone(),
306        data_dir,
307        db_checkpoint_retention,
308    }
309    .run()
310    .await?;
311
312    api_handler
313        .stop()
314        .expect("Consensus api should still be running");
315
316    api_handler.stopped().await;
317
318    Ok(())
319}
320
321async fn start_consensus_api(
322    cfg: &ServerConfigLocal,
323    api: ConsensusApi,
324    force_api_secrets: ApiSecrets,
325    api_bind: SocketAddr,
326) -> ServerHandle {
327    let mut rpc_module = RpcModule::new(api.clone());
328
329    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
330
331    for (id, _, module) in api.modules.iter_modules() {
332        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
333    }
334
335    net::api::spawn(
336        "consensus",
337        api_bind,
338        rpc_module,
339        cfg.max_connections,
340        force_api_secrets,
341    )
342    .await
343}
344
345const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
346
347fn submit_module_ci_proposals(
348    task_group: &TaskGroup,
349    db: Database,
350    module_id: ModuleInstanceId,
351    kind: ModuleKind,
352    module: DynServerModule,
353    submission_sender: Sender<ConsensusItem>,
354) {
355    let mut interval = tokio::time::interval(if is_running_in_test_env() {
356        Duration::from_millis(100)
357    } else {
358        Duration::from_secs(1)
359    });
360
361    task_group.spawn(
362        format!("citem_proposals_{module_id}"),
363        move |task_handle| async move {
364            while !task_handle.is_shutting_down() {
365                let module_consensus_items = tokio::time::timeout(
366                    CONSENSUS_PROPOSAL_TIMEOUT,
367                    module.consensus_proposal(
368                        &mut db
369                            .begin_transaction_nc()
370                            .await
371                            .to_ref_with_prefix_module_id(module_id)
372                            .0
373                            .into_nc(),
374                        module_id,
375                    ),
376                )
377                .await;
378
379                match module_consensus_items {
380                    Ok(items) => {
381                        for item in items {
382                            if submission_sender
383                                .send(ConsensusItem::Module(item))
384                                .await
385                                .is_err()
386                            {
387                                warn!(
388                                    target: LOG_CONSENSUS,
389                                    module_id,
390                                    "Unable to submit module consensus item proposal via channel"
391                                );
392                            }
393                        }
394                    }
395                    Err(..) => {
396                        warn!(
397                            target: LOG_CONSENSUS,
398                            module_id,
399                            %kind,
400                            "Module failed to propose consensus items on time"
401                        );
402                    }
403                }
404
405                interval.tick().await;
406            }
407        },
408    );
409}
410
411async fn start_iroh_api(
412    secret_key: iroh::SecretKey,
413    api_bind: SocketAddr,
414    iroh_dns: Option<SafeUrl>,
415    iroh_relays: Vec<SafeUrl>,
416    consensus_api: ConsensusApi,
417    task_group: &TaskGroup,
418    iroh_api_limits: ConnectionLimits,
419) -> anyhow::Result<()> {
420    let endpoint = build_iroh_endpoint(
421        secret_key,
422        api_bind,
423        iroh_dns,
424        iroh_relays,
425        FEDIMINT_API_ALPN,
426    )
427    .await?;
428    task_group.spawn_cancellable(
429        "iroh-api",
430        run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
431    );
432
433    Ok(())
434}
435
436async fn run_iroh_api(
437    consensus_api: ConsensusApi,
438    endpoint: Endpoint,
439    task_group: TaskGroup,
440    iroh_api_limits: ConnectionLimits,
441) {
442    let core_api = server_endpoints()
443        .into_iter()
444        .map(|endpoint| (endpoint.path.to_string(), endpoint))
445        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
446
447    let module_api = consensus_api
448        .modules
449        .iter_modules()
450        .map(|(id, _, module)| {
451            let api_endpoints = module
452                .api_endpoints()
453                .into_iter()
454                .map(|endpoint| (endpoint.path.to_string(), endpoint))
455                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
456
457            (id, api_endpoints)
458        })
459        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
460
461    let consensus_api = Arc::new(consensus_api);
462    let core_api = Arc::new(core_api);
463    let module_api = Arc::new(module_api);
464    let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
465
466    loop {
467        match endpoint.accept().await {
468            Some(incoming) => {
469                if parallel_connections_limit.available_permits() == 0 {
470                    warn!(
471                        target: LOG_NET_API,
472                        limit = iroh_api_limits.max_connections,
473                        "Iroh API connection limit reached, blocking new connections"
474                    );
475                }
476                let permit = parallel_connections_limit
477                    .clone()
478                    .acquire_owned()
479                    .await
480                    .expect("semaphore should not be closed");
481                task_group.spawn_cancellable_silent(
482                    "handle-iroh-connection",
483                    handle_incoming(
484                        consensus_api.clone(),
485                        core_api.clone(),
486                        module_api.clone(),
487                        task_group.clone(),
488                        incoming,
489                        permit,
490                        iroh_api_limits.max_requests_per_connection,
491                    )
492                    .then(|result| async {
493                        if let Err(err) = result {
494                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
495                        }
496                    }),
497                );
498            }
499            None => return,
500        }
501    }
502}
503
504async fn handle_incoming(
505    consensus_api: Arc<ConsensusApi>,
506    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
507    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
508    task_group: TaskGroup,
509    incoming: Incoming,
510    _connection_permit: tokio::sync::OwnedSemaphorePermit,
511    iroh_api_max_requests_per_connection: usize,
512) -> anyhow::Result<()> {
513    let connection = incoming.accept()?.await?;
514    let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
515
516    loop {
517        let (send_stream, recv_stream) = connection.accept_bi().await?;
518
519        if parallel_requests_limit.available_permits() == 0 {
520            warn!(
521                target: LOG_NET_API,
522                limit = iroh_api_max_requests_per_connection,
523                "Iroh API request limit reached for connection, blocking new requests"
524            );
525        }
526        let permit = parallel_requests_limit
527            .clone()
528            .acquire_owned()
529            .await
530            .expect("semaphore should not be closed");
531        task_group.spawn_cancellable_silent(
532            "handle-iroh-request",
533            handle_request(
534                consensus_api.clone(),
535                core_api.clone(),
536                module_api.clone(),
537                send_stream,
538                recv_stream,
539                permit,
540            )
541            .then(|result| async {
542                if let Err(err) = result {
543                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
544                }
545            }),
546        );
547    }
548}
549
550async fn handle_request(
551    consensus_api: Arc<ConsensusApi>,
552    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
553    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
554    mut send_stream: SendStream,
555    mut recv_stream: RecvStream,
556    _request_permit: tokio::sync::OwnedSemaphorePermit,
557) -> anyhow::Result<()> {
558    let request = recv_stream.read_to_end(100_000).await?;
559
560    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
561
562    let response = await_response(consensus_api, core_api, module_api, request).await;
563
564    let response = serde_json::to_vec(&response)?;
565
566    send_stream.write_all(&response).await?;
567
568    send_stream.finish()?;
569
570    Ok(())
571}
572
573async fn await_response(
574    consensus_api: Arc<ConsensusApi>,
575    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
576    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
577    request: IrohApiRequest,
578) -> Result<Value, ApiError> {
579    match request.method {
580        ApiMethod::Core(method) => {
581            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
582
583            let (state, context) = consensus_api.context(&request.request, None).await;
584
585            (endpoint.handler)(state, context, request.request).await
586        }
587        ApiMethod::Module(module_id, method) => {
588            let endpoint = module_api
589                .get(&module_id)
590                .ok_or(ApiError::not_found(module_id.to_string()))?
591                .get(&method)
592                .ok_or(ApiError::not_found(method))?;
593
594            let (state, context) = consensus_api
595                .context(&request.request, Some(module_id))
596                .await;
597
598            (endpoint.handler)(state, context, request.request).await
599        }
600    }
601}