1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::rustls::install_crypto_provider;
20use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
21use fedimint_gateway_common::ConnectFedPayload;
22use fedimint_gateway_server::Gateway;
23use fedimint_logging::LOG_TEST;
24use fedimint_rocksdb::RocksDb;
25use fedimint_server::config::ServerConfig;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{
29 ReconnectP2PConnections, p2p_connection_type_channels, p2p_status_channels,
30};
31use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
32use fedimint_server::{ConnectionLimits, consensus};
33use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
34use fedimint_testing_core::config::local_config_gen_params;
35use tracing::info;
36
/// Handle to a federation started by [`FederationTestBuilder::build`].
///
/// Holds the per-peer server configs plus the module registries needed to
/// create clients against the federation.
#[derive(Clone)]
pub struct FederationTest {
    /// Server config of each peer, keyed by peer id.
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server module registry; used to derive the client config from a
    /// peer's consensus config.
    server_init: ServerModuleInitRegistry,
    /// Client module registry handed to every client builder.
    client_init: ClientModuleInitRegistry,
    /// Task group the peers' server tasks were spawned on. Never read;
    /// presumably held so the tasks live as long as this handle (confirm
    /// against `TaskGroup` semantics).
    _task: TaskGroup,
    /// Total number of peers in the federation.
    num_peers: u16,
    /// Number of peers that were never started. The offline peers are the
    /// ones with the highest ids.
    num_offline: u16,
}
47
48impl FederationTest {
49 pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
51 (self.new_client().await, self.new_client().await)
52 }
53
54 pub async fn new_client(&self) -> ClientHandleArc {
56 let client_config = self.configs[&PeerId::from(0)]
57 .consensus
58 .to_client_config(&self.server_init)
59 .unwrap();
60
61 self.new_client_with(client_config, MemDatabase::new().into(), None)
62 .await
63 }
64
65 pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
67 let client_config = self.configs[&PeerId::from(0)]
68 .consensus
69 .to_client_config(&self.server_init)
70 .unwrap();
71
72 self.new_client_with(
73 client_config,
74 RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
75 .await
76 .expect("Couldn't open DB")
77 .into(),
78 None,
79 )
80 .await
81 }
82
83 pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
85 let config = self.configs.get(&peer_id).expect("peer to have config");
86
87 DynGlobalApi::new_admin(
88 peer_id,
89 config.consensus.api_endpoints()[&peer_id].url.clone(),
90 &None,
91 false,
93 false,
95 )
96 .await
97 }
98
99 pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
101 let client_config = self.configs[&PeerId::from(0)]
102 .consensus
103 .to_client_config(&self.server_init)
104 .unwrap();
105
106 let admin_creds = AdminCreds { peer_id, auth };
107
108 self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
109 .await
110 }
111
112 pub async fn new_client_with(
113 &self,
114 client_config: ClientConfig,
115 db: Database,
116 admin_creds: Option<AdminCreds>,
117 ) -> ClientHandleArc {
118 info!(target: LOG_TEST, "Setting new client with config");
119 let mut client_builder = Client::builder().await.expect("Failed to build client");
120 client_builder.with_module_inits(self.client_init.clone());
121 if let Some(admin_creds) = admin_creds {
122 client_builder.set_admin_creds(admin_creds);
123 }
124 let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
125 client_builder
126 .preview_with_existing_config(client_config, None, None)
127 .await
128 .expect("Preview failed")
129 .join(
130 db,
131 RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
132 &client_secret,
133 )),
134 )
135 .await
136 .map(Arc::new)
137 .expect("Failed to build client")
138 }
139
140 pub fn invite_code(&self) -> InviteCode {
142 self.configs[&PeerId::from(0)].get_invite_code(None)
143 }
144
145 pub fn id(&self) -> FederationId {
147 self.configs[&PeerId::from(0)]
148 .consensus
149 .to_client_config(&self.server_init)
150 .unwrap()
151 .global
152 .calculate_federation_id()
153 }
154
155 pub async fn connect_gateway(&self, gw: &Gateway) {
157 gw.handle_connect_federation(ConnectFedPayload {
158 invite_code: self.invite_code().to_string(),
159 use_tor: Some(false),
160 recover: Some(false),
161 })
162 .await
163 .expect("Failed to connect federation");
164 }
165
166 pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
168 (0..(self.num_peers - self.num_offline)).map(PeerId::from)
170 }
171
172 pub fn is_degraded(&self) -> bool {
174 self.num_offline > 0
175 }
176}
177
/// Builder that configures and starts a [`FederationTest`].
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers to generate configs for.
    num_peers: u16,
    /// Number of peers (those with the highest ids) that are not started.
    num_offline: u16,
    /// First port of the range used by the peers; each peer takes three
    /// consecutive ports (p2p, API, UI) starting at `base_port + 3 * id`.
    base_port: u16,
    /// Primary module kind. NOTE(review): stored and settable, but not read
    /// by any code visible in this file.
    primary_module_kind: ModuleKind,
    /// Version hash passed to the trusted-dealer config generation.
    version_hash: String,
    /// Config-generation parameters for the server modules.
    modules: ServerModuleConfigGenParamsRegistry,
    /// Server module registry used for config generation and by each peer.
    server_init: ServerModuleInitRegistry,
    /// Client module registry handed on to the resulting `FederationTest`.
    client_init: ClientModuleInitRegistry,
    /// Bitcoin RPC connection cloned into every started peer.
    bitcoin_rpc_connection: DynServerBitcoinRpc,
    /// Forwarded to `local_config_gen_params`; defaults to `true`.
    enable_mint_fees: bool,
}
192
193impl FederationTestBuilder {
194 pub fn new(
195 params: ServerModuleConfigGenParamsRegistry,
196 server_init: ServerModuleInitRegistry,
197 client_init: ClientModuleInitRegistry,
198 primary_module_kind: ModuleKind,
199 num_offline: u16,
200 bitcoin_rpc_connection: DynServerBitcoinRpc,
201 ) -> FederationTestBuilder {
202 let num_peers = 4;
203 Self {
204 num_peers,
205 num_offline,
206 base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
207 .expect("Failed to allocate a port range"),
208 primary_module_kind,
209 version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
210 modules: params,
211 server_init,
212 client_init,
213 bitcoin_rpc_connection,
214 enable_mint_fees: true,
215 }
216 }
217
218 pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
219 self.num_peers = num_peers;
220 self
221 }
222
223 pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
224 self.num_offline = num_offline;
225 self
226 }
227
228 pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
229 self.base_port = base_port;
230 self
231 }
232
233 pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
234 self.primary_module_kind = primary_module_kind;
235 self
236 }
237
238 pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
239 self.version_hash = version_hash;
240 self
241 }
242
243 pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
244 self.enable_mint_fees = false;
245 self
246 }
247
248 #[allow(clippy::too_many_lines)]
249 pub async fn build(self) -> FederationTest {
250 install_crypto_provider().await;
251 let num_offline = self.num_offline;
252 assert!(
253 self.num_peers > 3 * self.num_offline,
254 "too many peers offline ({num_offline}) to reach consensus"
255 );
256 let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
257 let params = local_config_gen_params(&peers, self.base_port, self.enable_mint_fees)
258 .expect("Generates local config");
259
260 let configs = ServerConfig::trusted_dealer_gen(
261 self.modules,
262 ¶ms,
263 &self.server_init,
264 &self.version_hash,
265 );
266
267 let task_group = TaskGroup::new();
268 for (peer_id, cfg) in configs.clone() {
269 let peer_port = self.base_port + u16::from(peer_id) * 3;
270
271 let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
272 let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
273 let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
274
275 if u16::from(peer_id) >= self.num_peers - self.num_offline {
276 continue;
277 }
278
279 let instances = cfg.consensus.iter_module_instances();
280 let decoders = self.server_init.available_decoders(instances).unwrap();
281 let db = Database::new(MemDatabase::new(), decoders);
282 let module_init_registry = self.server_init.clone();
283 let subgroup = task_group.make_subgroup();
284 let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
285 let code_version_str = env!("CARGO_PKG_VERSION");
286
287 let connector = TlsTcpConnector::new(
288 cfg.tls_config(),
289 p2p_bind,
290 cfg.local.p2p_endpoints.clone(),
291 cfg.local.identity,
292 )
293 .await
294 .into_dyn();
295
296 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
297 let (p2p_connection_type_senders, p2p_connection_type_receivers) =
298 p2p_connection_type_channels(connector.peers());
299
300 let connections = ReconnectP2PConnections::new(
301 cfg.local.identity,
302 connector,
303 &task_group,
304 p2p_status_senders,
305 p2p_connection_type_senders,
306 )
307 .into_dyn();
308
309 let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
310
311 task_group.spawn("fedimintd", move |_| async move {
312 Box::pin(consensus::run(
313 connections,
314 p2p_status_receivers,
315 p2p_connection_type_receivers,
316 api_bind,
317 None,
318 vec![],
319 cfg.clone(),
320 db.clone(),
321 module_init_registry,
322 &subgroup,
323 ApiSecrets::default(),
324 checkpoint_dir,
325 code_version_str.to_string(),
326 bitcoin_rpc_connection,
327 ui_bind,
328 Box::new(|_| axum::Router::new()),
329 1,
330 ConnectionLimits {
331 max_connections: 1000,
332 max_requests_per_connection: 100,
333 },
334 ))
335 .await
336 .expect("Could not initialise consensus");
337 });
338 }
339
340 for (peer_id, config) in configs.clone() {
341 if u16::from(peer_id) >= self.num_peers - self.num_offline {
342 continue;
343 }
344
345 let api = DynGlobalApi::new_admin(
348 peer_id,
349 config.consensus.api_endpoints()[&peer_id].url.clone(),
350 &None,
351 false,
353 false,
354 )
355 .await
356 .unwrap();
357
358 while let Err(e) = api
359 .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
360 .await
361 {
362 sleep_in_test(
363 format!("Waiting for api of peer {peer_id} to come online: {e}"),
364 Duration::from_millis(500),
365 )
366 .await;
367 }
368 }
369
370 FederationTest {
371 configs,
372 server_init: self.server_init,
373 client_init: self.client_init,
374 _task: task_group,
375 num_peers: self.num_peers,
376 num_offline: self.num_offline,
377 }
378 }
379}