fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::rustls::install_crypto_provider;
20use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
21use fedimint_gateway_common::ConnectFedPayload;
22use fedimint_gateway_server::Gateway;
23use fedimint_logging::LOG_TEST;
24use fedimint_rocksdb::RocksDb;
25use fedimint_server::config::ServerConfig;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{
29    ReconnectP2PConnections, p2p_connection_type_channels, p2p_status_channels,
30};
31use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
32use fedimint_server::{ConnectionLimits, consensus};
33use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
34use fedimint_testing_core::config::local_config_gen_params;
35use tracing::info;
36
/// Test fixture for a running fedimint federation.
///
/// Values of this type are returned by [`FederationTestBuilder::build`].
#[derive(Clone)]
pub struct FederationTest {
    /// Server configs keyed by peer id, as generated during the build. The
    /// map is stored in full, i.e. it also holds the configs of peers that
    /// were not started.
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server module registry; used to turn a consensus config into a client
    /// config.
    server_init: ServerModuleInitRegistry,
    /// Client module registry handed to every client builder.
    client_init: ClientModuleInitRegistry,
    /// Module kind set as the primary module on every client builder.
    primary_module_kind: ModuleKind,
    /// Task group the peers' `fedimintd` tasks were spawned on.
    // NOTE(review): presumably stored only so the tasks live as long as the
    // fixture does; confirm against `TaskGroup`'s drop semantics.
    _task: TaskGroup,
    /// Total number of peers in the federation (online and offline).
    num_peers: u16,
    /// Number of peers that were not started.
    num_offline: u16,
}
48
49impl FederationTest {
50    /// Create two clients, useful for send/receive tests
51    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
52        (self.new_client().await, self.new_client().await)
53    }
54
55    /// Create a client connected to this fed
56    pub async fn new_client(&self) -> ClientHandleArc {
57        let client_config = self.configs[&PeerId::from(0)]
58            .consensus
59            .to_client_config(&self.server_init)
60            .unwrap();
61
62        self.new_client_with(client_config, MemDatabase::new().into(), None)
63            .await
64    }
65
66    /// Create a client connected to this fed but using RocksDB instead of MemDB
67    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
68        let client_config = self.configs[&PeerId::from(0)]
69            .consensus
70            .to_client_config(&self.server_init)
71            .unwrap();
72
73        self.new_client_with(
74            client_config,
75            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
76                .await
77                .expect("Couldn't open DB")
78                .into(),
79            None,
80        )
81        .await
82    }
83
84    /// Create a new admin api for the given PeerId
85    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
86        let config = self.configs.get(&peer_id).expect("peer to have config");
87
88        DynGlobalApi::new_admin(
89            peer_id,
90            config.consensus.api_endpoints()[&peer_id].url.clone(),
91            &None,
92            // No need to enable DHT during testing
93            false,
94            // No need to enable next stack during testing
95            false,
96        )
97        .await
98    }
99
100    /// Create a new admin client connected to this fed
101    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
102        let client_config = self.configs[&PeerId::from(0)]
103            .consensus
104            .to_client_config(&self.server_init)
105            .unwrap();
106
107        let admin_creds = AdminCreds { peer_id, auth };
108
109        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
110            .await
111    }
112
113    pub async fn new_client_with(
114        &self,
115        client_config: ClientConfig,
116        db: Database,
117        admin_creds: Option<AdminCreds>,
118    ) -> ClientHandleArc {
119        info!(target: LOG_TEST, "Setting new client with config");
120        let mut client_builder = Client::builder().await.expect("Failed to build client");
121        client_builder.with_module_inits(self.client_init.clone());
122        client_builder.with_primary_module_kind(self.primary_module_kind.clone());
123        if let Some(admin_creds) = admin_creds {
124            client_builder.set_admin_creds(admin_creds);
125        }
126        let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
127        client_builder
128            .preview_with_existing_config(client_config, None)
129            .await
130            .expect("Preview failed")
131            .join(
132                db,
133                RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
134                    &client_secret,
135                )),
136            )
137            .await
138            .map(Arc::new)
139            .expect("Failed to build client")
140    }
141
142    /// Return first invite code for gateways
143    pub fn invite_code(&self) -> InviteCode {
144        self.configs[&PeerId::from(0)].get_invite_code(None)
145    }
146
147    ///  Return the federation id
148    pub fn id(&self) -> FederationId {
149        self.configs[&PeerId::from(0)]
150            .consensus
151            .to_client_config(&self.server_init)
152            .unwrap()
153            .global
154            .calculate_federation_id()
155    }
156
157    /// Connects a gateway to this `FederationTest`
158    pub async fn connect_gateway(&self, gw: &Gateway) {
159        gw.handle_connect_federation(ConnectFedPayload {
160            invite_code: self.invite_code().to_string(),
161            use_tor: Some(false),
162            recover: Some(false),
163        })
164        .await
165        .expect("Failed to connect federation");
166    }
167
168    /// Return all online PeerIds
169    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
170        // we can assume this ordering since peers are started in ascending order
171        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
172    }
173
174    /// Returns true if the federation is running in a degraded state
175    pub fn is_degraded(&self) -> bool {
176        self.num_offline > 0
177    }
178}
179
/// Builder struct for creating a `FederationTest`.
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers in the federation; `new` sets this to 4.
    num_peers: u16,
    /// Number of peers that are not started. The peers with the highest ids
    /// are the ones skipped when building.
    num_offline: u16,
    /// First port of the range used by the federation. Each peer uses three
    /// consecutive ports (p2p, api, ui) starting at `base_port + 3 * peer_id`.
    base_port: u16,
    /// Module kind that clients created from the fixture use as primary module.
    primary_module_kind: ModuleKind,
    /// Version hash passed to the config generation; `new` sets a dummy value.
    version_hash: String,
    /// Config generation parameters for the server modules.
    modules: ServerModuleConfigGenParamsRegistry,
    /// Server module registry used for config generation and by every peer.
    server_init: ServerModuleInitRegistry,
    /// Client module registry forwarded to the resulting `FederationTest`.
    client_init: ClientModuleInitRegistry,
    /// Bitcoin RPC connection; a clone is handed to every started peer.
    bitcoin_rpc_connection: DynServerBitcoinRpc,
}
193
194impl FederationTestBuilder {
195    pub fn new(
196        params: ServerModuleConfigGenParamsRegistry,
197        server_init: ServerModuleInitRegistry,
198        client_init: ClientModuleInitRegistry,
199        primary_module_kind: ModuleKind,
200        num_offline: u16,
201        bitcoin_rpc_connection: DynServerBitcoinRpc,
202    ) -> FederationTestBuilder {
203        let num_peers = 4;
204        Self {
205            num_peers,
206            num_offline,
207            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
208                .expect("Failed to allocate a port range"),
209            primary_module_kind,
210            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
211            modules: params,
212            server_init,
213            client_init,
214            bitcoin_rpc_connection,
215        }
216    }
217
218    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
219        self.num_peers = num_peers;
220        self
221    }
222
223    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
224        self.num_offline = num_offline;
225        self
226    }
227
228    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
229        self.base_port = base_port;
230        self
231    }
232
233    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
234        self.primary_module_kind = primary_module_kind;
235        self
236    }
237
238    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
239        self.version_hash = version_hash;
240        self
241    }
242
243    #[allow(clippy::too_many_lines)]
244    pub async fn build(self) -> FederationTest {
245        install_crypto_provider().await;
246        let num_offline = self.num_offline;
247        assert!(
248            self.num_peers > 3 * self.num_offline,
249            "too many peers offline ({num_offline}) to reach consensus"
250        );
251        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
252        let params =
253            local_config_gen_params(&peers, self.base_port).expect("Generates local config");
254
255        let configs = ServerConfig::trusted_dealer_gen(
256            self.modules,
257            &params,
258            &self.server_init,
259            &self.version_hash,
260        );
261
262        let task_group = TaskGroup::new();
263        for (peer_id, cfg) in configs.clone() {
264            let peer_port = self.base_port + u16::from(peer_id) * 3;
265
266            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
267            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
268            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
269
270            if u16::from(peer_id) >= self.num_peers - self.num_offline {
271                continue;
272            }
273
274            let instances = cfg.consensus.iter_module_instances();
275            let decoders = self.server_init.available_decoders(instances).unwrap();
276            let db = Database::new(MemDatabase::new(), decoders);
277            let module_init_registry = self.server_init.clone();
278            let subgroup = task_group.make_subgroup();
279            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
280            let code_version_str = env!("CARGO_PKG_VERSION");
281
282            let connector = TlsTcpConnector::new(
283                cfg.tls_config(),
284                p2p_bind,
285                cfg.local.p2p_endpoints.clone(),
286                cfg.local.identity,
287            )
288            .await
289            .into_dyn();
290
291            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
292            let (p2p_connection_type_senders, p2p_connection_type_receivers) =
293                p2p_connection_type_channels(connector.peers());
294
295            let connections = ReconnectP2PConnections::new(
296                cfg.local.identity,
297                connector,
298                &task_group,
299                p2p_status_senders,
300                p2p_connection_type_senders,
301            )
302            .into_dyn();
303
304            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
305
306            task_group.spawn("fedimintd", move |_| async move {
307                Box::pin(consensus::run(
308                    connections,
309                    p2p_status_receivers,
310                    p2p_connection_type_receivers,
311                    api_bind,
312                    None,
313                    vec![],
314                    cfg.clone(),
315                    db.clone(),
316                    module_init_registry,
317                    &subgroup,
318                    ApiSecrets::default(),
319                    checkpoint_dir,
320                    code_version_str.to_string(),
321                    bitcoin_rpc_connection,
322                    ui_bind,
323                    Box::new(|_| axum::Router::new()),
324                    1,
325                    ConnectionLimits {
326                        max_connections: 1000,
327                        max_requests_per_connection: 100,
328                    },
329                ))
330                .await
331                .expect("Could not initialise consensus");
332            });
333        }
334
335        for (peer_id, config) in configs.clone() {
336            if u16::from(peer_id) >= self.num_peers - self.num_offline {
337                continue;
338            }
339
340            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
341            // defaulting to Tcp variant.
342            let api = DynGlobalApi::new_admin(
343                peer_id,
344                config.consensus.api_endpoints()[&peer_id].url.clone(),
345                &None,
346                // No need for dht when testing
347                false,
348                false,
349            )
350            .await
351            .unwrap();
352
353            while let Err(e) = api
354                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
355                .await
356            {
357                sleep_in_test(
358                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
359                    Duration::from_millis(500),
360                )
361                .await;
362            }
363        }
364
365        FederationTest {
366            configs,
367            server_init: self.server_init,
368            client_init: self.client_init,
369            primary_module_kind: self.primary_module_kind,
370            _task: task_group,
371            num_peers: self.num_peers,
372            num_offline: self.num_offline,
373        }
374    }
375}