fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_connectors::ConnectorRegistry;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::iroh::build_iroh_endpoint;
28use fedimint_core::net::peers::DynP2PConnections;
29use fedimint_core::task::{TaskGroup, sleep};
30use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
31use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
32use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
33use fedimint_server_core::dashboard_ui::IDashboardApi;
34use fedimint_server_core::migration::apply_migrations_server_dbtx;
35use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
36use futures::FutureExt;
37use iroh::Endpoint;
38use iroh::endpoint::{Incoming, RecvStream, SendStream};
39use jsonrpsee::RpcModule;
40use jsonrpsee::server::ServerHandle;
41use serde_json::Value;
42use tokio::net::TcpListener;
43use tokio::sync::{Semaphore, watch};
44use tracing::{info, warn};
45
46use crate::config::{ServerConfig, ServerConfigLocal};
47use crate::connection_limits::ConnectionLimits;
48use crate::consensus::api::{ConsensusApi, server_endpoints};
49use crate::consensus::engine::ConsensusEngine;
50use crate::db::verify_server_db_integrity_dbtx;
51use crate::net::api::announcement::get_api_urls;
52use crate::net::api::{ApiSecrets, HasApiContext};
53use crate::net::p2p::{P2PConnectionTypeReceivers, P2PStatusReceivers};
54use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
55
56/// How many txs can be stored in memory before blocking the API
57const TRANSACTION_BUFFER: usize = 1000;
58
59#[allow(clippy::too_many_arguments)]
60pub async fn run(
61    connectors: ConnectorRegistry,
62    connections: DynP2PConnections<P2PMessage>,
63    p2p_status_receivers: P2PStatusReceivers,
64    p2p_connection_type_receivers: P2PConnectionTypeReceivers,
65    api_bind: SocketAddr,
66    iroh_dns: Option<SafeUrl>,
67    iroh_relays: Vec<SafeUrl>,
68    cfg: ServerConfig,
69    db: Database,
70    module_init_registry: ServerModuleInitRegistry,
71    task_group: &TaskGroup,
72    force_api_secrets: ApiSecrets,
73    data_dir: PathBuf,
74    code_version_str: String,
75    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
76    ui_bind: SocketAddr,
77    dashboard_ui_router: DashboardUiRouter,
78    db_checkpoint_retention: u64,
79    iroh_api_limits: ConnectionLimits,
80) -> anyhow::Result<()> {
81    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
82
83    let mut global_dbtx = db.begin_transaction().await;
84    apply_migrations_server_dbtx(
85        &mut global_dbtx.to_ref_nc(),
86        Arc::new(ServerDbMigrationContext),
87        "fedimint-server".to_string(),
88        get_global_database_migrations(),
89    )
90    .await?;
91
92    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
93
94    if is_running_in_test_env() {
95        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
96    }
97    global_dbtx.commit_tx_result().await?;
98
99    let mut modules = BTreeMap::new();
100
101    // TODO: make it work with all transports and federation secrets
102    let global_api = DynGlobalApi::new(
103        connectors.clone(),
104        cfg.consensus
105            .api_endpoints()
106            .iter()
107            .map(|(&peer_id, url)| (peer_id, url.url.clone()))
108            .collect(),
109        None,
110    )?;
111
112    let bitcoin_rpc_connection = ServerBitcoinRpcMonitor::new(
113        dyn_server_bitcoin_rpc,
114        if is_running_in_test_env() {
115            Duration::from_millis(100)
116        } else {
117            Duration::from_secs(60)
118        },
119        task_group,
120    );
121
122    for (module_id, module_cfg) in &cfg.consensus.modules {
123        match module_init_registry.get(&module_cfg.kind) {
124            Some(module_init) => {
125                info!(target: LOG_CORE, "Initialise module {module_id}...");
126
127                let mut dbtx = db.begin_transaction().await;
128                apply_migrations_dbtx(
129                    &mut dbtx.to_ref_nc(),
130                    Arc::new(ServerDbMigrationContext) as Arc<_>,
131                    module_init.module_kind().to_string(),
132                    module_init.get_database_migrations(),
133                    Some(*module_id),
134                    None,
135                )
136                .await?;
137
138                if let Some(used_db_prefixes) = module_init.used_db_prefixes()
139                    && is_running_in_test_env()
140                {
141                    verify_module_db_integrity_dbtx(
142                        &mut dbtx.to_ref_nc(),
143                        *module_id,
144                        module_init.module_kind(),
145                        &used_db_prefixes,
146                    )
147                    .await;
148                }
149                dbtx.commit_tx_result().await?;
150
151                let module = module_init
152                    .init(
153                        NumPeers::from(cfg.consensus.api_endpoints().len()),
154                        cfg.get_module_config(*module_id)?,
155                        db.with_prefix_module_id(*module_id).0,
156                        task_group,
157                        cfg.local.identity,
158                        global_api.with_module(*module_id),
159                        bitcoin_rpc_connection.clone(),
160                    )
161                    .await?;
162
163                modules.insert(*module_id, (module_cfg.kind.clone(), module));
164            }
165            None => bail!("Detected configuration for unsupported module id: {module_id}"),
166        }
167    }
168
169    let module_registry = ModuleRegistry::from(modules);
170
171    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
172
173    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
174    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
175    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
176
177    let mut ci_status_senders = BTreeMap::new();
178    let mut ci_status_receivers = BTreeMap::new();
179
180    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
181        let (ci_sender, ci_receiver) = watch::channel(None);
182
183        ci_status_senders.insert(peer, ci_sender);
184        ci_status_receivers.insert(peer, ci_receiver);
185    }
186
187    let consensus_api = ConsensusApi {
188        cfg: cfg.clone(),
189        cfg_dir: data_dir.clone(),
190        db: db.clone(),
191        modules: module_registry.clone(),
192        client_cfg: client_cfg.clone(),
193        submission_sender: submission_sender.clone(),
194        shutdown_sender,
195        shutdown_receiver: shutdown_receiver.clone(),
196        supported_api_versions: ServerConfig::supported_api_versions_summary(
197            &cfg.consensus.modules,
198            &module_init_registry,
199        ),
200        p2p_status_receivers,
201        p2p_connection_type_receivers,
202        ci_status_receivers,
203        ord_latency_receiver,
204        bitcoin_rpc_connection: bitcoin_rpc_connection.clone(),
205        force_api_secret: force_api_secrets.get_active(),
206        code_version_str,
207        task_group: task_group.clone(),
208    };
209
210    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
211
212    let api_handler = start_consensus_api(
213        &cfg.local,
214        consensus_api.clone(),
215        force_api_secrets.clone(),
216        api_bind,
217    )
218    .await;
219
220    if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone()
221        && let Err(e) = Box::pin(start_iroh_api(
222            iroh_api_sk,
223            api_bind,
224            iroh_dns,
225            iroh_relays,
226            consensus_api.clone(),
227            task_group,
228            iroh_api_limits,
229        ))
230        .await
231    {
232        // clean up ws api before propagating error
233        api_handler.stop().expect("Just started");
234        api_handler.stopped().await;
235        return Err(e);
236    }
237
238    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
239
240    for (module_id, kind, module) in module_registry.iter_modules() {
241        submit_module_ci_proposals(
242            task_group,
243            db.clone(),
244            module_id,
245            kind.clone(),
246            module.clone(),
247            submission_sender.clone(),
248        );
249    }
250
251    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
252
253    let ui_listener = TcpListener::bind(ui_bind)
254        .await
255        .expect("Failed to bind dashboard UI");
256
257    task_group.spawn("dashboard-ui", move |handle| async move {
258        axum::serve(ui_listener, ui_service)
259            .with_graceful_shutdown(handle.make_shutdown_rx())
260            .await
261            .expect("Failed to serve dashboard UI");
262    });
263
264    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
265
266    loop {
267        match bitcoin_rpc_connection.status() {
268            Some(status) => {
269                if let Some(progress) = status.sync_progress {
270                    if progress >= 0.999 {
271                        break;
272                    }
273
274                    info!(target: LOG_CONSENSUS, "Waiting for bitcoin backend to sync... {progress:.1}%");
275                } else {
276                    break;
277                }
278            }
279            None => {
280                info!(target: LOG_CONSENSUS, "Waiting to connect to bitcoin backend...");
281            }
282        }
283
284        sleep(Duration::from_secs(1)).await;
285    }
286
287    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
288
289    let api_urls = get_api_urls(&db, &cfg.consensus).await;
290
291    // FIXME: (@leonardo) How should this be handled ?
292    // Using the `Connector::default()` for now!
293    ConsensusEngine {
294        db,
295        federation_api: DynGlobalApi::new(
296            connectors,
297            api_urls,
298            force_api_secrets.get_active().as_deref(),
299        )?,
300        cfg: cfg.clone(),
301        connections,
302        ord_latency_sender,
303        ci_status_senders,
304        submission_receiver,
305        shutdown_receiver,
306        modules: module_registry,
307        task_group: task_group.clone(),
308        data_dir,
309        db_checkpoint_retention,
310    }
311    .run()
312    .await?;
313
314    api_handler
315        .stop()
316        .expect("Consensus api should still be running");
317
318    api_handler.stopped().await;
319
320    Ok(())
321}
322
323async fn start_consensus_api(
324    cfg: &ServerConfigLocal,
325    api: ConsensusApi,
326    force_api_secrets: ApiSecrets,
327    api_bind: SocketAddr,
328) -> ServerHandle {
329    let mut rpc_module = RpcModule::new(api.clone());
330
331    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
332
333    for (id, _, module) in api.modules.iter_modules() {
334        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
335    }
336
337    net::api::spawn(
338        "consensus",
339        api_bind,
340        rpc_module,
341        cfg.max_connections,
342        force_api_secrets,
343    )
344    .await
345}
346
347const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
348
349fn submit_module_ci_proposals(
350    task_group: &TaskGroup,
351    db: Database,
352    module_id: ModuleInstanceId,
353    kind: ModuleKind,
354    module: DynServerModule,
355    submission_sender: Sender<ConsensusItem>,
356) {
357    let mut interval = tokio::time::interval(if is_running_in_test_env() {
358        Duration::from_millis(100)
359    } else {
360        Duration::from_secs(1)
361    });
362
363    task_group.spawn(
364        format!("citem_proposals_{module_id}"),
365        move |task_handle| async move {
366            while !task_handle.is_shutting_down() {
367                let module_consensus_items = tokio::time::timeout(
368                    CONSENSUS_PROPOSAL_TIMEOUT,
369                    module.consensus_proposal(
370                        &mut db
371                            .begin_transaction_nc()
372                            .await
373                            .to_ref_with_prefix_module_id(module_id)
374                            .0
375                            .into_nc(),
376                        module_id,
377                    ),
378                )
379                .await;
380
381                match module_consensus_items {
382                    Ok(items) => {
383                        for item in items {
384                            if submission_sender
385                                .send(ConsensusItem::Module(item))
386                                .await
387                                .is_err()
388                            {
389                                warn!(
390                                    target: LOG_CONSENSUS,
391                                    module_id,
392                                    "Unable to submit module consensus item proposal via channel"
393                                );
394                            }
395                        }
396                    }
397                    Err(..) => {
398                        warn!(
399                            target: LOG_CONSENSUS,
400                            module_id,
401                            %kind,
402                            "Module failed to propose consensus items on time"
403                        );
404                    }
405                }
406
407                interval.tick().await;
408            }
409        },
410    );
411}
412
413async fn start_iroh_api(
414    secret_key: iroh::SecretKey,
415    api_bind: SocketAddr,
416    iroh_dns: Option<SafeUrl>,
417    iroh_relays: Vec<SafeUrl>,
418    consensus_api: ConsensusApi,
419    task_group: &TaskGroup,
420    iroh_api_limits: ConnectionLimits,
421) -> anyhow::Result<()> {
422    let endpoint = build_iroh_endpoint(
423        secret_key,
424        api_bind,
425        iroh_dns,
426        iroh_relays,
427        FEDIMINT_API_ALPN,
428    )
429    .await?;
430    task_group.spawn_cancellable(
431        "iroh-api",
432        run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
433    );
434
435    Ok(())
436}
437
438async fn run_iroh_api(
439    consensus_api: ConsensusApi,
440    endpoint: Endpoint,
441    task_group: TaskGroup,
442    iroh_api_limits: ConnectionLimits,
443) {
444    let core_api = server_endpoints()
445        .into_iter()
446        .map(|endpoint| (endpoint.path.to_string(), endpoint))
447        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
448
449    let module_api = consensus_api
450        .modules
451        .iter_modules()
452        .map(|(id, _, module)| {
453            let api_endpoints = module
454                .api_endpoints()
455                .into_iter()
456                .map(|endpoint| (endpoint.path.to_string(), endpoint))
457                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
458
459            (id, api_endpoints)
460        })
461        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
462
463    let consensus_api = Arc::new(consensus_api);
464    let core_api = Arc::new(core_api);
465    let module_api = Arc::new(module_api);
466    let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
467
468    loop {
469        match endpoint.accept().await {
470            Some(incoming) => {
471                if parallel_connections_limit.available_permits() == 0 {
472                    warn!(
473                        target: LOG_NET_API,
474                        limit = iroh_api_limits.max_connections,
475                        "Iroh API connection limit reached, blocking new connections"
476                    );
477                }
478                let permit = parallel_connections_limit
479                    .clone()
480                    .acquire_owned()
481                    .await
482                    .expect("semaphore should not be closed");
483                task_group.spawn_cancellable_silent(
484                    "handle-iroh-connection",
485                    handle_incoming(
486                        consensus_api.clone(),
487                        core_api.clone(),
488                        module_api.clone(),
489                        task_group.clone(),
490                        incoming,
491                        permit,
492                        iroh_api_limits.max_requests_per_connection,
493                    )
494                    .then(|result| async {
495                        if let Err(err) = result {
496                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
497                        }
498                    }),
499                );
500            }
501            None => return,
502        }
503    }
504}
505
506async fn handle_incoming(
507    consensus_api: Arc<ConsensusApi>,
508    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
509    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
510    task_group: TaskGroup,
511    incoming: Incoming,
512    _connection_permit: tokio::sync::OwnedSemaphorePermit,
513    iroh_api_max_requests_per_connection: usize,
514) -> anyhow::Result<()> {
515    let connection = incoming.accept()?.await?;
516    let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
517
518    loop {
519        let (send_stream, recv_stream) = connection.accept_bi().await?;
520
521        if parallel_requests_limit.available_permits() == 0 {
522            warn!(
523                target: LOG_NET_API,
524                limit = iroh_api_max_requests_per_connection,
525                "Iroh API request limit reached for connection, blocking new requests"
526            );
527        }
528        let permit = parallel_requests_limit
529            .clone()
530            .acquire_owned()
531            .await
532            .expect("semaphore should not be closed");
533        task_group.spawn_cancellable_silent(
534            "handle-iroh-request",
535            handle_request(
536                consensus_api.clone(),
537                core_api.clone(),
538                module_api.clone(),
539                send_stream,
540                recv_stream,
541                permit,
542            )
543            .then(|result| async {
544                if let Err(err) = result {
545                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
546                }
547            }),
548        );
549    }
550}
551
552async fn handle_request(
553    consensus_api: Arc<ConsensusApi>,
554    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
555    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
556    mut send_stream: SendStream,
557    mut recv_stream: RecvStream,
558    _request_permit: tokio::sync::OwnedSemaphorePermit,
559) -> anyhow::Result<()> {
560    let request = recv_stream.read_to_end(100_000).await?;
561
562    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
563
564    let response = await_response(consensus_api, core_api, module_api, request).await;
565
566    let response = serde_json::to_vec(&response)?;
567
568    send_stream.write_all(&response).await?;
569
570    send_stream.finish()?;
571
572    Ok(())
573}
574
575async fn await_response(
576    consensus_api: Arc<ConsensusApi>,
577    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
578    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
579    request: IrohApiRequest,
580) -> Result<Value, ApiError> {
581    match request.method {
582        ApiMethod::Core(method) => {
583            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
584
585            let (state, context) = consensus_api.context(&request.request, None).await;
586
587            (endpoint.handler)(state, context, request.request).await
588        }
589        ApiMethod::Module(module_id, method) => {
590            let endpoint = module_api
591                .get(&module_id)
592                .ok_or(ApiError::not_found(module_id.to_string()))?
593                .get(&method)
594                .ok_or(ApiError::not_found(method))?;
595
596            let (state, context) = consensus_api
597                .context(&request.request, Some(module_id))
598                .await;
599
600            (endpoint.handler)(state, context, request.request).await
601        }
602    }
603}