fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::rustls::install_crypto_provider;
20use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
21use fedimint_gateway_common::ConnectFedPayload;
22use fedimint_gateway_server::Gateway;
23use fedimint_logging::LOG_TEST;
24use fedimint_rocksdb::RocksDb;
25use fedimint_server::config::ServerConfig;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
29use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
30use fedimint_server::{ConnectionLimits, consensus};
31use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
32use fedimint_testing_core::config::local_config_gen_params;
33use tracing::info;
34
35/// Test fixture for a running fedimint federation
36#[derive(Clone)]
37pub struct FederationTest {
38    configs: BTreeMap<PeerId, ServerConfig>,
39    server_init: ServerModuleInitRegistry,
40    client_init: ClientModuleInitRegistry,
41    primary_module_kind: ModuleKind,
42    _task: TaskGroup,
43    num_peers: u16,
44    num_offline: u16,
45}
46
47impl FederationTest {
48    /// Create two clients, useful for send/receive tests
49    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
50        (self.new_client().await, self.new_client().await)
51    }
52
53    /// Create a client connected to this fed
54    pub async fn new_client(&self) -> ClientHandleArc {
55        let client_config = self.configs[&PeerId::from(0)]
56            .consensus
57            .to_client_config(&self.server_init)
58            .unwrap();
59
60        self.new_client_with(client_config, MemDatabase::new().into(), None)
61            .await
62    }
63
64    /// Create a client connected to this fed but using RocksDB instead of MemDB
65    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
66        let client_config = self.configs[&PeerId::from(0)]
67            .consensus
68            .to_client_config(&self.server_init)
69            .unwrap();
70
71        self.new_client_with(
72            client_config,
73            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
74                .await
75                .expect("Couldn't open DB")
76                .into(),
77            None,
78        )
79        .await
80    }
81
82    /// Create a new admin api for the given PeerId
83    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
84        let config = self.configs.get(&peer_id).expect("peer to have config");
85
86        DynGlobalApi::new_admin(
87            peer_id,
88            config.consensus.api_endpoints()[&peer_id].url.clone(),
89            &None,
90        )
91        .await
92    }
93
94    /// Create a new admin client connected to this fed
95    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
96        let client_config = self.configs[&PeerId::from(0)]
97            .consensus
98            .to_client_config(&self.server_init)
99            .unwrap();
100
101        let admin_creds = AdminCreds { peer_id, auth };
102
103        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
104            .await
105    }
106
107    pub async fn new_client_with(
108        &self,
109        client_config: ClientConfig,
110        db: Database,
111        admin_creds: Option<AdminCreds>,
112    ) -> ClientHandleArc {
113        info!(target: LOG_TEST, "Setting new client with config");
114        let mut client_builder = Client::builder(db).await.expect("Failed to build client");
115        client_builder.with_module_inits(self.client_init.clone());
116        client_builder.with_primary_module_kind(self.primary_module_kind.clone());
117        if let Some(admin_creds) = admin_creds {
118            client_builder.set_admin_creds(admin_creds);
119        }
120        let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
121            .await
122            .unwrap();
123        client_builder
124            .preview_with_existing_config(client_config, None)
125            .await
126            .expect("Preview failed")
127            .join(RootSecret::StandardDoubleDerive(
128                PlainRootSecretStrategy::to_root_secret(&client_secret),
129            ))
130            .await
131            .map(Arc::new)
132            .expect("Failed to build client")
133    }
134
135    /// Return first invite code for gateways
136    pub fn invite_code(&self) -> InviteCode {
137        self.configs[&PeerId::from(0)].get_invite_code(None)
138    }
139
140    ///  Return the federation id
141    pub fn id(&self) -> FederationId {
142        self.configs[&PeerId::from(0)]
143            .consensus
144            .to_client_config(&self.server_init)
145            .unwrap()
146            .global
147            .calculate_federation_id()
148    }
149
150    /// Connects a gateway to this `FederationTest`
151    pub async fn connect_gateway(&self, gw: &Gateway) {
152        gw.handle_connect_federation(ConnectFedPayload {
153            invite_code: self.invite_code().to_string(),
154            use_tor: Some(false),
155            recover: Some(false),
156        })
157        .await
158        .expect("Failed to connect federation");
159    }
160
161    /// Return all online PeerIds
162    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
163        // we can assume this ordering since peers are started in ascending order
164        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
165    }
166
167    /// Returns true if the federation is running in a degraded state
168    pub fn is_degraded(&self) -> bool {
169        self.num_offline > 0
170    }
171}
172
173/// Builder struct for creating a `FederationTest`.
174#[derive(Clone, Debug)]
175pub struct FederationTestBuilder {
176    num_peers: u16,
177    num_offline: u16,
178    base_port: u16,
179    primary_module_kind: ModuleKind,
180    version_hash: String,
181    modules: ServerModuleConfigGenParamsRegistry,
182    server_init: ServerModuleInitRegistry,
183    client_init: ClientModuleInitRegistry,
184    bitcoin_rpc_connection: DynServerBitcoinRpc,
185}
186
187impl FederationTestBuilder {
188    pub fn new(
189        params: ServerModuleConfigGenParamsRegistry,
190        server_init: ServerModuleInitRegistry,
191        client_init: ClientModuleInitRegistry,
192        primary_module_kind: ModuleKind,
193        num_offline: u16,
194        bitcoin_rpc_connection: DynServerBitcoinRpc,
195    ) -> FederationTestBuilder {
196        let num_peers = 4;
197        Self {
198            num_peers,
199            num_offline,
200            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
201                .expect("Failed to allocate a port range"),
202            primary_module_kind,
203            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
204            modules: params,
205            server_init,
206            client_init,
207            bitcoin_rpc_connection,
208        }
209    }
210
211    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
212        self.num_peers = num_peers;
213        self
214    }
215
216    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
217        self.num_offline = num_offline;
218        self
219    }
220
221    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
222        self.base_port = base_port;
223        self
224    }
225
226    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
227        self.primary_module_kind = primary_module_kind;
228        self
229    }
230
231    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
232        self.version_hash = version_hash;
233        self
234    }
235
236    #[allow(clippy::too_many_lines)]
237    pub async fn build(self) -> FederationTest {
238        install_crypto_provider().await;
239        let num_offline = self.num_offline;
240        assert!(
241            self.num_peers > 3 * self.num_offline,
242            "too many peers offline ({num_offline}) to reach consensus"
243        );
244        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
245        let params =
246            local_config_gen_params(&peers, self.base_port).expect("Generates local config");
247
248        let configs = ServerConfig::trusted_dealer_gen(
249            self.modules,
250            &params,
251            &self.server_init,
252            &self.version_hash,
253        );
254
255        let task_group = TaskGroup::new();
256        for (peer_id, cfg) in configs.clone() {
257            let peer_port = self.base_port + u16::from(peer_id) * 3;
258
259            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
260            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
261            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
262
263            if u16::from(peer_id) >= self.num_peers - self.num_offline {
264                continue;
265            }
266
267            let instances = cfg.consensus.iter_module_instances();
268            let decoders = self.server_init.available_decoders(instances).unwrap();
269            let db = Database::new(MemDatabase::new(), decoders);
270            let module_init_registry = self.server_init.clone();
271            let subgroup = task_group.make_subgroup();
272            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
273            let code_version_str = env!("CARGO_PKG_VERSION");
274
275            let connector = TlsTcpConnector::new(
276                cfg.tls_config(),
277                p2p_bind,
278                cfg.local.p2p_endpoints.clone(),
279                cfg.local.identity,
280            )
281            .await
282            .into_dyn();
283
284            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
285
286            let connections = ReconnectP2PConnections::new(
287                cfg.local.identity,
288                connector,
289                &task_group,
290                p2p_status_senders,
291            )
292            .into_dyn();
293
294            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
295
296            task_group.spawn("fedimintd", move |_| async move {
297                Box::pin(consensus::run(
298                    connections,
299                    p2p_status_receivers,
300                    api_bind,
301                    None,
302                    vec![],
303                    cfg.clone(),
304                    db.clone(),
305                    module_init_registry,
306                    &subgroup,
307                    ApiSecrets::default(),
308                    checkpoint_dir,
309                    code_version_str.to_string(),
310                    bitcoin_rpc_connection,
311                    ui_bind,
312                    Box::new(|_| axum::Router::new()),
313                    1,
314                    ConnectionLimits {
315                        max_connections: 1000,
316                        max_requests_per_connection: 100,
317                    },
318                ))
319                .await
320                .expect("Could not initialise consensus");
321            });
322        }
323
324        for (peer_id, config) in configs.clone() {
325            if u16::from(peer_id) >= self.num_peers - self.num_offline {
326                continue;
327            }
328
329            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
330            // defaulting to Tcp variant.
331            let api = DynGlobalApi::new_admin(
332                peer_id,
333                config.consensus.api_endpoints()[&peer_id].url.clone(),
334                &None,
335            )
336            .await
337            .unwrap();
338
339            while let Err(e) = api
340                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
341                .await
342            {
343                sleep_in_test(
344                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
345                    Duration::from_millis(500),
346                )
347                .await;
348            }
349        }
350
351        FederationTest {
352            configs,
353            server_init: self.server_init,
354            client_init: self.client_init,
355            primary_module_kind: self.primary_module_kind,
356            _task: task_group,
357            num_peers: self.num_peers,
358            num_offline: self.num_offline,
359        }
360    }
361}