1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
20use fedimint_gateway_common::ConnectFedPayload;
21use fedimint_gateway_server::Gateway;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::config::ServerConfig;
25use fedimint_server::consensus;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
28use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
29use fedimint_testing_core::config::local_config_gen_params;
30use tracing::info;
31
/// Handle to a test federation, as returned by
/// [`FederationTestBuilder::build`].
///
/// All accessors that need a client config, invite code or federation id
/// derive it from the config of peer 0.
#[derive(Clone)]
pub struct FederationTest {
    /// Server configuration of every peer (online or not), keyed by peer id.
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server-side module registry; used when converting a server config into
    /// a client config.
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry handed to every client builder.
    client_init: ClientModuleInitRegistry,
    /// Module kind configured as the primary module on every client builder.
    primary_module_kind: ModuleKind,
    /// Task group the builder spawned the `fedimintd` tasks on. It is not read
    /// by any method of this type (hence the leading underscore).
    _task: TaskGroup,
    /// Total number of peers in the federation.
    num_peers: u16,
    /// Number of peers that were not started; these are the highest peer ids.
    num_offline: u16,
}
43
44impl FederationTest {
45 pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
47 (self.new_client().await, self.new_client().await)
48 }
49
50 pub async fn new_client(&self) -> ClientHandleArc {
52 let client_config = self.configs[&PeerId::from(0)]
53 .consensus
54 .to_client_config(&self.server_init)
55 .unwrap();
56
57 self.new_client_with(client_config, MemDatabase::new().into(), None)
58 .await
59 }
60
61 pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
63 let client_config = self.configs[&PeerId::from(0)]
64 .consensus
65 .to_client_config(&self.server_init)
66 .unwrap();
67
68 self.new_client_with(
69 client_config,
70 RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
71 .await
72 .expect("Couldn't open DB")
73 .into(),
74 None,
75 )
76 .await
77 }
78
79 pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
81 let config = self.configs.get(&peer_id).expect("peer to have config");
82
83 DynGlobalApi::new_admin(
84 peer_id,
85 config.consensus.api_endpoints()[&peer_id].url.clone(),
86 &None,
87 )
88 .await
89 }
90
91 pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
93 let client_config = self.configs[&PeerId::from(0)]
94 .consensus
95 .to_client_config(&self.server_init)
96 .unwrap();
97
98 let admin_creds = AdminCreds { peer_id, auth };
99
100 self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
101 .await
102 }
103
104 pub async fn new_client_with(
105 &self,
106 client_config: ClientConfig,
107 db: Database,
108 admin_creds: Option<AdminCreds>,
109 ) -> ClientHandleArc {
110 info!(target: LOG_TEST, "Setting new client with config");
111 let mut client_builder = Client::builder(db).await.expect("Failed to build client");
112 client_builder.with_module_inits(self.client_init.clone());
113 client_builder.with_primary_module_kind(self.primary_module_kind.clone());
114 if let Some(admin_creds) = admin_creds {
115 client_builder.set_admin_creds(admin_creds);
116 }
117 let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
118 .await
119 .unwrap();
120 client_builder
121 .join(
122 PlainRootSecretStrategy::to_root_secret(&client_secret),
123 client_config,
124 None,
125 )
126 .await
127 .map(Arc::new)
128 .expect("Failed to build client")
129 }
130
131 pub fn invite_code(&self) -> InviteCode {
133 self.configs[&PeerId::from(0)].get_invite_code(None)
134 }
135
136 pub fn id(&self) -> FederationId {
138 self.configs[&PeerId::from(0)]
139 .consensus
140 .to_client_config(&self.server_init)
141 .unwrap()
142 .global
143 .calculate_federation_id()
144 }
145
146 pub async fn connect_gateway(&self, gw: &Gateway) {
148 gw.handle_connect_federation(ConnectFedPayload {
149 invite_code: self.invite_code().to_string(),
150 use_tor: Some(false),
151 recover: Some(false),
152 })
153 .await
154 .expect("Failed to connect federation");
155 }
156
157 pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
159 (0..(self.num_peers - self.num_offline)).map(PeerId::from)
161 }
162
163 pub fn is_degraded(&self) -> bool {
165 self.num_offline > 0
166 }
167}
168
/// Builder for a [`FederationTest`].
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers to generate configs for.
    num_peers: u16,
    /// Number of peers (the highest ids) that are configured but not started.
    num_offline: u16,
    /// First port of the federation's port range. Peer `i` binds p2p, API and
    /// UI on `base_port + 3 * i`, `+ 1` and `+ 2` respectively.
    base_port: u16,
    /// Primary module kind forwarded to the resulting [`FederationTest`].
    primary_module_kind: ModuleKind,
    /// Version hash passed to the trusted-dealer config generation.
    version_hash: String,
    /// Module config-generation parameters passed to the trusted-dealer
    /// config generation.
    modules: ServerModuleConfigGenParamsRegistry,
    /// Server-side module registry.
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry forwarded to the resulting
    /// [`FederationTest`].
    client_init: ClientModuleInitRegistry,
}
181
182impl FederationTestBuilder {
183 pub fn new(
184 params: ServerModuleConfigGenParamsRegistry,
185 server_init: ServerModuleInitRegistry,
186 client_init: ClientModuleInitRegistry,
187 primary_module_kind: ModuleKind,
188 num_offline: u16,
189 ) -> FederationTestBuilder {
190 let num_peers = 4;
191 Self {
192 num_peers,
193 num_offline,
194 base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
195 .expect("Failed to allocate a port range"),
196 primary_module_kind,
197 version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
198 modules: params,
199 server_init,
200 client_init,
201 }
202 }
203
204 pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
205 self.num_peers = num_peers;
206 self
207 }
208
209 pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
210 self.num_offline = num_offline;
211 self
212 }
213
214 pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
215 self.base_port = base_port;
216 self
217 }
218
219 pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
220 self.primary_module_kind = primary_module_kind;
221 self
222 }
223
224 pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
225 self.version_hash = version_hash;
226 self
227 }
228
229 pub async fn build(self) -> FederationTest {
230 let num_offline = self.num_offline;
231 assert!(
232 self.num_peers > 3 * self.num_offline,
233 "too many peers offline ({num_offline}) to reach consensus"
234 );
235 let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
236 let params =
237 local_config_gen_params(&peers, self.base_port).expect("Generates local config");
238
239 let configs = ServerConfig::trusted_dealer_gen(
240 self.modules,
241 ¶ms,
242 &self.server_init,
243 &self.version_hash,
244 );
245
246 let task_group = TaskGroup::new();
247 for (peer_id, cfg) in configs.clone() {
248 let peer_port = self.base_port + u16::from(peer_id) * 3;
249
250 let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
251 let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
252 let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
253
254 if u16::from(peer_id) >= self.num_peers - self.num_offline {
255 continue;
256 }
257
258 let instances = cfg.consensus.iter_module_instances();
259 let decoders = self.server_init.available_decoders(instances).unwrap();
260 let db = Database::new(MemDatabase::new(), decoders);
261 let module_init_registry = self.server_init.clone();
262 let subgroup = task_group.make_subgroup();
263 let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().into_path();
264 let code_version_str = env!("CARGO_PKG_VERSION");
265
266 let connector = TlsTcpConnector::new(
267 cfg.tls_config(),
268 p2p_bind,
269 cfg.local.p2p_endpoints.clone(),
270 cfg.local.identity,
271 )
272 .await
273 .into_dyn();
274
275 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
276
277 let connections = ReconnectP2PConnections::new(
278 cfg.local.identity,
279 connector,
280 &task_group,
281 p2p_status_senders,
282 )
283 .into_dyn();
284
285 task_group.spawn("fedimintd", move |_| async move {
286 Box::pin(consensus::run(
287 connections,
288 p2p_status_receivers,
289 api_bind,
290 api_bind,
291 cfg.clone(),
292 db.clone(),
293 module_init_registry,
294 &subgroup,
295 fedimint_server::net::api::ApiSecrets::default(),
296 checkpoint_dir,
297 code_version_str.to_string(),
298 ui_bind,
299 None,
300 ))
301 .await
302 .expect("Could not initialise consensus");
303 });
304 }
305
306 for (peer_id, config) in configs.clone() {
307 if u16::from(peer_id) >= self.num_peers - self.num_offline {
308 continue;
309 }
310
311 let api = DynGlobalApi::new_admin(
314 peer_id,
315 config.consensus.api_endpoints()[&peer_id].url.clone(),
316 &None,
317 )
318 .await
319 .unwrap();
320
321 while let Err(e) = api
322 .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
323 .await
324 {
325 sleep_in_test(
326 format!("Waiting for api of peer {peer_id} to come online: {e}"),
327 Duration::from_millis(500),
328 )
329 .await;
330 }
331 }
332
333 FederationTest {
334 configs,
335 server_init: self.server_init,
336 client_init: self.client_init,
337 primary_module_kind: self.primary_module_kind,
338 _task: task_group,
339 num_peers: self.num_peers,
340 num_offline: self.num_offline,
341 }
342 }
343}