1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
20use fedimint_gateway_common::ConnectFedPayload;
21use fedimint_gateway_server::Gateway;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::config::ServerConfig;
25use fedimint_server::consensus;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
29use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
30use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
31use fedimint_testing_core::config::local_config_gen_params;
32use tracing::info;
33
34#[derive(Clone)]
36pub struct FederationTest {
37 configs: BTreeMap<PeerId, ServerConfig>,
38 server_init: ServerModuleInitRegistry,
39 client_init: ClientModuleInitRegistry,
40 primary_module_kind: ModuleKind,
41 _task: TaskGroup,
42 num_peers: u16,
43 num_offline: u16,
44}
45
46impl FederationTest {
47 pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
49 (self.new_client().await, self.new_client().await)
50 }
51
52 pub async fn new_client(&self) -> ClientHandleArc {
54 let client_config = self.configs[&PeerId::from(0)]
55 .consensus
56 .to_client_config(&self.server_init)
57 .unwrap();
58
59 self.new_client_with(client_config, MemDatabase::new().into(), None)
60 .await
61 }
62
63 pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
65 let client_config = self.configs[&PeerId::from(0)]
66 .consensus
67 .to_client_config(&self.server_init)
68 .unwrap();
69
70 self.new_client_with(
71 client_config,
72 RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
73 .await
74 .expect("Couldn't open DB")
75 .into(),
76 None,
77 )
78 .await
79 }
80
81 pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
83 let config = self.configs.get(&peer_id).expect("peer to have config");
84
85 DynGlobalApi::new_admin(
86 peer_id,
87 config.consensus.api_endpoints()[&peer_id].url.clone(),
88 &None,
89 )
90 .await
91 }
92
93 pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
95 let client_config = self.configs[&PeerId::from(0)]
96 .consensus
97 .to_client_config(&self.server_init)
98 .unwrap();
99
100 let admin_creds = AdminCreds { peer_id, auth };
101
102 self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
103 .await
104 }
105
106 pub async fn new_client_with(
107 &self,
108 client_config: ClientConfig,
109 db: Database,
110 admin_creds: Option<AdminCreds>,
111 ) -> ClientHandleArc {
112 info!(target: LOG_TEST, "Setting new client with config");
113 let mut client_builder = Client::builder(db).await.expect("Failed to build client");
114 client_builder.with_module_inits(self.client_init.clone());
115 client_builder.with_primary_module_kind(self.primary_module_kind.clone());
116 if let Some(admin_creds) = admin_creds {
117 client_builder.set_admin_creds(admin_creds);
118 }
119 let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
120 .await
121 .unwrap();
122 client_builder
123 .join(
124 PlainRootSecretStrategy::to_root_secret(&client_secret),
125 client_config,
126 None,
127 )
128 .await
129 .map(Arc::new)
130 .expect("Failed to build client")
131 }
132
133 pub fn invite_code(&self) -> InviteCode {
135 self.configs[&PeerId::from(0)].get_invite_code(None)
136 }
137
138 pub fn id(&self) -> FederationId {
140 self.configs[&PeerId::from(0)]
141 .consensus
142 .to_client_config(&self.server_init)
143 .unwrap()
144 .global
145 .calculate_federation_id()
146 }
147
148 pub async fn connect_gateway(&self, gw: &Gateway) {
150 gw.handle_connect_federation(ConnectFedPayload {
151 invite_code: self.invite_code().to_string(),
152 use_tor: Some(false),
153 recover: Some(false),
154 })
155 .await
156 .expect("Failed to connect federation");
157 }
158
159 pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
161 (0..(self.num_peers - self.num_offline)).map(PeerId::from)
163 }
164
165 pub fn is_degraded(&self) -> bool {
167 self.num_offline > 0
168 }
169}
170
171#[derive(Clone, Debug)]
173pub struct FederationTestBuilder {
174 num_peers: u16,
175 num_offline: u16,
176 base_port: u16,
177 primary_module_kind: ModuleKind,
178 version_hash: String,
179 modules: ServerModuleConfigGenParamsRegistry,
180 server_init: ServerModuleInitRegistry,
181 client_init: ClientModuleInitRegistry,
182 bitcoin_rpc_connection: DynServerBitcoinRpc,
183}
184
185impl FederationTestBuilder {
186 pub fn new(
187 params: ServerModuleConfigGenParamsRegistry,
188 server_init: ServerModuleInitRegistry,
189 client_init: ClientModuleInitRegistry,
190 primary_module_kind: ModuleKind,
191 num_offline: u16,
192 bitcoin_rpc_connection: DynServerBitcoinRpc,
193 ) -> FederationTestBuilder {
194 let num_peers = 4;
195 Self {
196 num_peers,
197 num_offline,
198 base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
199 .expect("Failed to allocate a port range"),
200 primary_module_kind,
201 version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
202 modules: params,
203 server_init,
204 client_init,
205 bitcoin_rpc_connection,
206 }
207 }
208
209 pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
210 self.num_peers = num_peers;
211 self
212 }
213
214 pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
215 self.num_offline = num_offline;
216 self
217 }
218
219 pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
220 self.base_port = base_port;
221 self
222 }
223
224 pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
225 self.primary_module_kind = primary_module_kind;
226 self
227 }
228
229 pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
230 self.version_hash = version_hash;
231 self
232 }
233
234 #[allow(clippy::too_many_lines)]
235 pub async fn build(self) -> FederationTest {
236 let num_offline = self.num_offline;
237 assert!(
238 self.num_peers > 3 * self.num_offline,
239 "too many peers offline ({num_offline}) to reach consensus"
240 );
241 let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
242 let params =
243 local_config_gen_params(&peers, self.base_port).expect("Generates local config");
244
245 let configs = ServerConfig::trusted_dealer_gen(
246 self.modules,
247 ¶ms,
248 &self.server_init,
249 &self.version_hash,
250 );
251
252 let task_group = TaskGroup::new();
253 for (peer_id, cfg) in configs.clone() {
254 let peer_port = self.base_port + u16::from(peer_id) * 3;
255
256 let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
257 let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
258 let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
259
260 if u16::from(peer_id) >= self.num_peers - self.num_offline {
261 continue;
262 }
263
264 let instances = cfg.consensus.iter_module_instances();
265 let decoders = self.server_init.available_decoders(instances).unwrap();
266 let db = Database::new(MemDatabase::new(), decoders);
267 let module_init_registry = self.server_init.clone();
268 let subgroup = task_group.make_subgroup();
269 let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().into_path();
270 let code_version_str = env!("CARGO_PKG_VERSION");
271
272 let connector = TlsTcpConnector::new(
273 cfg.tls_config(),
274 p2p_bind,
275 cfg.local.p2p_endpoints.clone(),
276 cfg.local.identity,
277 )
278 .await
279 .into_dyn();
280
281 let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
282
283 let connections = ReconnectP2PConnections::new(
284 cfg.local.identity,
285 connector,
286 &task_group,
287 p2p_status_senders,
288 )
289 .into_dyn();
290
291 let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
292
293 task_group.spawn("fedimintd", move |_| async move {
294 Box::pin(consensus::run(
295 connections,
296 p2p_status_receivers,
297 api_bind,
298 cfg.clone(),
299 db.clone(),
300 module_init_registry,
301 &subgroup,
302 ApiSecrets::default(),
303 checkpoint_dir,
304 code_version_str.to_string(),
305 bitcoin_rpc_connection,
306 ui_bind,
307 Box::new(|_| axum::Router::new()),
308 1,
309 ))
310 .await
311 .expect("Could not initialise consensus");
312 });
313 }
314
315 for (peer_id, config) in configs.clone() {
316 if u16::from(peer_id) >= self.num_peers - self.num_offline {
317 continue;
318 }
319
320 let api = DynGlobalApi::new_admin(
323 peer_id,
324 config.consensus.api_endpoints()[&peer_id].url.clone(),
325 &None,
326 )
327 .await
328 .unwrap();
329
330 while let Err(e) = api
331 .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
332 .await
333 {
334 sleep_in_test(
335 format!("Waiting for api of peer {peer_id} to come online: {e}"),
336 Duration::from_millis(500),
337 )
338 .await;
339 }
340 }
341
342 FederationTest {
343 configs,
344 server_init: self.server_init,
345 client_init: self.client_init,
346 primary_module_kind: self.primary_module_kind,
347 _task: task_group,
348 num_peers: self.num_peers,
349 num_offline: self.num_offline,
350 }
351 }
352}