fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::rustls::install_crypto_provider;
20use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
21use fedimint_gateway_common::ConnectFedPayload;
22use fedimint_gateway_server::Gateway;
23use fedimint_logging::LOG_TEST;
24use fedimint_rocksdb::RocksDb;
25use fedimint_server::config::ServerConfig;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{
29    ReconnectP2PConnections, p2p_connection_type_channels, p2p_status_channels,
30};
31use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
32use fedimint_server::{ConnectionLimits, consensus};
33use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
34use fedimint_testing_core::config::local_config_gen_params;
35use tracing::info;
36
/// Test fixture for a running fedimint federation
///
/// Values of this type are produced by `FederationTestBuilder::build`, which
/// spawns one server task per online peer before returning.
#[derive(Clone)]
pub struct FederationTest {
    /// Server configuration per peer, keyed by peer id
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server-side module registry; used to derive client configs from a
    /// peer's consensus config
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry handed to every client this fixture builds
    client_init: ClientModuleInitRegistry,
    /// Task group the peer tasks were spawned on; stored but never read
    _task: TaskGroup,
    /// Total number of peers in the federation
    num_peers: u16,
    /// Number of peers (the highest ids) that were not started
    num_offline: u16,
}
47
48impl FederationTest {
49    /// Create two clients, useful for send/receive tests
50    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
51        (self.new_client().await, self.new_client().await)
52    }
53
54    /// Create a client connected to this fed
55    pub async fn new_client(&self) -> ClientHandleArc {
56        let client_config = self.configs[&PeerId::from(0)]
57            .consensus
58            .to_client_config(&self.server_init)
59            .unwrap();
60
61        self.new_client_with(client_config, MemDatabase::new().into(), None)
62            .await
63    }
64
65    /// Create a client connected to this fed but using RocksDB instead of MemDB
66    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
67        let client_config = self.configs[&PeerId::from(0)]
68            .consensus
69            .to_client_config(&self.server_init)
70            .unwrap();
71
72        self.new_client_with(
73            client_config,
74            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
75                .await
76                .expect("Couldn't open DB")
77                .into(),
78            None,
79        )
80        .await
81    }
82
83    /// Create a new admin api for the given PeerId
84    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
85        let config = self.configs.get(&peer_id).expect("peer to have config");
86
87        DynGlobalApi::new_admin(
88            peer_id,
89            config.consensus.api_endpoints()[&peer_id].url.clone(),
90            &None,
91            // No need to enable DHT during testing
92            false,
93            // No need to enable next stack during testing
94            false,
95        )
96        .await
97    }
98
99    /// Create a new admin client connected to this fed
100    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
101        let client_config = self.configs[&PeerId::from(0)]
102            .consensus
103            .to_client_config(&self.server_init)
104            .unwrap();
105
106        let admin_creds = AdminCreds { peer_id, auth };
107
108        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
109            .await
110    }
111
112    pub async fn new_client_with(
113        &self,
114        client_config: ClientConfig,
115        db: Database,
116        admin_creds: Option<AdminCreds>,
117    ) -> ClientHandleArc {
118        info!(target: LOG_TEST, "Setting new client with config");
119        let mut client_builder = Client::builder().await.expect("Failed to build client");
120        client_builder.with_module_inits(self.client_init.clone());
121        if let Some(admin_creds) = admin_creds {
122            client_builder.set_admin_creds(admin_creds);
123        }
124        let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
125        client_builder
126            .preview_with_existing_config(client_config, None, None)
127            .await
128            .expect("Preview failed")
129            .join(
130                db,
131                RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
132                    &client_secret,
133                )),
134            )
135            .await
136            .map(Arc::new)
137            .expect("Failed to build client")
138    }
139
140    /// Return first invite code for gateways
141    pub fn invite_code(&self) -> InviteCode {
142        self.configs[&PeerId::from(0)].get_invite_code(None)
143    }
144
145    ///  Return the federation id
146    pub fn id(&self) -> FederationId {
147        self.configs[&PeerId::from(0)]
148            .consensus
149            .to_client_config(&self.server_init)
150            .unwrap()
151            .global
152            .calculate_federation_id()
153    }
154
155    /// Connects a gateway to this `FederationTest`
156    pub async fn connect_gateway(&self, gw: &Gateway) {
157        gw.handle_connect_federation(ConnectFedPayload {
158            invite_code: self.invite_code().to_string(),
159            use_tor: Some(false),
160            recover: Some(false),
161        })
162        .await
163        .expect("Failed to connect federation");
164    }
165
166    /// Return all online PeerIds
167    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
168        // we can assume this ordering since peers are started in ascending order
169        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
170    }
171
172    /// Returns true if the federation is running in a degraded state
173    pub fn is_degraded(&self) -> bool {
174        self.num_offline > 0
175    }
176}
177
/// Builder struct for creating a `FederationTest`.
///
/// Created with `new`, adjusted through the chained setters, and consumed by
/// `build`.
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers to generate configs for (`new` sets this to 4)
    num_peers: u16,
    /// Number of peers (the highest ids) that `build` does not start
    num_offline: u16,
    /// First port of the federation; peer `i` uses `base_port + 3 * i` for
    /// p2p, and the two following ports for API and UI
    base_port: u16,
    /// Kind of the primary module
    // NOTE(review): stored and settable, but not read by `build` in this
    // file; confirm whether it is still needed.
    primary_module_kind: ModuleKind,
    /// Version hash passed to the trusted-dealer config generation
    version_hash: String,
    /// Module config-gen parameters passed to the trusted-dealer config
    /// generation
    modules: ServerModuleConfigGenParamsRegistry,
    /// Server-side module registry used by the started peers
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry passed on to the built `FederationTest`
    client_init: ClientModuleInitRegistry,
    /// Bitcoin RPC handle; a clone is given to every started peer
    bitcoin_rpc_connection: DynServerBitcoinRpc,
    /// Forwarded to `local_config_gen_params` (`new` sets this to `true`)
    enable_mint_fees: bool,
}
192
193impl FederationTestBuilder {
194    pub fn new(
195        params: ServerModuleConfigGenParamsRegistry,
196        server_init: ServerModuleInitRegistry,
197        client_init: ClientModuleInitRegistry,
198        primary_module_kind: ModuleKind,
199        num_offline: u16,
200        bitcoin_rpc_connection: DynServerBitcoinRpc,
201    ) -> FederationTestBuilder {
202        let num_peers = 4;
203        Self {
204            num_peers,
205            num_offline,
206            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
207                .expect("Failed to allocate a port range"),
208            primary_module_kind,
209            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
210            modules: params,
211            server_init,
212            client_init,
213            bitcoin_rpc_connection,
214            enable_mint_fees: true,
215        }
216    }
217
218    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
219        self.num_peers = num_peers;
220        self
221    }
222
223    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
224        self.num_offline = num_offline;
225        self
226    }
227
228    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
229        self.base_port = base_port;
230        self
231    }
232
233    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
234        self.primary_module_kind = primary_module_kind;
235        self
236    }
237
238    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
239        self.version_hash = version_hash;
240        self
241    }
242
243    pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
244        self.enable_mint_fees = false;
245        self
246    }
247
248    #[allow(clippy::too_many_lines)]
249    pub async fn build(self) -> FederationTest {
250        install_crypto_provider().await;
251        let num_offline = self.num_offline;
252        assert!(
253            self.num_peers > 3 * self.num_offline,
254            "too many peers offline ({num_offline}) to reach consensus"
255        );
256        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
257        let params = local_config_gen_params(&peers, self.base_port, self.enable_mint_fees)
258            .expect("Generates local config");
259
260        let configs = ServerConfig::trusted_dealer_gen(
261            self.modules,
262            &params,
263            &self.server_init,
264            &self.version_hash,
265        );
266
267        let task_group = TaskGroup::new();
268        for (peer_id, cfg) in configs.clone() {
269            let peer_port = self.base_port + u16::from(peer_id) * 3;
270
271            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
272            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
273            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
274
275            if u16::from(peer_id) >= self.num_peers - self.num_offline {
276                continue;
277            }
278
279            let instances = cfg.consensus.iter_module_instances();
280            let decoders = self.server_init.available_decoders(instances).unwrap();
281            let db = Database::new(MemDatabase::new(), decoders);
282            let module_init_registry = self.server_init.clone();
283            let subgroup = task_group.make_subgroup();
284            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
285            let code_version_str = env!("CARGO_PKG_VERSION");
286
287            let connector = TlsTcpConnector::new(
288                cfg.tls_config(),
289                p2p_bind,
290                cfg.local.p2p_endpoints.clone(),
291                cfg.local.identity,
292            )
293            .await
294            .into_dyn();
295
296            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
297            let (p2p_connection_type_senders, p2p_connection_type_receivers) =
298                p2p_connection_type_channels(connector.peers());
299
300            let connections = ReconnectP2PConnections::new(
301                cfg.local.identity,
302                connector,
303                &task_group,
304                p2p_status_senders,
305                p2p_connection_type_senders,
306            )
307            .into_dyn();
308
309            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
310
311            task_group.spawn("fedimintd", move |_| async move {
312                Box::pin(consensus::run(
313                    connections,
314                    p2p_status_receivers,
315                    p2p_connection_type_receivers,
316                    api_bind,
317                    None,
318                    vec![],
319                    cfg.clone(),
320                    db.clone(),
321                    module_init_registry,
322                    &subgroup,
323                    ApiSecrets::default(),
324                    checkpoint_dir,
325                    code_version_str.to_string(),
326                    bitcoin_rpc_connection,
327                    ui_bind,
328                    Box::new(|_| axum::Router::new()),
329                    1,
330                    ConnectionLimits {
331                        max_connections: 1000,
332                        max_requests_per_connection: 100,
333                    },
334                ))
335                .await
336                .expect("Could not initialise consensus");
337            });
338        }
339
340        for (peer_id, config) in configs.clone() {
341            if u16::from(peer_id) >= self.num_peers - self.num_offline {
342                continue;
343            }
344
345            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
346            // defaulting to Tcp variant.
347            let api = DynGlobalApi::new_admin(
348                peer_id,
349                config.consensus.api_endpoints()[&peer_id].url.clone(),
350                &None,
351                // No need for dht when testing
352                false,
353                false,
354            )
355            .await
356            .unwrap();
357
358            while let Err(e) = api
359                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
360                .await
361            {
362                sleep_in_test(
363                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
364                    Duration::from_millis(500),
365                )
366                .await;
367            }
368        }
369
370        FederationTest {
371            configs,
372            server_init: self.server_init,
373            client_init: self.client_init,
374            _task: task_group,
375            num_peers: self.num_peers,
376            num_offline: self.num_offline,
377        }
378    }
379}