fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_connectors::ConnectorRegistry;
11use fedimint_core::PeerId;
12use fedimint_core::config::{ClientConfig, FederationId};
13use fedimint_core::core::ModuleKind;
14use fedimint_core::db::Database;
15use fedimint_core::db::mem_impl::MemDatabase;
16use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
17use fedimint_core::invite_code::InviteCode;
18use fedimint_core::module::{ApiAuth, ApiRequestErased};
19use fedimint_core::net::peers::IP2PConnections;
20use fedimint_core::rustls::install_crypto_provider;
21use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
22use fedimint_gateway_common::ConnectFedPayload;
23use fedimint_gateway_server::{Gateway, IAdminGateway};
24use fedimint_logging::LOG_TEST;
25use fedimint_rocksdb::RocksDb;
26use fedimint_server::config::ServerConfig;
27use fedimint_server::core::ServerModuleInitRegistry;
28use fedimint_server::net::api::ApiSecrets;
29use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
30use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
31use fedimint_server::{ConnectionLimits, consensus};
32use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
33use fedimint_testing_core::config::local_config_gen_params;
34use tracing::info;
35
36/// Test fixture for a running fedimint federation
37#[derive(Clone)]
38pub struct FederationTest {
39    configs: BTreeMap<PeerId, ServerConfig>,
40    server_init: ServerModuleInitRegistry,
41    client_init: ClientModuleInitRegistry,
42    _task: TaskGroup,
43    num_peers: u16,
44    num_offline: u16,
45    connectors: ConnectorRegistry,
46}
47
48impl FederationTest {
49    /// Create two clients, useful for send/receive tests
50    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
51        (self.new_client().await, self.new_client().await)
52    }
53
54    /// Create a client connected to this fed
55    pub async fn new_client(&self) -> ClientHandleArc {
56        let client_config = self.configs[&PeerId::from(0)]
57            .consensus
58            .to_client_config(&self.server_init)
59            .unwrap();
60
61        self.new_client_with(client_config, MemDatabase::new().into(), None)
62            .await
63    }
64
65    /// Create a client connected to this fed but using RocksDB instead of MemDB
66    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
67        let client_config = self.configs[&PeerId::from(0)]
68            .consensus
69            .to_client_config(&self.server_init)
70            .unwrap();
71
72        self.new_client_with(
73            client_config,
74            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
75                .await
76                .expect("Couldn't open DB")
77                .into(),
78            None,
79        )
80        .await
81    }
82
83    /// Create a new admin api for the given PeerId
84    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
85        let config = self.configs.get(&peer_id).expect("peer to have config");
86
87        DynGlobalApi::new_admin(
88            ConnectorRegistry::build_from_testing_env()?.bind().await?,
89            peer_id,
90            config.consensus.api_endpoints()[&peer_id].url.clone(),
91            None,
92        )
93    }
94
95    /// Create a new admin client connected to this fed
96    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
97        let client_config = self.configs[&PeerId::from(0)]
98            .consensus
99            .to_client_config(&self.server_init)
100            .unwrap();
101
102        let admin_creds = AdminCreds { peer_id, auth };
103
104        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
105            .await
106    }
107
108    pub async fn new_client_with(
109        &self,
110        client_config: ClientConfig,
111        db: Database,
112        admin_creds: Option<AdminCreds>,
113    ) -> ClientHandleArc {
114        info!(target: LOG_TEST, "Setting new client with config");
115        let mut client_builder = Client::builder().await.expect("Failed to build client");
116        client_builder.with_module_inits(self.client_init.clone());
117        if let Some(admin_creds) = admin_creds {
118            client_builder.set_admin_creds(admin_creds);
119        }
120        let client_secret = Client::load_or_generate_client_secret(&db).await.unwrap();
121        client_builder
122            .preview_with_existing_config(self.connectors.clone(), client_config, None)
123            .await
124            .expect("Preview failed")
125            .join(
126                db,
127                RootSecret::StandardDoubleDerive(PlainRootSecretStrategy::to_root_secret(
128                    &client_secret,
129                )),
130            )
131            .await
132            .map(Arc::new)
133            .expect("Failed to build client")
134    }
135
136    /// Return first invite code for gateways
137    pub fn invite_code(&self) -> InviteCode {
138        self.configs[&PeerId::from(0)].get_invite_code(None)
139    }
140
141    ///  Return the federation id
142    pub fn id(&self) -> FederationId {
143        self.configs[&PeerId::from(0)]
144            .consensus
145            .to_client_config(&self.server_init)
146            .unwrap()
147            .global
148            .calculate_federation_id()
149    }
150
151    /// Connects a gateway to this `FederationTest`
152    pub async fn connect_gateway(&self, gw: &Gateway) {
153        gw.handle_connect_federation(ConnectFedPayload {
154            invite_code: self.invite_code().to_string(),
155            use_tor: Some(false),
156            recover: Some(false),
157        })
158        .await
159        .expect("Failed to connect federation");
160    }
161
162    /// Return all online PeerIds
163    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
164        // we can assume this ordering since peers are started in ascending order
165        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
166    }
167
168    /// Returns true if the federation is running in a degraded state
169    pub fn is_degraded(&self) -> bool {
170        self.num_offline > 0
171    }
172}
173
174/// Builder struct for creating a `FederationTest`.
175#[derive(Clone, Debug)]
176pub struct FederationTestBuilder {
177    num_peers: u16,
178    num_offline: u16,
179    base_port: u16,
180    primary_module_kind: ModuleKind,
181    version_hash: String,
182    server_init: ServerModuleInitRegistry,
183    client_init: ClientModuleInitRegistry,
184    bitcoin_rpc_connection: DynServerBitcoinRpc,
185    enable_mint_fees: bool,
186}
187
188impl FederationTestBuilder {
189    pub fn new(
190        server_init: ServerModuleInitRegistry,
191        client_init: ClientModuleInitRegistry,
192        primary_module_kind: ModuleKind,
193        num_offline: u16,
194        bitcoin_rpc_connection: DynServerBitcoinRpc,
195    ) -> FederationTestBuilder {
196        let num_peers = 4;
197        Self {
198            num_peers,
199            num_offline,
200            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
201                .expect("Failed to allocate a port range"),
202            primary_module_kind,
203            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
204            server_init,
205            client_init,
206            bitcoin_rpc_connection,
207            enable_mint_fees: true,
208        }
209    }
210
211    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
212        self.num_peers = num_peers;
213        self
214    }
215
216    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
217        self.num_offline = num_offline;
218        self
219    }
220
221    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
222        self.base_port = base_port;
223        self
224    }
225
226    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
227        self.primary_module_kind = primary_module_kind;
228        self
229    }
230
231    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
232        self.version_hash = version_hash;
233        self
234    }
235
236    pub fn disable_mint_fees(mut self) -> FederationTestBuilder {
237        self.enable_mint_fees = false;
238        self
239    }
240
241    #[allow(clippy::too_many_lines)]
242    pub async fn build(self) -> FederationTest {
243        install_crypto_provider().await;
244        let num_offline = self.num_offline;
245        assert!(
246            self.num_peers > 3 * self.num_offline,
247            "too many peers offline ({num_offline}) to reach consensus"
248        );
249        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
250        let params = local_config_gen_params(&peers, self.base_port, self.enable_mint_fees)
251            .expect("Generates local config");
252
253        let configs =
254            ServerConfig::trusted_dealer_gen(&params, &self.server_init, &self.version_hash);
255
256        let task_group = TaskGroup::new();
257        for (peer_id, cfg) in configs.clone() {
258            let peer_port = self.base_port + u16::from(peer_id) * 3;
259
260            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
261            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
262            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
263
264            if u16::from(peer_id) >= self.num_peers - self.num_offline {
265                continue;
266            }
267
268            let instances = cfg.consensus.iter_module_instances();
269            let decoders = self.server_init.available_decoders(instances).unwrap();
270            let db = Database::new(MemDatabase::new(), decoders);
271            let module_init_registry = self.server_init.clone();
272            let subgroup = task_group.make_subgroup();
273            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
274            let code_version_str = env!("CARGO_PKG_VERSION");
275
276            let connector = TlsTcpConnector::new(
277                cfg.tls_config(),
278                p2p_bind,
279                cfg.local.p2p_endpoints.clone(),
280                cfg.local.identity,
281            )
282            .await
283            .into_dyn();
284
285            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
286
287            let connections = ReconnectP2PConnections::new(
288                cfg.local.identity,
289                connector,
290                &task_group,
291                p2p_status_senders,
292            )
293            .into_dyn();
294
295            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
296
297            task_group.spawn("fedimintd", move |_| async move {
298                Box::pin(consensus::run(
299                    ConnectorRegistry::build_from_testing_env()
300                        .unwrap()
301                        .bind()
302                        .await
303                        .unwrap(),
304                    connections,
305                    p2p_status_receivers,
306                    api_bind,
307                    None,
308                    vec![],
309                    cfg.clone(),
310                    db.clone(),
311                    module_init_registry,
312                    &subgroup,
313                    ApiSecrets::default(),
314                    checkpoint_dir,
315                    code_version_str.to_string(),
316                    bitcoin_rpc_connection,
317                    ui_bind,
318                    Box::new(|_| axum::Router::new()),
319                    1,
320                    ConnectionLimits {
321                        max_connections: 1000,
322                        max_requests_per_connection: 100,
323                    },
324                ))
325                .await
326                .expect("Could not initialise consensus");
327            });
328        }
329
330        for (peer_id, config) in configs.clone() {
331            if u16::from(peer_id) >= self.num_peers - self.num_offline {
332                continue;
333            }
334
335            let connectors = ConnectorRegistry::build_from_testing_env()
336                .unwrap()
337                .bind()
338                .await
339                .unwrap();
340            let api = DynGlobalApi::new_admin(
341                connectors,
342                peer_id,
343                config.consensus.api_endpoints()[&peer_id].url.clone(),
344                None,
345            )
346            .unwrap();
347
348            while let Err(e) = api
349                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
350                .await
351            {
352                sleep_in_test(
353                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
354                    Duration::from_millis(500),
355                )
356                .await;
357            }
358        }
359
360        FederationTest {
361            configs,
362            server_init: self.server_init,
363            client_init: self.client_init,
364            _task: task_group,
365            num_peers: self.num_peers,
366            num_offline: self.num_offline,
367            connectors: ConnectorRegistry::build_from_testing_env()
368                .expect("Failed to initialize endpoints for testing (env)")
369                .bind()
370                .await
371                .expect("Failed to initialize endpoints for testing"),
372        }
373    }
374}