fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::env;
10use std::net::SocketAddr;
11use std::path::PathBuf;
12use std::sync::{Arc, RwLock};
13use std::time::Duration;
14
15use anyhow::bail;
16use async_channel::Sender;
17use db::{ServerDbMigrationContext, get_global_database_migrations};
18use fedimint_api_client::api::DynGlobalApi;
19use fedimint_core::NumPeers;
20use fedimint_core::config::P2PMessage;
21use fedimint_core::core::{ModuleInstanceId, ModuleKind};
22use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
23use fedimint_core::envs::is_running_in_test_env;
24use fedimint_core::epoch::ConsensusItem;
25use fedimint_core::module::registry::ModuleRegistry;
26use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
27use fedimint_core::net::peers::DynP2PConnections;
28use fedimint_core::task::TaskGroup;
29use fedimint_core::util::FmtCompactAnyhow as _;
30use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API, LOG_NET_IROH};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::sync::watch;
41use tracing::{info, warn};
42
43use crate::config::{ServerConfig, ServerConfigLocal};
44use crate::consensus::api::{ConsensusApi, server_endpoints};
45use crate::consensus::engine::ConsensusEngine;
46use crate::db::verify_server_db_integrity_dbtx;
47use crate::envs::{FM_DB_CHECKPOINT_RETENTION_DEFAULT, FM_DB_CHECKPOINT_RETENTION_ENV};
48use crate::net::api::announcement::get_api_urls;
49use crate::net::api::{ApiSecrets, HasApiContext};
50use crate::net::p2p::P2PStatusReceivers;
51use crate::{net, update_server_info_version_dbtx};
52
/// Upper bound on the number of transactions held in memory at once; when
/// the buffer is full the API blocks until space frees up.
const TRANSACTION_BUFFER: usize = 1_000;
55
56#[allow(clippy::too_many_arguments)]
57pub async fn run(
58    connections: DynP2PConnections<P2PMessage>,
59    p2p_status_receivers: P2PStatusReceivers,
60    ws_api_bind_addr: SocketAddr,
61    iroh_api_bind_addr: SocketAddr,
62    cfg: ServerConfig,
63    db: Database,
64    module_init_registry: ServerModuleInitRegistry,
65    task_group: &TaskGroup,
66    force_api_secrets: ApiSecrets,
67    data_dir: PathBuf,
68    code_version_str: String,
69    ui_bind_addr: SocketAddr,
70    dashboard_ui_handler: Option<crate::DashboardUiHandler>,
71) -> anyhow::Result<()> {
72    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
73
74    let mut global_dbtx = db.begin_transaction().await;
75    apply_migrations_server_dbtx(
76        &mut global_dbtx.to_ref_nc(),
77        Arc::new(ServerDbMigrationContext),
78        "fedimint-server".to_string(),
79        get_global_database_migrations(),
80    )
81    .await?;
82
83    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
84
85    if is_running_in_test_env() {
86        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
87    }
88    global_dbtx.commit_tx_result().await?;
89
90    let mut modules = BTreeMap::new();
91
92    // TODO: make it work with all transports and federation secrets
93    let global_api = DynGlobalApi::from_endpoints(
94        cfg.consensus
95            .api_endpoints()
96            .iter()
97            .map(|(&peer_id, url)| (peer_id, url.url.clone())),
98        &None,
99    )
100    .await?;
101
102    let shared_anymap = Arc::new(RwLock::new(BTreeMap::default()));
103
104    for (module_id, module_cfg) in &cfg.consensus.modules {
105        match module_init_registry.get(&module_cfg.kind) {
106            Some(module_init) => {
107                info!(target: LOG_CORE, "Initialise module {module_id}...");
108
109                let mut dbtx = db.begin_transaction().await;
110                apply_migrations_dbtx(
111                    &mut dbtx.to_ref_nc(),
112                    Arc::new(ServerDbMigrationContext) as Arc<_>,
113                    module_init.module_kind().to_string(),
114                    module_init.get_database_migrations(),
115                    Some(*module_id),
116                    None,
117                )
118                .await?;
119
120                if let Some(used_db_prefixes) = module_init.used_db_prefixes() {
121                    if is_running_in_test_env() {
122                        verify_module_db_integrity_dbtx(
123                            &mut dbtx.to_ref_nc(),
124                            *module_id,
125                            module_init.module_kind(),
126                            &used_db_prefixes,
127                        )
128                        .await;
129                    }
130                }
131                dbtx.commit_tx_result().await?;
132
133                let module = module_init
134                    .init(
135                        NumPeers::from(cfg.consensus.api_endpoints().len()),
136                        cfg.get_module_config(*module_id)?,
137                        db.with_prefix_module_id(*module_id).0,
138                        task_group,
139                        cfg.local.identity,
140                        global_api.with_module(*module_id),
141                        shared_anymap.clone(),
142                    )
143                    .await?;
144
145                modules.insert(*module_id, (module_cfg.kind.clone(), module));
146            }
147            None => bail!("Detected configuration for unsupported module id: {module_id}"),
148        };
149    }
150
151    let module_registry = ModuleRegistry::from(modules);
152
153    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
154
155    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
156    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
157    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
158
159    let mut ci_status_senders = BTreeMap::new();
160    let mut ci_status_receivers = BTreeMap::new();
161
162    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
163        let (ci_sender, ci_receiver) = watch::channel(None);
164
165        ci_status_senders.insert(peer, ci_sender);
166        ci_status_receivers.insert(peer, ci_receiver);
167    }
168
169    let consensus_api = ConsensusApi {
170        cfg: cfg.clone(),
171        db: db.clone(),
172        modules: module_registry.clone(),
173        client_cfg: client_cfg.clone(),
174        submission_sender: submission_sender.clone(),
175        shutdown_sender,
176        shutdown_receiver: shutdown_receiver.clone(),
177        supported_api_versions: ServerConfig::supported_api_versions_summary(
178            &cfg.consensus.modules,
179            &module_init_registry,
180        ),
181        p2p_status_receivers,
182        ci_status_receivers,
183        ord_latency_receiver,
184        force_api_secret: force_api_secrets.get_active(),
185        code_version_str,
186    };
187
188    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
189
190    let api_handler = start_consensus_api(
191        &cfg.local,
192        consensus_api.clone(),
193        force_api_secrets.clone(),
194        ws_api_bind_addr,
195    )
196    .await;
197
198    if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone() {
199        Box::pin(start_iroh_api(
200            iroh_api_sk,
201            iroh_api_bind_addr,
202            consensus_api.clone(),
203            task_group,
204        ))
205        .await;
206    }
207
208    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
209
210    for (module_id, kind, module) in module_registry.iter_modules() {
211        submit_module_ci_proposals(
212            task_group,
213            db.clone(),
214            module_id,
215            kind.clone(),
216            module.clone(),
217            submission_sender.clone(),
218        );
219    }
220
221    let checkpoint_retention: String = env::var(FM_DB_CHECKPOINT_RETENTION_ENV)
222        .unwrap_or(FM_DB_CHECKPOINT_RETENTION_DEFAULT.to_string());
223    let checkpoint_retention = checkpoint_retention.parse().unwrap_or_else(|_| {
224        panic!("FM_DB_CHECKPOINT_RETENTION_ENV var is invalid: {checkpoint_retention}")
225    });
226
227    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
228
229    let api_urls = get_api_urls(&db, &cfg.consensus).await;
230
231    if let Some(dashboard_ui_handler) = dashboard_ui_handler {
232        task_group.spawn("dashboard-ui", move |handle| {
233            dashboard_ui_handler(consensus_api.clone().into_dyn(), ui_bind_addr, handle)
234        });
235
236        info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind_addr} 🚀");
237    }
238
239    // FIXME: (@leonardo) How should this be handled ?
240    // Using the `Connector::default()` for now!
241    ConsensusEngine {
242        db,
243        federation_api: DynGlobalApi::from_endpoints(api_urls, &force_api_secrets.get_active())
244            .await?,
245        cfg: cfg.clone(),
246        connections,
247        ord_latency_sender,
248        ci_status_senders,
249        submission_receiver,
250        shutdown_receiver,
251        modules: module_registry,
252        task_group: task_group.clone(),
253        data_dir,
254        checkpoint_retention,
255    }
256    .run()
257    .await?;
258
259    api_handler
260        .stop()
261        .expect("Consensus api should still be running");
262
263    api_handler.stopped().await;
264
265    Ok(())
266}
267
268async fn start_consensus_api(
269    cfg: &ServerConfigLocal,
270    api: ConsensusApi,
271    force_api_secrets: ApiSecrets,
272    api_bind: SocketAddr,
273) -> ServerHandle {
274    let mut rpc_module = RpcModule::new(api.clone());
275
276    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
277
278    for (id, _, module) in api.modules.iter_modules() {
279        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
280    }
281
282    net::api::spawn(
283        "consensus",
284        api_bind,
285        rpc_module,
286        cfg.max_connections,
287        force_api_secrets,
288    )
289    .await
290}
291
/// Longest time a single `consensus_proposal` call of a module may take
/// before the attempt is abandoned and a warning is logged.
const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
293
294fn submit_module_ci_proposals(
295    task_group: &TaskGroup,
296    db: Database,
297    module_id: ModuleInstanceId,
298    kind: ModuleKind,
299    module: DynServerModule,
300    submission_sender: Sender<ConsensusItem>,
301) {
302    let mut interval = tokio::time::interval(if is_running_in_test_env() {
303        Duration::from_millis(100)
304    } else {
305        Duration::from_secs(1)
306    });
307
308    task_group.spawn(
309        format!("citem_proposals_{module_id}"),
310        move |task_handle| async move {
311            while !task_handle.is_shutting_down() {
312                let module_consensus_items = tokio::time::timeout(
313                    CONSENSUS_PROPOSAL_TIMEOUT,
314                    module.consensus_proposal(
315                        &mut db
316                            .begin_transaction_nc()
317                            .await
318                            .to_ref_with_prefix_module_id(module_id)
319                            .0
320                            .into_nc(),
321                        module_id,
322                    ),
323                )
324                .await;
325
326                match module_consensus_items {
327                    Ok(items) => {
328                        for item in items {
329                            if submission_sender
330                                .send(ConsensusItem::Module(item))
331                                .await
332                                .is_err()
333                            {
334                                warn!(
335                                    target: LOG_CONSENSUS,
336                                    module_id,
337                                    "Unable to submit module consensus item proposal via channel"
338                                );
339                            }
340                        }
341                    }
342                    Err(..) => {
343                        warn!(
344                            target: LOG_CONSENSUS,
345                            module_id,
346                            %kind,
347                            "Module failed to propose consensus items on time"
348                        );
349                    }
350                }
351
352                interval.tick().await;
353            }
354        },
355    );
356}
357
358async fn start_iroh_api(
359    secret_key: iroh::SecretKey,
360    bind_addr: SocketAddr,
361    consensus_api: ConsensusApi,
362    task_group: &TaskGroup,
363) {
364    let builder = Endpoint::builder()
365        .discovery_n0()
366        .discovery_dht()
367        .secret_key(secret_key)
368        .alpns(vec![FEDIMINT_API_ALPN.to_vec()]);
369
370    let builder = match bind_addr {
371        SocketAddr::V4(addr_v4) => builder.bind_addr_v4(addr_v4),
372        SocketAddr::V6(addr_v6) => builder.bind_addr_v6(addr_v6),
373    };
374
375    let endpoint = builder.bind().await.expect("Failed to bind iroh api");
376
377    info!(
378        target: LOG_NET_IROH,
379        %bind_addr,
380        node_id = %endpoint.node_id(),
381        node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
382        "Iroh api server endpoint"
383    );
384
385    task_group.spawn_cancellable(
386        "iroh-api",
387        run_iroh_api(consensus_api, endpoint, task_group.clone()),
388    );
389}
390
391async fn run_iroh_api(consensus_api: ConsensusApi, endpoint: Endpoint, task_group: TaskGroup) {
392    let core_api = server_endpoints()
393        .into_iter()
394        .map(|endpoint| (endpoint.path.to_string(), endpoint))
395        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
396
397    let module_api = consensus_api
398        .modules
399        .iter_modules()
400        .map(|(id, _, module)| {
401            let api_endpoints = module
402                .api_endpoints()
403                .into_iter()
404                .map(|endpoint| (endpoint.path.to_string(), endpoint))
405                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
406
407            (id, api_endpoints)
408        })
409        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
410
411    let consensus_api = Arc::new(consensus_api);
412    let core_api = Arc::new(core_api);
413    let module_api = Arc::new(module_api);
414
415    loop {
416        match endpoint.accept().await {
417            Some(incoming) => {
418                task_group.spawn_cancellable_silent(
419                    "handle-iroh-connection",
420                    handle_incoming(
421                        consensus_api.clone(),
422                        core_api.clone(),
423                        module_api.clone(),
424                        task_group.clone(),
425                        incoming,
426                    )
427                    .then(|result| async {
428                        if let Err(err) = result {
429                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
430                        }
431                    }),
432                );
433            }
434            None => return,
435        }
436    }
437}
438
439async fn handle_incoming(
440    consensus_api: Arc<ConsensusApi>,
441    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
442    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
443    task_group: TaskGroup,
444    incoming: Incoming,
445) -> anyhow::Result<()> {
446    let connection = incoming.accept()?.await?;
447
448    loop {
449        let (send_stream, recv_stream) = connection.accept_bi().await?;
450
451        task_group.spawn_cancellable_silent(
452            "handle-iroh-request",
453            handle_request(
454                consensus_api.clone(),
455                core_api.clone(),
456                module_api.clone(),
457                send_stream,
458                recv_stream,
459            )
460            .then(|result| async {
461                if let Err(err) = result {
462                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
463                }
464            }),
465        );
466    }
467}
468
469async fn handle_request(
470    consensus_api: Arc<ConsensusApi>,
471    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
472    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
473    mut send_stream: SendStream,
474    mut recv_stream: RecvStream,
475) -> anyhow::Result<()> {
476    let request = recv_stream.read_to_end(100_000).await?;
477
478    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
479
480    let response = await_response(consensus_api, core_api, module_api, request).await;
481
482    let response = serde_json::to_vec(&response)?;
483
484    send_stream.write_all(&response).await?;
485
486    send_stream.finish()?;
487
488    Ok(())
489}
490
491async fn await_response(
492    consensus_api: Arc<ConsensusApi>,
493    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
494    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
495    request: IrohApiRequest,
496) -> Result<Value, ApiError> {
497    match request.method {
498        ApiMethod::Core(method) => {
499            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
500
501            let (state, context) = consensus_api.context(&request.request, None).await;
502
503            (endpoint.handler)(state, context, request.request).await
504        }
505        ApiMethod::Module(module_id, method) => {
506            let endpoint = module_api
507                .get(&module_id)
508                .ok_or(ApiError::not_found(module_id.to_string()))?
509                .get(&method)
510                .ok_or(ApiError::not_found(method))?;
511
512            let (state, context) = consensus_api
513                .context(&request.request, Some(module_id))
514                .await;
515
516            (endpoint.handler)(state, context, request.request).await
517        }
518    }
519}