fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_core::NumPeers;
19use fedimint_core::config::P2PMessage;
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
22use fedimint_core::envs::is_running_in_test_env;
23use fedimint_core::epoch::ConsensusItem;
24use fedimint_core::module::registry::ModuleRegistry;
25use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
26use fedimint_core::net::peers::DynP2PConnections;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
30use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::net::TcpListener;
41use tokio::sync::{Semaphore, watch};
42use tracing::{info, warn};
43
44use crate::config::{ServerConfig, ServerConfigLocal};
45use crate::connection_limits::ConnectionLimits;
46use crate::consensus::api::{ConsensusApi, server_endpoints};
47use crate::consensus::engine::ConsensusEngine;
48use crate::db::verify_server_db_integrity_dbtx;
49use crate::net::api::announcement::get_api_urls;
50use crate::net::api::{ApiSecrets, HasApiContext};
51use crate::net::p2p::P2PStatusReceivers;
52use crate::net::p2p_connector::build_iroh_endpoint;
53use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
54
/// Capacity of the bounded submission channel that buffers consensus items
/// (e.g. transactions submitted through the API) in memory; once it is full,
/// further submissions wait until the consensus engine drains it.
const TRANSACTION_BUFFER: usize = 1_000;
57
58#[allow(clippy::too_many_arguments)]
59pub async fn run(
60    connections: DynP2PConnections<P2PMessage>,
61    p2p_status_receivers: P2PStatusReceivers,
62    api_bind: SocketAddr,
63    iroh_dns: Option<SafeUrl>,
64    iroh_relays: Vec<SafeUrl>,
65    cfg: ServerConfig,
66    db: Database,
67    module_init_registry: ServerModuleInitRegistry,
68    task_group: &TaskGroup,
69    force_api_secrets: ApiSecrets,
70    data_dir: PathBuf,
71    code_version_str: String,
72    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
73    ui_bind: SocketAddr,
74    dashboard_ui_router: DashboardUiRouter,
75    db_checkpoint_retention: u64,
76    iroh_api_limits: ConnectionLimits,
77) -> anyhow::Result<()> {
78    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
79
80    let mut global_dbtx = db.begin_transaction().await;
81    apply_migrations_server_dbtx(
82        &mut global_dbtx.to_ref_nc(),
83        Arc::new(ServerDbMigrationContext),
84        "fedimint-server".to_string(),
85        get_global_database_migrations(),
86    )
87    .await?;
88
89    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
90
91    if is_running_in_test_env() {
92        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
93    }
94    global_dbtx.commit_tx_result().await?;
95
96    let mut modules = BTreeMap::new();
97
98    // TODO: make it work with all transports and federation secrets
99    let global_api = DynGlobalApi::from_endpoints(
100        cfg.consensus
101            .api_endpoints()
102            .iter()
103            .map(|(&peer_id, url)| (peer_id, url.url.clone())),
104        &None,
105    )
106    .await?;
107
108    let server_bitcoin_rpc_monitor = ServerBitcoinRpcMonitor::new(
109        dyn_server_bitcoin_rpc,
110        if is_running_in_test_env() {
111            Duration::from_millis(100)
112        } else {
113            Duration::from_secs(60)
114        },
115        task_group,
116    );
117
118    for (module_id, module_cfg) in &cfg.consensus.modules {
119        match module_init_registry.get(&module_cfg.kind) {
120            Some(module_init) => {
121                info!(target: LOG_CORE, "Initialise module {module_id}...");
122
123                let mut dbtx = db.begin_transaction().await;
124                apply_migrations_dbtx(
125                    &mut dbtx.to_ref_nc(),
126                    Arc::new(ServerDbMigrationContext) as Arc<_>,
127                    module_init.module_kind().to_string(),
128                    module_init.get_database_migrations(),
129                    Some(*module_id),
130                    None,
131                )
132                .await?;
133
134                if let Some(used_db_prefixes) = module_init.used_db_prefixes() {
135                    if is_running_in_test_env() {
136                        verify_module_db_integrity_dbtx(
137                            &mut dbtx.to_ref_nc(),
138                            *module_id,
139                            module_init.module_kind(),
140                            &used_db_prefixes,
141                        )
142                        .await;
143                    }
144                }
145                dbtx.commit_tx_result().await?;
146
147                let module = module_init
148                    .init(
149                        NumPeers::from(cfg.consensus.api_endpoints().len()),
150                        cfg.get_module_config(*module_id)?,
151                        db.with_prefix_module_id(*module_id).0,
152                        task_group,
153                        cfg.local.identity,
154                        global_api.with_module(*module_id),
155                        server_bitcoin_rpc_monitor.clone(),
156                    )
157                    .await?;
158
159                modules.insert(*module_id, (module_cfg.kind.clone(), module));
160            }
161            None => bail!("Detected configuration for unsupported module id: {module_id}"),
162        }
163    }
164
165    let module_registry = ModuleRegistry::from(modules);
166
167    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
168
169    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
170    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
171    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
172
173    let mut ci_status_senders = BTreeMap::new();
174    let mut ci_status_receivers = BTreeMap::new();
175
176    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
177        let (ci_sender, ci_receiver) = watch::channel(None);
178
179        ci_status_senders.insert(peer, ci_sender);
180        ci_status_receivers.insert(peer, ci_receiver);
181    }
182
183    let consensus_api = ConsensusApi {
184        cfg: cfg.clone(),
185        db: db.clone(),
186        modules: module_registry.clone(),
187        client_cfg: client_cfg.clone(),
188        submission_sender: submission_sender.clone(),
189        shutdown_sender,
190        shutdown_receiver: shutdown_receiver.clone(),
191        supported_api_versions: ServerConfig::supported_api_versions_summary(
192            &cfg.consensus.modules,
193            &module_init_registry,
194        ),
195        p2p_status_receivers,
196        ci_status_receivers,
197        ord_latency_receiver,
198        bitcoin_rpc_connection: server_bitcoin_rpc_monitor,
199        force_api_secret: force_api_secrets.get_active(),
200        code_version_str,
201    };
202
203    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
204
205    let api_handler = if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone() {
206        Box::pin(start_iroh_api(
207            iroh_api_sk,
208            api_bind,
209            iroh_dns,
210            iroh_relays,
211            consensus_api.clone(),
212            task_group,
213            iroh_api_limits,
214        ))
215        .await?;
216
217        None
218    } else {
219        let handler = start_consensus_api(
220            &cfg.local,
221            consensus_api.clone(),
222            force_api_secrets.clone(),
223            api_bind,
224        )
225        .await;
226
227        Some(handler)
228    };
229
230    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
231
232    for (module_id, kind, module) in module_registry.iter_modules() {
233        submit_module_ci_proposals(
234            task_group,
235            db.clone(),
236            module_id,
237            kind.clone(),
238            module.clone(),
239            submission_sender.clone(),
240        );
241    }
242
243    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
244
245    let api_urls = get_api_urls(&db, &cfg.consensus).await;
246
247    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
248
249    let ui_listener = TcpListener::bind(ui_bind)
250        .await
251        .expect("Failed to bind dashboard UI");
252
253    task_group.spawn("dashboard-ui", move |handle| async move {
254        axum::serve(ui_listener, ui_service)
255            .with_graceful_shutdown(handle.make_shutdown_rx())
256            .await
257            .expect("Failed to serve dashboard UI");
258    });
259
260    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
261
262    // FIXME: (@leonardo) How should this be handled ?
263    // Using the `Connector::default()` for now!
264    ConsensusEngine {
265        db,
266        federation_api: DynGlobalApi::from_endpoints(api_urls, &force_api_secrets.get_active())
267            .await?,
268        cfg: cfg.clone(),
269        connections,
270        ord_latency_sender,
271        ci_status_senders,
272        submission_receiver,
273        shutdown_receiver,
274        modules: module_registry,
275        task_group: task_group.clone(),
276        data_dir,
277        db_checkpoint_retention,
278    }
279    .run()
280    .await?;
281
282    if let Some(api_handler) = api_handler {
283        api_handler
284            .stop()
285            .expect("Consensus api should still be running");
286
287        api_handler.stopped().await;
288    }
289
290    Ok(())
291}
292
293async fn start_consensus_api(
294    cfg: &ServerConfigLocal,
295    api: ConsensusApi,
296    force_api_secrets: ApiSecrets,
297    api_bind: SocketAddr,
298) -> ServerHandle {
299    let mut rpc_module = RpcModule::new(api.clone());
300
301    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
302
303    for (id, _, module) in api.modules.iter_modules() {
304        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
305    }
306
307    net::api::spawn(
308        "consensus",
309        api_bind,
310        rpc_module,
311        cfg.max_connections,
312        force_api_secrets,
313    )
314    .await
315}
316
/// Maximum time a single module `consensus_proposal` call may take before that
/// round is abandoned and a warning is logged (30 seconds).
const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_millis(30_000);
318
319fn submit_module_ci_proposals(
320    task_group: &TaskGroup,
321    db: Database,
322    module_id: ModuleInstanceId,
323    kind: ModuleKind,
324    module: DynServerModule,
325    submission_sender: Sender<ConsensusItem>,
326) {
327    let mut interval = tokio::time::interval(if is_running_in_test_env() {
328        Duration::from_millis(100)
329    } else {
330        Duration::from_secs(1)
331    });
332
333    task_group.spawn(
334        format!("citem_proposals_{module_id}"),
335        move |task_handle| async move {
336            while !task_handle.is_shutting_down() {
337                let module_consensus_items = tokio::time::timeout(
338                    CONSENSUS_PROPOSAL_TIMEOUT,
339                    module.consensus_proposal(
340                        &mut db
341                            .begin_transaction_nc()
342                            .await
343                            .to_ref_with_prefix_module_id(module_id)
344                            .0
345                            .into_nc(),
346                        module_id,
347                    ),
348                )
349                .await;
350
351                match module_consensus_items {
352                    Ok(items) => {
353                        for item in items {
354                            if submission_sender
355                                .send(ConsensusItem::Module(item))
356                                .await
357                                .is_err()
358                            {
359                                warn!(
360                                    target: LOG_CONSENSUS,
361                                    module_id,
362                                    "Unable to submit module consensus item proposal via channel"
363                                );
364                            }
365                        }
366                    }
367                    Err(..) => {
368                        warn!(
369                            target: LOG_CONSENSUS,
370                            module_id,
371                            %kind,
372                            "Module failed to propose consensus items on time"
373                        );
374                    }
375                }
376
377                interval.tick().await;
378            }
379        },
380    );
381}
382
383async fn start_iroh_api(
384    secret_key: iroh::SecretKey,
385    api_bind: SocketAddr,
386    iroh_dns: Option<SafeUrl>,
387    iroh_relays: Vec<SafeUrl>,
388    consensus_api: ConsensusApi,
389    task_group: &TaskGroup,
390    iroh_api_limits: ConnectionLimits,
391) -> anyhow::Result<()> {
392    let endpoint = build_iroh_endpoint(
393        secret_key,
394        api_bind,
395        iroh_dns,
396        iroh_relays,
397        FEDIMINT_API_ALPN,
398    )
399    .await?;
400    task_group.spawn_cancellable(
401        "iroh-api",
402        run_iroh_api(consensus_api, endpoint, task_group.clone(), iroh_api_limits),
403    );
404
405    Ok(())
406}
407
408async fn run_iroh_api(
409    consensus_api: ConsensusApi,
410    endpoint: Endpoint,
411    task_group: TaskGroup,
412    iroh_api_limits: ConnectionLimits,
413) {
414    let core_api = server_endpoints()
415        .into_iter()
416        .map(|endpoint| (endpoint.path.to_string(), endpoint))
417        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
418
419    let module_api = consensus_api
420        .modules
421        .iter_modules()
422        .map(|(id, _, module)| {
423            let api_endpoints = module
424                .api_endpoints()
425                .into_iter()
426                .map(|endpoint| (endpoint.path.to_string(), endpoint))
427                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
428
429            (id, api_endpoints)
430        })
431        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
432
433    let consensus_api = Arc::new(consensus_api);
434    let core_api = Arc::new(core_api);
435    let module_api = Arc::new(module_api);
436    let parallel_connections_limit = Arc::new(Semaphore::new(iroh_api_limits.max_connections));
437
438    loop {
439        match endpoint.accept().await {
440            Some(incoming) => {
441                if parallel_connections_limit.available_permits() == 0 {
442                    warn!(
443                        target: LOG_NET_API,
444                        limit = iroh_api_limits.max_connections,
445                        "Iroh API connection limit reached, blocking new connections"
446                    );
447                }
448                let permit = parallel_connections_limit
449                    .clone()
450                    .acquire_owned()
451                    .await
452                    .expect("semaphore should not be closed");
453                task_group.spawn_cancellable_silent(
454                    "handle-iroh-connection",
455                    handle_incoming(
456                        consensus_api.clone(),
457                        core_api.clone(),
458                        module_api.clone(),
459                        task_group.clone(),
460                        incoming,
461                        permit,
462                        iroh_api_limits.max_requests_per_connection,
463                    )
464                    .then(|result| async {
465                        if let Err(err) = result {
466                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
467                        }
468                    }),
469                );
470            }
471            None => return,
472        }
473    }
474}
475
476async fn handle_incoming(
477    consensus_api: Arc<ConsensusApi>,
478    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
479    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
480    task_group: TaskGroup,
481    incoming: Incoming,
482    _connection_permit: tokio::sync::OwnedSemaphorePermit,
483    iroh_api_max_requests_per_connection: usize,
484) -> anyhow::Result<()> {
485    let connection = incoming.accept()?.await?;
486    let parallel_requests_limit = Arc::new(Semaphore::new(iroh_api_max_requests_per_connection));
487
488    loop {
489        let (send_stream, recv_stream) = connection.accept_bi().await?;
490
491        if parallel_requests_limit.available_permits() == 0 {
492            warn!(
493                target: LOG_NET_API,
494                limit = iroh_api_max_requests_per_connection,
495                "Iroh API request limit reached for connection, blocking new requests"
496            );
497        }
498        let permit = parallel_requests_limit
499            .clone()
500            .acquire_owned()
501            .await
502            .expect("semaphore should not be closed");
503        task_group.spawn_cancellable_silent(
504            "handle-iroh-request",
505            handle_request(
506                consensus_api.clone(),
507                core_api.clone(),
508                module_api.clone(),
509                send_stream,
510                recv_stream,
511                permit,
512            )
513            .then(|result| async {
514                if let Err(err) = result {
515                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
516                }
517            }),
518        );
519    }
520}
521
522async fn handle_request(
523    consensus_api: Arc<ConsensusApi>,
524    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
525    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
526    mut send_stream: SendStream,
527    mut recv_stream: RecvStream,
528    _request_permit: tokio::sync::OwnedSemaphorePermit,
529) -> anyhow::Result<()> {
530    let request = recv_stream.read_to_end(100_000).await?;
531
532    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
533
534    let response = await_response(consensus_api, core_api, module_api, request).await;
535
536    let response = serde_json::to_vec(&response)?;
537
538    send_stream.write_all(&response).await?;
539
540    send_stream.finish()?;
541
542    Ok(())
543}
544
545async fn await_response(
546    consensus_api: Arc<ConsensusApi>,
547    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
548    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
549    request: IrohApiRequest,
550) -> Result<Value, ApiError> {
551    match request.method {
552        ApiMethod::Core(method) => {
553            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
554
555            let (state, context) = consensus_api.context(&request.request, None).await;
556
557            (endpoint.handler)(state, context, request.request).await
558        }
559        ApiMethod::Module(module_id, method) => {
560            let endpoint = module_api
561                .get(&module_id)
562                .ok_or(ApiError::not_found(module_id.to_string()))?
563                .get(&method)
564                .ok_or(ApiError::not_found(method))?;
565
566            let (state, context) = consensus_api
567                .context(&request.request, Some(module_id))
568                .await;
569
570            (endpoint.handler)(state, context, request.request).await
571        }
572    }
573}