Skip to main content

fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_connectors::ConnectorRegistry;
11use fedimint_core::PeerId;
12use fedimint_core::config::{ClientConfig, FederationId};
13use fedimint_core::core::ModuleKind;
14use fedimint_core::db::Database;
15use fedimint_core::db::mem_impl::MemDatabase;
16use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::module::{ApiAuth, ApiRequestErased};
19use fedimint_core::net::peers::IP2PConnections;
20use fedimint_core::rustls::install_crypto_provider;
21use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
22use fedimint_gateway_common::ConnectFedPayload;
23use fedimint_gateway_server::{Gateway, IAdminGateway};
24use fedimint_logging::LOG_TEST;
25use fedimint_rocksdb::RocksDb;
26use fedimint_server::config::ServerConfig;
27use fedimint_server::core::ServerModuleInitRegistry;
28use fedimint_server::net::api::ApiSecrets;
29use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
30use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
31use fedimint_server::{ConnectionLimits, consensus};
32use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
33use fedimint_testing_core::config::local_config_gen_params;
34use tracing::info;
35
/// Test fixture for a running fedimint federation
#[derive(Clone)]
pub struct FederationTest {
    /// Server config of every peer in the federation, keyed by peer id
    /// (includes peers that were configured but not started).
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server-side module registry; used to derive client configs.
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry handed to every client builder.
    client_init: ClientModuleInitRegistry,
    /// Task group the federation servers were spawned on. Never read here;
    /// presumably held only to keep the tasks alive with the fixture.
    _task: TaskGroup,
    /// Total number of peers in the federation.
    num_peers: u16,
    /// Number of peers (the highest ids) that are not running.
    num_offline: u16,
    /// Connector registry passed to clients when previewing/opening.
    connectors: ConnectorRegistry,
}
47
48impl FederationTest {
49    /// Create two clients, useful for send/receive tests
50    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
51        (self.new_client().await, self.new_client().await)
52    }
53
54    /// Create a client connected to this fed
55    pub async fn new_client(&self) -> ClientHandleArc {
56        let client_config = self.configs[&PeerId::from(0)]
57            .consensus
58            .to_client_config(&self.server_init)
59            .unwrap();
60
61        self.new_client_with(client_config, MemDatabase::new().into(), None)
62            .await
63    }
64
65    /// Create a client connected to this fed but using RocksDB instead of MemDB
66    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
67        let client_config = self.configs[&PeerId::from(0)]
68            .consensus
69            .to_client_config(&self.server_init)
70            .unwrap();
71
72        self.new_client_with(
73            client_config,
74            RocksDb::build(tempfile::tempdir().expect("Couldn't create temp dir"))
75                .open()
76                .await
77                .expect("Couldn't open DB")
78                .into(),
79            None,
80        )
81        .await
82    }
83
84    /// Create a new admin api for the given PeerId
85    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
86        let config = self.configs.get(&peer_id).expect("peer to have config");
87
88        DynGlobalApi::new_admin(
89            ConnectorRegistry::build_from_testing_env()?.bind().await?,
90            peer_id,
91            config.consensus.api_endpoints()[&peer_id].url.clone(),
92            None,
93        )
94    }
95
96    /// Create a new admin client connected to this fed
97    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
98        let client_config = self.configs[&PeerId::from(0)]
99            .consensus
100            .to_client_config(&self.server_init)
101            .unwrap();
102
103        let admin_creds = AdminCreds { peer_id, auth };
104
105        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
106            .await
107    }
108
109    pub async fn new_client_with(
110        &self,
111        client_config: ClientConfig,
112        db: Database,
113        admin_creds: Option<AdminCreds>,
114    ) -> ClientHandleArc {
115        info!(target: LOG_TEST, "Setting new client with config");
116        let mut client_builder = Client::builder().await.expect("Failed to build client");
117        client_builder.with_module_inits(self.client_init.clone());
118        if let Some(admin_creds) = admin_creds {
119            client_builder.set_admin_creds(admin_creds);
120        }
121        let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
122        client_builder
123            .preview_with_existing_config(self.connectors.clone(), client_config, None)
124            .await
125            .expect("Preview failed")
126            .join(
127                db,
128                RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
129                    &client_secret,
130                )),
131            )
132            .await
133            .map(Arc::new)
134            .expect("Failed to build client")
135    }
136
137    /// Join a federation with an existing database and root secret
138    pub async fn join_client_with_db(
139        &self,
140        db: Database,
141        root_secret: RootSecret,
142    ) -> ClientHandleArc {
143        let client_config = self.configs[&PeerId::from(0)]
144            .consensus
145            .to_client_config(&self.server_init)
146            .unwrap();
147
148        info!(target: LOG_TEST, "Joining client with existing db");
149        let mut client_builder = Client::builder().await.expect("Failed to build client");
150        client_builder.with_module_inits(self.client_init.clone());
151        client_builder
152            .preview_with_existing_config(self.connectors.clone(), client_config, None)
153            .await
154            .expect("Preview failed")
155            .join(db, root_secret)
156            .await
157            .map(Arc::new)
158            .expect("Failed to join client")
159    }
160
161    /// Create a recovering client with an existing database and root secret.
162    /// Returns both the client and the database so a new client can be created
163    /// with the same DB after recovery completes.
164    pub async fn recover_client_with_db(
165        &self,
166        db: Database,
167        root_secret: RootSecret,
168    ) -> ClientHandleArc {
169        let client_config = self.configs[&PeerId::from(0)]
170            .consensus
171            .to_client_config(&self.server_init)
172            .unwrap();
173
174        info!(target: LOG_TEST, "Recovering client with existing db");
175        let mut client_builder = Client::builder().await.expect("Failed to build client");
176        client_builder.with_module_inits(self.client_init.clone());
177        client_builder
178            .preview_with_existing_config(self.connectors.clone(), client_config, None)
179            .await
180            .expect("Preview failed")
181            .recover(db, root_secret, None)
182            .await
183            .map(Arc::new)
184            .expect("Failed to recover client")
185    }
186
187    /// Open an existing client database (e.g., after recovery)
188    pub async fn open_client_with_db(
189        &self,
190        db: Database,
191        root_secret: RootSecret,
192    ) -> ClientHandleArc {
193        info!(target: LOG_TEST, "Opening client with existing db");
194        let mut client_builder = Client::builder().await.expect("Failed to build client");
195        client_builder.with_module_inits(self.client_init.clone());
196        client_builder
197            .open(self.connectors.clone(), db, root_secret)
198            .await
199            .map(Arc::new)
200            .expect("Failed to open client")
201    }
202
203    /// Return first invite code for gateways
204    pub fn invite_code(&self) -> InviteCode {
205        let peer_id = PeerId::from(0);
206        let cfg = &self.configs[&peer_id];
207        InviteCode::new(
208            cfg.consensus.api_endpoints()[&peer_id].url.clone(),
209            peer_id,
210            cfg.calculate_federation_id(),
211            None,
212        )
213    }
214
215    ///  Return the federation id
216    pub fn id(&self) -> FederationId {
217        self.configs[&PeerId::from(0)]
218            .consensus
219            .to_client_config(&self.server_init)
220            .unwrap()
221            .global
222            .calculate_federation_id()
223    }
224
225    /// Connects a gateway to this `FederationTest`
226    pub async fn connect_gateway(&self, gw: &Gateway) {
227        gw.handle_connect_federation(ConnectFedPayload {
228            invite_code: self.invite_code().to_string(),
229            use_tor: Some(false),
230            recover: Some(false),
231        })
232        .await
233        .expect("Failed to connect federation");
234    }
235
236    /// Return all online PeerIds
237    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
238        // we can assume this ordering since peers are started in ascending order
239        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
240    }
241
242    /// Returns true if the federation is running in a degraded state
243    pub fn is_degraded(&self) -> bool {
244        self.num_offline > 0
245    }
246}
247
/// Builder struct for creating a `FederationTest`.
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers to generate configs for.
    num_peers: u16,
    /// Number of peers (the highest ids) that will not be started.
    num_offline: u16,
    /// First port of the range used by the peers; each peer uses three
    /// consecutive ports starting at `base_port + peer_id * 3`.
    base_port: u16,
    /// Stored by the builder but not consulted by `build` in this file.
    primary_module_kind: ModuleKind,
    /// Version hash passed to the trusted-dealer config generation.
    version_hash: String,
    /// Server-side module registry.
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry, handed on to the `FederationTest`.
    client_init: ClientModuleInitRegistry,
    /// Bitcoin RPC handle; cloned once per started peer.
    bitcoin_rpc_connection: DynServerBitcoinRpc,
    /// Passed to the local config generation; `true` unless
    /// `disable_mint_fees` was called.
    enable_mint_fees: bool,
}
261
262impl FederationTestBuilder {
263    pub fn new(
264        server_init: ServerModuleInitRegistry,
265        client_init: ClientModuleInitRegistry,
266        primary_module_kind: ModuleKind,
267        num_offline: u16,
268        bitcoin_rpc_connection: DynServerBitcoinRpc,
269    ) -> FederationTestBuilder {
270        let num_peers = 4;
271        Self {
272            num_peers,
273            num_offline,
274            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
275                .expect("Failed to allocate a port range"),
276            primary_module_kind,
277            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
278            server_init,
279            client_init,
280            bitcoin_rpc_connection,
281            enable_mint_fees: true,
282        }
283    }
284
285    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
286        self.num_peers = num_peers;
287        self
288    }
289
290    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
291        self.num_offline = num_offline;
292        self
293    }
294
295    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
296        self.base_port = base_port;
297        self
298    }
299
300    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
301        self.primary_module_kind = primary_module_kind;
302        self
303    }
304
305    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
306        self.version_hash = version_hash;
307        self
308    }
309
310    pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
311        self.enable_mint_fees = false;
312        self
313    }
314
    /// Generate configs for all peers, start every online peer, wait until
    /// each started peer's API answers, and return the resulting
    /// [`FederationTest`].
    ///
    /// Peers are considered offline (and are never started) when their id is
    /// `>= num_peers - num_offline`.
    ///
    /// # Panics
    ///
    /// Panics if `num_peers <= 3 * num_offline`, if config generation fails,
    /// if any bind address fails to parse, or if the testing connector
    /// registry cannot be built or bound. A spawned server task panics if
    /// consensus fails to initialise.
    #[allow(clippy::too_many_lines)]
    pub async fn build(self) -> FederationTest {
        install_crypto_provider().await;
        let num_offline = self.num_offline;
        // Also guarantees `num_peers - num_offline` below cannot underflow.
        assert!(
            self.num_peers > 3 * self.num_offline,
            "too many peers offline ({num_offline}) to reach consensus"
        );
        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
        let params = local_config_gen_params(
            &peers,
            self.base_port,
            self.enable_mint_fees,
            &self.server_init,
        )
        .expect("Generates local config");

        // Configs are generated for every peer, including the offline ones.
        let configs =
            ServerConfig::trusted_dealer_gen(&params, &self.server_init, &self.version_hash);

        let task_group = TaskGroup::new();
        for (peer_id, cfg) in configs.clone() {
            // Each peer owns three consecutive ports: p2p, api, ui.
            let peer_port = self.base_port + u16::from(peer_id) * 3;

            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();

            // The highest `num_offline` peer ids are left offline.
            if u16::from(peer_id) >= self.num_peers - self.num_offline {
                continue;
            }

            let instances = cfg.consensus.iter_module_instances();
            let decoders = self.server_init.available_decoders(instances).unwrap();
            // Every started peer gets its own in-memory database.
            let db = Database::new(MemDatabase::new(), decoders);
            let module_init_registry = self.server_init.clone();
            let subgroup = task_group.make_subgroup();
            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
            let code_version_str = env!("CARGO_PKG_VERSION");

            let connector = TlsTcpConnector::new(
                cfg.tls_config(),
                p2p_bind,
                cfg.local.p2p_endpoints.clone(),
                cfg.local.identity,
            )
            .await
            .into_dyn();

            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());

            let connections = ReconnectP2PConnections::new(
                cfg.local.identity,
                connector,
                &task_group,
                p2p_status_senders,
            )
            .into_dyn();

            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();

            // Run the peer's consensus on the shared task group; everything
            // it needs is moved into the task.
            task_group.spawn("fedimintd", move |_| async move {
                Box::pin(consensus::run(
                    ConnectorRegistry::build_from_testing_env()
                        .unwrap()
                        .bind()
                        .await
                        .unwrap(),
                    connections,
                    p2p_status_receivers,
                    api_bind,
                    None,
                    vec![],
                    cfg.clone(),
                    db.clone(),
                    module_init_registry,
                    &subgroup,
                    ApiSecrets::default(),
                    checkpoint_dir,
                    code_version_str.to_string(),
                    bitcoin_rpc_connection,
                    ui_bind,
                    Box::new(|_| axum::Router::new()),
                    1,
                    ConnectionLimits {
                        max_connections: 1000,
                        max_requests_per_connection: 100,
                    },
                ))
                .await
                .expect("Could not initialise consensus");
            });
        }

        // Block until the API of every started peer responds.
        for (peer_id, config) in configs.clone() {
            if u16::from(peer_id) >= self.num_peers - self.num_offline {
                continue;
            }

            let connectors = ConnectorRegistry::build_from_testing_env()
                .unwrap()
                .bind()
                .await
                .unwrap();
            let api = DynGlobalApi::new_admin(
                connectors,
                peer_id,
                config.consensus.api_endpoints()[&peer_id].url.clone(),
                None,
            )
            .unwrap();

            // Poll the session count endpoint, sleeping 500ms between
            // failed attempts, until a request succeeds.
            while let Err(e) = api
                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
                .await
            {
                sleep_in_test(
                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
                    Duration::from_millis(500),
                )
                .await;
            }
        }

        FederationTest {
            configs,
            server_init: self.server_init,
            client_init: self.client_init,
            _task: task_group,
            num_peers: self.num_peers,
            num_offline: self.num_offline,
            connectors: ConnectorRegistry::build_from_testing_env()
                .expect("Failed to initialize endpoints for testing (env)")
                .bind()
                .await
                .expect("Failed to initialize endpoints for testing"),
        }
    }
453}