fedimint_server/consensus/
mod.rs

1pub mod aleph_bft;
2pub mod api;
3pub mod db;
4pub mod debug;
5pub mod engine;
6pub mod transaction;
7
8use std::collections::BTreeMap;
9use std::net::SocketAddr;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use async_channel::Sender;
16use db::{ServerDbMigrationContext, get_global_database_migrations};
17use fedimint_api_client::api::DynGlobalApi;
18use fedimint_core::NumPeers;
19use fedimint_core::config::P2PMessage;
20use fedimint_core::core::{ModuleInstanceId, ModuleKind};
21use fedimint_core::db::{Database, apply_migrations_dbtx, verify_module_db_integrity_dbtx};
22use fedimint_core::envs::is_running_in_test_env;
23use fedimint_core::epoch::ConsensusItem;
24use fedimint_core::module::registry::ModuleRegistry;
25use fedimint_core::module::{ApiEndpoint, ApiError, ApiMethod, FEDIMINT_API_ALPN, IrohApiRequest};
26use fedimint_core::net::peers::DynP2PConnections;
27use fedimint_core::task::TaskGroup;
28use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
29use fedimint_logging::{LOG_CONSENSUS, LOG_CORE, LOG_NET_API};
30use fedimint_server_core::bitcoin_rpc::{DynServerBitcoinRpc, ServerBitcoinRpcMonitor};
31use fedimint_server_core::dashboard_ui::IDashboardApi;
32use fedimint_server_core::migration::apply_migrations_server_dbtx;
33use fedimint_server_core::{DynServerModule, ServerModuleInitRegistry};
34use futures::FutureExt;
35use iroh::Endpoint;
36use iroh::endpoint::{Incoming, RecvStream, SendStream};
37use jsonrpsee::RpcModule;
38use jsonrpsee::server::ServerHandle;
39use serde_json::Value;
40use tokio::net::TcpListener;
41use tokio::sync::watch;
42use tracing::{info, warn};
43
44use crate::config::{ServerConfig, ServerConfigLocal};
45use crate::consensus::api::{ConsensusApi, server_endpoints};
46use crate::consensus::engine::ConsensusEngine;
47use crate::db::verify_server_db_integrity_dbtx;
48use crate::net::api::announcement::get_api_urls;
49use crate::net::api::{ApiSecrets, HasApiContext};
50use crate::net::p2p::P2PStatusReceivers;
51use crate::net::p2p_connector::build_iroh_endpoint;
52use crate::{DashboardUiRouter, net, update_server_info_version_dbtx};
53
/// Capacity of the bounded channel that carries submitted consensus items;
/// once this many items are queued in memory, further submissions wait.
const TRANSACTION_BUFFER: usize = 1_000;
56
57#[allow(clippy::too_many_arguments)]
58pub async fn run(
59    connections: DynP2PConnections<P2PMessage>,
60    p2p_status_receivers: P2PStatusReceivers,
61    api_bind: SocketAddr,
62    iroh_dns: Option<SafeUrl>,
63    iroh_relays: Vec<SafeUrl>,
64    cfg: ServerConfig,
65    db: Database,
66    module_init_registry: ServerModuleInitRegistry,
67    task_group: &TaskGroup,
68    force_api_secrets: ApiSecrets,
69    data_dir: PathBuf,
70    code_version_str: String,
71    dyn_server_bitcoin_rpc: DynServerBitcoinRpc,
72    ui_bind: SocketAddr,
73    dashboard_ui_router: DashboardUiRouter,
74    db_checkpoint_retention: u64,
75) -> anyhow::Result<()> {
76    cfg.validate_config(&cfg.local.identity, &module_init_registry)?;
77
78    let mut global_dbtx = db.begin_transaction().await;
79    apply_migrations_server_dbtx(
80        &mut global_dbtx.to_ref_nc(),
81        Arc::new(ServerDbMigrationContext),
82        "fedimint-server".to_string(),
83        get_global_database_migrations(),
84    )
85    .await?;
86
87    update_server_info_version_dbtx(&mut global_dbtx.to_ref_nc(), &code_version_str).await;
88
89    if is_running_in_test_env() {
90        verify_server_db_integrity_dbtx(&mut global_dbtx.to_ref_nc()).await;
91    }
92    global_dbtx.commit_tx_result().await?;
93
94    let mut modules = BTreeMap::new();
95
96    // TODO: make it work with all transports and federation secrets
97    let global_api = DynGlobalApi::from_endpoints(
98        cfg.consensus
99            .api_endpoints()
100            .iter()
101            .map(|(&peer_id, url)| (peer_id, url.url.clone())),
102        &None,
103    )
104    .await?;
105
106    let server_bitcoin_rpc_monitor = ServerBitcoinRpcMonitor::new(
107        dyn_server_bitcoin_rpc,
108        if is_running_in_test_env() {
109            Duration::from_millis(100)
110        } else {
111            Duration::from_secs(60)
112        },
113        task_group,
114    );
115
116    for (module_id, module_cfg) in &cfg.consensus.modules {
117        match module_init_registry.get(&module_cfg.kind) {
118            Some(module_init) => {
119                info!(target: LOG_CORE, "Initialise module {module_id}...");
120
121                let mut dbtx = db.begin_transaction().await;
122                apply_migrations_dbtx(
123                    &mut dbtx.to_ref_nc(),
124                    Arc::new(ServerDbMigrationContext) as Arc<_>,
125                    module_init.module_kind().to_string(),
126                    module_init.get_database_migrations(),
127                    Some(*module_id),
128                    None,
129                )
130                .await?;
131
132                if let Some(used_db_prefixes) = module_init.used_db_prefixes() {
133                    if is_running_in_test_env() {
134                        verify_module_db_integrity_dbtx(
135                            &mut dbtx.to_ref_nc(),
136                            *module_id,
137                            module_init.module_kind(),
138                            &used_db_prefixes,
139                        )
140                        .await;
141                    }
142                }
143                dbtx.commit_tx_result().await?;
144
145                let module = module_init
146                    .init(
147                        NumPeers::from(cfg.consensus.api_endpoints().len()),
148                        cfg.get_module_config(*module_id)?,
149                        db.with_prefix_module_id(*module_id).0,
150                        task_group,
151                        cfg.local.identity,
152                        global_api.with_module(*module_id),
153                        server_bitcoin_rpc_monitor.clone(),
154                    )
155                    .await?;
156
157                modules.insert(*module_id, (module_cfg.kind.clone(), module));
158            }
159            None => bail!("Detected configuration for unsupported module id: {module_id}"),
160        }
161    }
162
163    let module_registry = ModuleRegistry::from(modules);
164
165    let client_cfg = cfg.consensus.to_client_config(&module_init_registry)?;
166
167    let (submission_sender, submission_receiver) = async_channel::bounded(TRANSACTION_BUFFER);
168    let (shutdown_sender, shutdown_receiver) = watch::channel(None);
169    let (ord_latency_sender, ord_latency_receiver) = watch::channel(None);
170
171    let mut ci_status_senders = BTreeMap::new();
172    let mut ci_status_receivers = BTreeMap::new();
173
174    for peer in cfg.consensus.broadcast_public_keys.keys().copied() {
175        let (ci_sender, ci_receiver) = watch::channel(None);
176
177        ci_status_senders.insert(peer, ci_sender);
178        ci_status_receivers.insert(peer, ci_receiver);
179    }
180
181    let consensus_api = ConsensusApi {
182        cfg: cfg.clone(),
183        db: db.clone(),
184        modules: module_registry.clone(),
185        client_cfg: client_cfg.clone(),
186        submission_sender: submission_sender.clone(),
187        shutdown_sender,
188        shutdown_receiver: shutdown_receiver.clone(),
189        supported_api_versions: ServerConfig::supported_api_versions_summary(
190            &cfg.consensus.modules,
191            &module_init_registry,
192        ),
193        p2p_status_receivers,
194        ci_status_receivers,
195        ord_latency_receiver,
196        bitcoin_rpc_connection: server_bitcoin_rpc_monitor,
197        force_api_secret: force_api_secrets.get_active(),
198        code_version_str,
199    };
200
201    info!(target: LOG_CONSENSUS, "Starting Consensus Api...");
202
203    let api_handler = if let Some(iroh_api_sk) = cfg.private.iroh_api_sk.clone() {
204        Box::pin(start_iroh_api(
205            iroh_api_sk,
206            api_bind,
207            iroh_dns,
208            iroh_relays,
209            consensus_api.clone(),
210            task_group,
211        ))
212        .await?;
213
214        None
215    } else {
216        let handler = start_consensus_api(
217            &cfg.local,
218            consensus_api.clone(),
219            force_api_secrets.clone(),
220            api_bind,
221        )
222        .await;
223
224        Some(handler)
225    };
226
227    info!(target: LOG_CONSENSUS, "Starting Submission of Module CI proposals...");
228
229    for (module_id, kind, module) in module_registry.iter_modules() {
230        submit_module_ci_proposals(
231            task_group,
232            db.clone(),
233            module_id,
234            kind.clone(),
235            module.clone(),
236            submission_sender.clone(),
237        );
238    }
239
240    info!(target: LOG_CONSENSUS, "Starting Consensus Engine...");
241
242    let api_urls = get_api_urls(&db, &cfg.consensus).await;
243
244    let ui_service = dashboard_ui_router(consensus_api.clone().into_dyn()).into_make_service();
245
246    let ui_listener = TcpListener::bind(ui_bind)
247        .await
248        .expect("Failed to bind dashboard UI");
249
250    task_group.spawn("dashboard-ui", move |handle| async move {
251        axum::serve(ui_listener, ui_service)
252            .with_graceful_shutdown(handle.make_shutdown_rx())
253            .await
254            .expect("Failed to serve dashboard UI");
255    });
256
257    info!(target: LOG_CONSENSUS, "Dashboard UI running at http://{ui_bind} 🚀");
258
259    // FIXME: (@leonardo) How should this be handled ?
260    // Using the `Connector::default()` for now!
261    ConsensusEngine {
262        db,
263        federation_api: DynGlobalApi::from_endpoints(api_urls, &force_api_secrets.get_active())
264            .await?,
265        cfg: cfg.clone(),
266        connections,
267        ord_latency_sender,
268        ci_status_senders,
269        submission_receiver,
270        shutdown_receiver,
271        modules: module_registry,
272        task_group: task_group.clone(),
273        data_dir,
274        db_checkpoint_retention,
275    }
276    .run()
277    .await?;
278
279    if let Some(api_handler) = api_handler {
280        api_handler
281            .stop()
282            .expect("Consensus api should still be running");
283
284        api_handler.stopped().await;
285    }
286
287    Ok(())
288}
289
290async fn start_consensus_api(
291    cfg: &ServerConfigLocal,
292    api: ConsensusApi,
293    force_api_secrets: ApiSecrets,
294    api_bind: SocketAddr,
295) -> ServerHandle {
296    let mut rpc_module = RpcModule::new(api.clone());
297
298    net::api::attach_endpoints(&mut rpc_module, api::server_endpoints(), None);
299
300    for (id, _, module) in api.modules.iter_modules() {
301        net::api::attach_endpoints(&mut rpc_module, module.api_endpoints(), Some(id));
302    }
303
304    net::api::spawn(
305        "consensus",
306        api_bind,
307        rpc_module,
308        cfg.max_connections,
309        force_api_secrets,
310    )
311    .await
312}
313
314const CONSENSUS_PROPOSAL_TIMEOUT: Duration = Duration::from_secs(30);
315
316fn submit_module_ci_proposals(
317    task_group: &TaskGroup,
318    db: Database,
319    module_id: ModuleInstanceId,
320    kind: ModuleKind,
321    module: DynServerModule,
322    submission_sender: Sender<ConsensusItem>,
323) {
324    let mut interval = tokio::time::interval(if is_running_in_test_env() {
325        Duration::from_millis(100)
326    } else {
327        Duration::from_secs(1)
328    });
329
330    task_group.spawn(
331        format!("citem_proposals_{module_id}"),
332        move |task_handle| async move {
333            while !task_handle.is_shutting_down() {
334                let module_consensus_items = tokio::time::timeout(
335                    CONSENSUS_PROPOSAL_TIMEOUT,
336                    module.consensus_proposal(
337                        &mut db
338                            .begin_transaction_nc()
339                            .await
340                            .to_ref_with_prefix_module_id(module_id)
341                            .0
342                            .into_nc(),
343                        module_id,
344                    ),
345                )
346                .await;
347
348                match module_consensus_items {
349                    Ok(items) => {
350                        for item in items {
351                            if submission_sender
352                                .send(ConsensusItem::Module(item))
353                                .await
354                                .is_err()
355                            {
356                                warn!(
357                                    target: LOG_CONSENSUS,
358                                    module_id,
359                                    "Unable to submit module consensus item proposal via channel"
360                                );
361                            }
362                        }
363                    }
364                    Err(..) => {
365                        warn!(
366                            target: LOG_CONSENSUS,
367                            module_id,
368                            %kind,
369                            "Module failed to propose consensus items on time"
370                        );
371                    }
372                }
373
374                interval.tick().await;
375            }
376        },
377    );
378}
379
380async fn start_iroh_api(
381    secret_key: iroh::SecretKey,
382    api_bind: SocketAddr,
383    iroh_dns: Option<SafeUrl>,
384    iroh_relays: Vec<SafeUrl>,
385    consensus_api: ConsensusApi,
386    task_group: &TaskGroup,
387) -> anyhow::Result<()> {
388    let endpoint = build_iroh_endpoint(
389        secret_key,
390        api_bind,
391        iroh_dns,
392        iroh_relays,
393        FEDIMINT_API_ALPN,
394    )
395    .await?;
396    task_group.spawn_cancellable(
397        "iroh-api",
398        run_iroh_api(consensus_api, endpoint, task_group.clone()),
399    );
400
401    Ok(())
402}
403
404async fn run_iroh_api(consensus_api: ConsensusApi, endpoint: Endpoint, task_group: TaskGroup) {
405    let core_api = server_endpoints()
406        .into_iter()
407        .map(|endpoint| (endpoint.path.to_string(), endpoint))
408        .collect::<BTreeMap<String, ApiEndpoint<ConsensusApi>>>();
409
410    let module_api = consensus_api
411        .modules
412        .iter_modules()
413        .map(|(id, _, module)| {
414            let api_endpoints = module
415                .api_endpoints()
416                .into_iter()
417                .map(|endpoint| (endpoint.path.to_string(), endpoint))
418                .collect::<BTreeMap<String, ApiEndpoint<DynServerModule>>>();
419
420            (id, api_endpoints)
421        })
422        .collect::<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>();
423
424    let consensus_api = Arc::new(consensus_api);
425    let core_api = Arc::new(core_api);
426    let module_api = Arc::new(module_api);
427
428    loop {
429        match endpoint.accept().await {
430            Some(incoming) => {
431                task_group.spawn_cancellable_silent(
432                    "handle-iroh-connection",
433                    handle_incoming(
434                        consensus_api.clone(),
435                        core_api.clone(),
436                        module_api.clone(),
437                        task_group.clone(),
438                        incoming,
439                    )
440                    .then(|result| async {
441                        if let Err(err) = result {
442                            warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh connection");
443                        }
444                    }),
445                );
446            }
447            None => return,
448        }
449    }
450}
451
452async fn handle_incoming(
453    consensus_api: Arc<ConsensusApi>,
454    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
455    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
456    task_group: TaskGroup,
457    incoming: Incoming,
458) -> anyhow::Result<()> {
459    let connection = incoming.accept()?.await?;
460
461    loop {
462        let (send_stream, recv_stream) = connection.accept_bi().await?;
463
464        task_group.spawn_cancellable_silent(
465            "handle-iroh-request",
466            handle_request(
467                consensus_api.clone(),
468                core_api.clone(),
469                module_api.clone(),
470                send_stream,
471                recv_stream,
472            )
473            .then(|result| async {
474                if let Err(err) = result {
475                    warn!(target: LOG_NET_API, err = %err.fmt_compact_anyhow(), "Failed to handle iroh request");
476                }
477            }),
478        );
479    }
480}
481
482async fn handle_request(
483    consensus_api: Arc<ConsensusApi>,
484    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
485    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
486    mut send_stream: SendStream,
487    mut recv_stream: RecvStream,
488) -> anyhow::Result<()> {
489    let request = recv_stream.read_to_end(100_000).await?;
490
491    let request = serde_json::from_slice::<IrohApiRequest>(&request)?;
492
493    let response = await_response(consensus_api, core_api, module_api, request).await;
494
495    let response = serde_json::to_vec(&response)?;
496
497    send_stream.write_all(&response).await?;
498
499    send_stream.finish()?;
500
501    Ok(())
502}
503
504async fn await_response(
505    consensus_api: Arc<ConsensusApi>,
506    core_api: Arc<BTreeMap<String, ApiEndpoint<ConsensusApi>>>,
507    module_api: Arc<BTreeMap<ModuleInstanceId, BTreeMap<String, ApiEndpoint<DynServerModule>>>>,
508    request: IrohApiRequest,
509) -> Result<Value, ApiError> {
510    match request.method {
511        ApiMethod::Core(method) => {
512            let endpoint = core_api.get(&method).ok_or(ApiError::not_found(method))?;
513
514            let (state, context) = consensus_api.context(&request.request, None).await;
515
516            (endpoint.handler)(state, context, request.request).await
517        }
518        ApiMethod::Module(module_id, method) => {
519            let endpoint = module_api
520                .get(&module_id)
521                .ok_or(ApiError::not_found(module_id.to_string()))?
522                .get(&method)
523                .ok_or(ApiError::not_found(method))?;
524
525            let (state, context) = consensus_api
526                .context(&request.request, Some(module_id))
527                .await;
528
529            (endpoint.handler)(state, context, request.request).await
530        }
531    }
532}