1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_connectors::ConnectorRegistry;
11use fedimint_core::PeerId;
12use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
13use fedimint_core::core::ModuleKind;
14use fedimint_core::db::Database;
15use fedimint_core::db::mem_impl::MemDatabase;
16use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::module::{ApiAuth, ApiRequestErased};
19use fedimint_core::net::peers::IP2PConnections;
20use fedimint_core::rustls::install_crypto_provider;
21use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
22use fedimint_gateway_common::ConnectFedPayload;
23use fedimint_gateway_server::{Gateway, IAdminGateway};
24use fedimint_logging::LOG_TEST;
25use fedimint_rocksdb::RocksDb;
26use fedimint_server::config::ServerConfig;
27use fedimint_server::core::ServerModuleInitRegistry;
28use fedimint_server::net::api::ApiSecrets;
29use fedimint_server::net::p2p::{
30 ReconnectP2PConnections, p2p_connection_type_channels, p2p_status_channels,
31};
32use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
33use fedimint_server::{ConnectionLimits, consensus};
34use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
35use fedimint_testing_core::config::local_config_gen_params;
36use tracing::info;
37
38#[derive(Clone)]
40pub struct FederationTest {
41 configs: BTreeMap<PeerId, ServerConfig>,
42 server_init: ServerModuleInitRegistry,
43 client_init: ClientModuleInitRegistry,
44 _task: TaskGroup,
45 num_peers: u16,
46 num_offline: u16,
47 connectors: ConnectorRegistry,
48}
49
50impl FederationTest {
51 pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
53 (self.new_client().await, self.new_client().await)
54 }
55
56 pub async fn new_client(&self) -> ClientHandleArc {
58 let client_config = self.configs[&PeerId::from(0)]
59 .consensus
60 .to_client_config(&self.server_init)
61 .unwrap();
62
63 self.new_client_with(client_config, MemDatabase::new().into(), None)
64 .await
65 }
66
67 pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
69 let client_config = self.configs[&PeerId::from(0)]
70 .consensus
71 .to_client_config(&self.server_init)
72 .unwrap();
73
74 self.new_client_with(
75 client_config,
76 RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
77 .await
78 .expect("Couldn't open DB")
79 .into(),
80 None,
81 )
82 .await
83 }
84
85 pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
87 let config = self.configs.get(&peer_id).expect("peer to have config");
88
89 DynGlobalApi::new_admin(
90 ConnectorRegistry::build_from_testing_env()?.bind().await?,
91 peer_id,
92 config.consensus.api_endpoints()[&peer_id].url.clone(),
93 None,
94 )
95 }
96
97 pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
99 let client_config = self.configs[&PeerId::from(0)]
100 .consensus
101 .to_client_config(&self.server_init)
102 .unwrap();
103
104 let admin_creds = AdminCreds { peer_id, auth };
105
106 self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
107 .await
108 }
109
110 pub async fn new_client_with(
111 &self,
112 client_config: ClientConfig,
113 db: Database,
114 admin_creds: Option<AdminCreds>,
115 ) -> ClientHandleArc {
116 info!(target: LOG_TEST, "Setting new client with config");
117 let mut client_builder = Client::builder().await.expect("Failed to build client");
118 client_builder.with_module_inits(self.client_init.clone());
119 if let Some(admin_creds) = admin_creds {
120 client_builder.set_admin_creds(admin_creds);
121 }
122 let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
123 client_builder
124 .preview_with_existing_config(self.connectors.clone(), client_config, None)
125 .await
126 .expect("Preview failed")
127 .join(
128 db,
129 RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
130 &client_secret,
131 )),
132 )
133 .await
134 .map(Arc::new)
135 .expect("Failed to build client")
136 }
137
138 pub fn invite_code(&self) -> InviteCode {
140 self.configs[&PeerId::from(0)].get_invite_code(None)
141 }
142
143 pub fn id(&self) -> FederationId {
145 self.configs[&PeerId::from(0)]
146 .consensus
147 .to_client_config(&self.server_init)
148 .unwrap()
149 .global
150 .calculate_federation_id()
151 }
152
153 pub async fn connect_gateway(&self, gw: &Gateway) {
155 gw.handle_connect_federation(ConnectFedPayload {
156 invite_code: self.invite_code().to_string(),
157 use_tor: Some(false),
158 recover: Some(false),
159 })
160 .await
161 .expect("Failed to connect federation");
162 }
163
164 pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
166 (0..(self.num_peers - self.num_offline)).map(PeerId::from)
168 }
169
170 pub fn is_degraded(&self) -> bool {
172 self.num_offline > 0
173 }
174}
175
176#[derive(Clone, Debug)]
178pub struct FederationTestBuilder {
179 num_peers: u16,
180 num_offline: u16,
181 base_port: u16,
182 primary_module_kind: ModuleKind,
183 version_hash: String,
184 modules: ServerModuleConfigGenParamsRegistry,
185 server_init: ServerModuleInitRegistry,
186 client_init: ClientModuleInitRegistry,
187 bitcoin_rpc_connection: DynServerBitcoinRpc,
188 enable_mint_fees: bool,
189}
190
191impl FederationTestBuilder {
192 pub fn new(
193 params: ServerModuleConfigGenParamsRegistry,
194 server_init: ServerModuleInitRegistry,
195 client_init: ClientModuleInitRegistry,
196 primary_module_kind: ModuleKind,
197 num_offline: u16,
198 bitcoin_rpc_connection: DynServerBitcoinRpc,
199 ) -> FederationTestBuilder {
200 let num_peers = 4;
201 Self {
202 num_peers,
203 num_offline,
204 base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
205 .expect("Failed to allocate a port range"),
206 primary_module_kind,
207 version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
208 modules: params,
209 server_init,
210 client_init,
211 bitcoin_rpc_connection,
212 enable_mint_fees: true,
213 }
214 }
215
216 pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
217 self.num_peers = num_peers;
218 self
219 }
220
221 pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
222 self.num_offline = num_offline;
223 self
224 }
225
226 pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
227 self.base_port = base_port;
228 self
229 }
230
231 pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
232 self.primary_module_kind = primary_module_kind;
233 self
234 }
235
236 pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
237 self.version_hash = version_hash;
238 self
239 }
240
241 pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
242 self.enable_mint_fees = false;
243 self
244 }
245
246 #[allow(clippy::too_many_lines)]
247 pub async fn build(self) -> FederationTest {
248 install_crypto_provider().await;
249 let num_offline = self.num_offline;
250 assert!(
251 self.num_peers > 3 * self.num_offline,
252 "too many peers offline ({num_offline}) to reach consensus"
253 );
254 let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
255 let params = local_config_gen_params(&peers, self.base_port, self.enable_mint_fees)
256 .expect("Generates local config");
257
258 let configs = ServerConfig::trusted_dealer_gen(
259 self.modules,
260 ¶ms,
261 &self.server_init,
262 &self.version_hash,
263 );
264
265 let task_group = TaskGroup::new();
266 for (peer_id, cfg) in configs.clone() {
267 let peer_port = self.base_port + u16::from(peer_id) * 3;
268
269 let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
270 let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
271 let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
272
273 if u16::from(peer_id) >= self.num_peers - self.num_offline {
274 continue;
275 }
276
277 let instances = cfg.consensus.iter_module_instances();
278 let decoders = self.server_init.available_decoders(instances).unwrap();
279 let db = Database::new(MemDatabase::new(), decoders);
280 let module_init_registry = self.server_init.clone();
281 let subgroup = task_group.make_subgroup();
282 let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
283 let code_version_str = env!("CARGO_PKG_VERSION");
284
285 let connector = TlsTcpConnector::new(
286 cfg.tls_config(),
287 p2p_bind,
288 cfg.local.p2p_endpoints.clone(),
289 cfg.local.identity,
290 )
291 .await
292 .into_dyn();
293
294 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
295 let (p2p_connection_type_senders, p2p_connection_type_receivers) =
296 p2p_connection_type_channels(connector.peers());
297
298 let connections = ReconnectP2PConnections::new(
299 cfg.local.identity,
300 connector,
301 &task_group,
302 p2p_status_senders,
303 p2p_connection_type_senders,
304 )
305 .into_dyn();
306
307 let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
308
309 task_group.spawn("fedimintd", move |_| async move {
310 Box::pin(consensus::run(
311 ConnectorRegistry::build_from_testing_env()
312 .unwrap()
313 .bind()
314 .await
315 .unwrap(),
316 connections,
317 p2p_status_receivers,
318 p2p_connection_type_receivers,
319 api_bind,
320 None,
321 vec![],
322 cfg.clone(),
323 db.clone(),
324 module_init_registry,
325 &subgroup,
326 ApiSecrets::default(),
327 checkpoint_dir,
328 code_version_str.to_string(),
329 bitcoin_rpc_connection,
330 ui_bind,
331 Box::new(|_| axum::Router::new()),
332 1,
333 ConnectionLimits {
334 max_connections: 1000,
335 max_requests_per_connection: 100,
336 },
337 ))
338 .await
339 .expect("Could not initialise consensus");
340 });
341 }
342
343 for (peer_id, config) in configs.clone() {
344 if u16::from(peer_id) >= self.num_peers - self.num_offline {
345 continue;
346 }
347
348 let connectors = ConnectorRegistry::build_from_testing_env()
349 .unwrap()
350 .bind()
351 .await
352 .unwrap();
353 let api = DynGlobalApi::new_admin(
354 connectors,
355 peer_id,
356 config.consensus.api_endpoints()[&peer_id].url.clone(),
357 None,
358 )
359 .unwrap();
360
361 while let Err(e) = api
362 .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
363 .await
364 {
365 sleep_in_test(
366 format!("Waiting for api of peer {peer_id} to come online: {e}"),
367 Duration::from_millis(500),
368 )
369 .await;
370 }
371 }
372
373 FederationTest {
374 configs,
375 server_init: self.server_init,
376 client_init: self.client_init,
377 _task: task_group,
378 num_peers: self.num_peers,
379 num_offline: self.num_offline,
380 connectors: ConnectorRegistry::build_from_testing_env()
381 .expect("Failed to initialize endpoints for testing (env)")
382 .bind()
383 .await
384 .expect("Failed to initialize endpoints for testing"),
385 }
386 }
387}