fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_connectors::ConnectorRegistry;
11use fedimint_core::PeerId;
12use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
13use fedimint_core::core::ModuleKind;
14use fedimint_core::db::Database;
15use fedimint_core::db::mem_impl::MemDatabase;
16use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::module::{ApiAuth, ApiRequestErased};
19use fedimint_core::net::peers::IP2PConnections;
20use fedimint_core::rustls::install_crypto_provider;
21use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
22use fedimint_gateway_common::ConnectFedPayload;
23use fedimint_gateway_server::{Gateway, IAdminGateway};
24use fedimint_logging::LOG_TEST;
25use fedimint_rocksdb::RocksDb;
26use fedimint_server::config::ServerConfig;
27use fedimint_server::core::ServerModuleInitRegistry;
28use fedimint_server::net::api::ApiSecrets;
29use fedimint_server::net::p2p::{
30    ReconnectP2PConnections, p2p_connection_type_channels, p2p_status_channels,
31};
32use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
33use fedimint_server::{ConnectionLimits, consensus};
34use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
35use fedimint_testing_core::config::local_config_gen_params;
36use tracing::info;
37
38/// Test fixture for a running fedimint federation
39#[derive(Clone)]
40pub struct FederationTest {
41    configs: BTreeMap<PeerId, ServerConfig>,
42    server_init: ServerModuleInitRegistry,
43    client_init: ClientModuleInitRegistry,
44    _task: TaskGroup,
45    num_peers: u16,
46    num_offline: u16,
47    connectors: ConnectorRegistry,
48}
49
50impl FederationTest {
51    /// Create two clients, useful for send/receive tests
52    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
53        (self.new_client().await, self.new_client().await)
54    }
55
56    /// Create a client connected to this fed
57    pub async fn new_client(&self) -> ClientHandleArc {
58        let client_config = self.configs[&PeerId::from(0)]
59            .consensus
60            .to_client_config(&self.server_init)
61            .unwrap();
62
63        self.new_client_with(client_config, MemDatabase::new().into(), None)
64            .await
65    }
66
67    /// Create a client connected to this fed but using RocksDB instead of MemDB
68    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
69        let client_config = self.configs[&PeerId::from(0)]
70            .consensus
71            .to_client_config(&self.server_init)
72            .unwrap();
73
74        self.new_client_with(
75            client_config,
76            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
77                .await
78                .expect("Couldn't open DB")
79                .into(),
80            None,
81        )
82        .await
83    }
84
85    /// Create a new admin api for the given PeerId
86    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
87        let config = self.configs.get(&peer_id).expect("peer to have config");
88
89        DynGlobalApi::new_admin(
90            ConnectorRegistry::build_from_testing_env()?.bind().await?,
91            peer_id,
92            config.consensus.api_endpoints()[&peer_id].url.clone(),
93            None,
94        )
95    }
96
97    /// Create a new admin client connected to this fed
98    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
99        let client_config = self.configs[&PeerId::from(0)]
100            .consensus
101            .to_client_config(&self.server_init)
102            .unwrap();
103
104        let admin_creds = AdminCreds { peer_id, auth };
105
106        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
107            .await
108    }
109
110    pub async fn new_client_with(
111        &self,
112        client_config: ClientConfig,
113        db: Database,
114        admin_creds: Option<AdminCreds>,
115    ) -> ClientHandleArc {
116        info!(target: LOG_TEST, "Setting new client with config");
117        let mut client_builder = Client::builder().await.expect("Failed to build client");
118        client_builder.with_module_inits(self.client_init.clone());
119        if let Some(admin_creds) = admin_creds {
120            client_builder.set_admin_creds(admin_creds);
121        }
122        let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
123        client_builder
124            .preview_with_existing_config(self.connectors.clone(), client_config, None)
125            .await
126            .expect("Preview failed")
127            .join(
128                db,
129                RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
130                    &client_secret,
131                )),
132            )
133            .await
134            .map(Arc::new)
135            .expect("Failed to build client")
136    }
137
138    /// Return first invite code for gateways
139    pub fn invite_code(&self) -> InviteCode {
140        self.configs[&PeerId::from(0)].get_invite_code(None)
141    }
142
143    ///  Return the federation id
144    pub fn id(&self) -> FederationId {
145        self.configs[&PeerId::from(0)]
146            .consensus
147            .to_client_config(&self.server_init)
148            .unwrap()
149            .global
150            .calculate_federation_id()
151    }
152
153    /// Connects a gateway to this `FederationTest`
154    pub async fn connect_gateway(&self, gw: &Gateway) {
155        gw.handle_connect_federation(ConnectFedPayload {
156            invite_code: self.invite_code().to_string(),
157            use_tor: Some(false),
158            recover: Some(false),
159        })
160        .await
161        .expect("Failed to connect federation");
162    }
163
164    /// Return all online PeerIds
165    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
166        // we can assume this ordering since peers are started in ascending order
167        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
168    }
169
170    /// Returns true if the federation is running in a degraded state
171    pub fn is_degraded(&self) -> bool {
172        self.num_offline > 0
173    }
174}
175
176/// Builder struct for creating a `FederationTest`.
177#[derive(Clone, Debug)]
178pub struct FederationTestBuilder {
179    num_peers: u16,
180    num_offline: u16,
181    base_port: u16,
182    primary_module_kind: ModuleKind,
183    version_hash: String,
184    modules: ServerModuleConfigGenParamsRegistry,
185    server_init: ServerModuleInitRegistry,
186    client_init: ClientModuleInitRegistry,
187    bitcoin_rpc_connection: DynServerBitcoinRpc,
188    enable_mint_fees: bool,
189}
190
191impl FederationTestBuilder {
192    pub fn new(
193        params: ServerModuleConfigGenParamsRegistry,
194        server_init: ServerModuleInitRegistry,
195        client_init: ClientModuleInitRegistry,
196        primary_module_kind: ModuleKind,
197        num_offline: u16,
198        bitcoin_rpc_connection: DynServerBitcoinRpc,
199    ) -> FederationTestBuilder {
200        let num_peers = 4;
201        Self {
202            num_peers,
203            num_offline,
204            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
205                .expect("Failed to allocate a port range"),
206            primary_module_kind,
207            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
208            modules: params,
209            server_init,
210            client_init,
211            bitcoin_rpc_connection,
212            enable_mint_fees: true,
213        }
214    }
215
216    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
217        self.num_peers = num_peers;
218        self
219    }
220
221    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
222        self.num_offline = num_offline;
223        self
224    }
225
226    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
227        self.base_port = base_port;
228        self
229    }
230
231    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
232        self.primary_module_kind = primary_module_kind;
233        self
234    }
235
236    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
237        self.version_hash = version_hash;
238        self
239    }
240
241    pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
242        self.enable_mint_fees = false;
243        self
244    }
245
246    #[allow(clippy::too_many_lines)]
247    pub async fn build(self) -> FederationTest {
248        install_crypto_provider().await;
249        let num_offline = self.num_offline;
250        assert!(
251            self.num_peers > 3 * self.num_offline,
252            "too many peers offline ({num_offline}) to reach consensus"
253        );
254        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
255        let params = local_config_gen_params(&peers, self.base_port, self.enable_mint_fees)
256            .expect("Generates local config");
257
258        let configs = ServerConfig::trusted_dealer_gen(
259            self.modules,
260            &params,
261            &self.server_init,
262            &self.version_hash,
263        );
264
265        let task_group = TaskGroup::new();
266        for (peer_id, cfg) in configs.clone() {
267            let peer_port = self.base_port + u16::from(peer_id) * 3;
268
269            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
270            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
271            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
272
273            if u16::from(peer_id) >= self.num_peers - self.num_offline {
274                continue;
275            }
276
277            let instances = cfg.consensus.iter_module_instances();
278            let decoders = self.server_init.available_decoders(instances).unwrap();
279            let db = Database::new(MemDatabase::new(), decoders);
280            let module_init_registry = self.server_init.clone();
281            let subgroup = task_group.make_subgroup();
282            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
283            let code_version_str = env!("CARGO_PKG_VERSION");
284
285            let connector = TlsTcpConnector::new(
286                cfg.tls_config(),
287                p2p_bind,
288                cfg.local.p2p_endpoints.clone(),
289                cfg.local.identity,
290            )
291            .await
292            .into_dyn();
293
294            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
295            let (p2p_connection_type_senders, p2p_connection_type_receivers) =
296                p2p_connection_type_channels(connector.peers());
297
298            let connections = ReconnectP2PConnections::new(
299                cfg.local.identity,
300                connector,
301                &task_group,
302                p2p_status_senders,
303                p2p_connection_type_senders,
304            )
305            .into_dyn();
306
307            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
308
309            task_group.spawn("fedimintd", move |_| async move {
310                Box::pin(consensus::run(
311                    ConnectorRegistry::build_from_testing_env()
312                        .unwrap()
313                        .bind()
314                        .await
315                        .unwrap(),
316                    connections,
317                    p2p_status_receivers,
318                    p2p_connection_type_receivers,
319                    api_bind,
320                    None,
321                    vec![],
322                    cfg.clone(),
323                    db.clone(),
324                    module_init_registry,
325                    &subgroup,
326                    ApiSecrets::default(),
327                    checkpoint_dir,
328                    code_version_str.to_string(),
329                    bitcoin_rpc_connection,
330                    ui_bind,
331                    Box::new(|_| axum::Router::new()),
332                    1,
333                    ConnectionLimits {
334                        max_connections: 1000,
335                        max_requests_per_connection: 100,
336                    },
337                ))
338                .await
339                .expect("Could not initialise consensus");
340            });
341        }
342
343        for (peer_id, config) in configs.clone() {
344            if u16::from(peer_id) >= self.num_peers - self.num_offline {
345                continue;
346            }
347
348            let connectors = ConnectorRegistry::build_from_testing_env()
349                .unwrap()
350                .bind()
351                .await
352                .unwrap();
353            let api = DynGlobalApi::new_admin(
354                connectors,
355                peer_id,
356                config.consensus.api_endpoints()[&peer_id].url.clone(),
357                None,
358            )
359            .unwrap();
360
361            while let Err(e) = api
362                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
363                .await
364            {
365                sleep_in_test(
366                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
367                    Duration::from_millis(500),
368                )
369                .await;
370            }
371        }
372
373        FederationTest {
374            configs,
375            server_init: self.server_init,
376            client_init: self.client_init,
377            _task: task_group,
378            num_peers: self.num_peers,
379            num_offline: self.num_offline,
380            connectors: ConnectorRegistry::build_from_testing_env()
381                .expect("Failed to initialize endpoints for testing (env)")
382                .bind()
383                .await
384                .expect("Failed to initialize endpoints for testing"),
385        }
386    }
387}