fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
20use fedimint_gateway_common::ConnectFedPayload;
21use fedimint_gateway_server::Gateway;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::config::ServerConfig;
25use fedimint_server::consensus;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
28use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
29use fedimint_testing_core::config::local_config_gen_params;
30use tracing::info;
31
/// Test fixture for a running fedimint federation
///
/// Holds the generated per-peer server configs plus the module registries
/// needed to create clients that talk to the federation.
#[derive(Clone)]
pub struct FederationTest {
    /// Server config of every peer (online and offline), keyed by peer id
    configs: BTreeMap<PeerId, ServerConfig>,
    /// Server-side module registry, used to derive client configs
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry installed into every client created here
    client_init: ClientModuleInitRegistry,
    /// Module kind configured as the primary module of created clients
    primary_module_kind: ModuleKind,
    /// Task group the peer tasks were spawned on; never read, presumably kept
    /// so the spawned tasks live as long as the fixture
    _task: TaskGroup,
    /// Total number of peers in the federation
    num_peers: u16,
    /// Number of peers (the ones with the highest ids) that are not running
    num_offline: u16,
}
43
44impl FederationTest {
45    /// Create two clients, useful for send/receive tests
46    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
47        (self.new_client().await, self.new_client().await)
48    }
49
50    /// Create a client connected to this fed
51    pub async fn new_client(&self) -> ClientHandleArc {
52        let client_config = self.configs[&PeerId::from(0)]
53            .consensus
54            .to_client_config(&self.server_init)
55            .unwrap();
56
57        self.new_client_with(client_config, MemDatabase::new().into(), None)
58            .await
59    }
60
61    /// Create a client connected to this fed but using RocksDB instead of MemDB
62    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
63        let client_config = self.configs[&PeerId::from(0)]
64            .consensus
65            .to_client_config(&self.server_init)
66            .unwrap();
67
68        self.new_client_with(
69            client_config,
70            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
71                .await
72                .expect("Couldn't open DB")
73                .into(),
74            None,
75        )
76        .await
77    }
78
79    /// Create a new admin api for the given PeerId
80    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
81        let config = self.configs.get(&peer_id).expect("peer to have config");
82
83        DynGlobalApi::new_admin(
84            peer_id,
85            config.consensus.api_endpoints()[&peer_id].url.clone(),
86            &None,
87        )
88        .await
89    }
90
91    /// Create a new admin client connected to this fed
92    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
93        let client_config = self.configs[&PeerId::from(0)]
94            .consensus
95            .to_client_config(&self.server_init)
96            .unwrap();
97
98        let admin_creds = AdminCreds { peer_id, auth };
99
100        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
101            .await
102    }
103
104    pub async fn new_client_with(
105        &self,
106        client_config: ClientConfig,
107        db: Database,
108        admin_creds: Option<AdminCreds>,
109    ) -> ClientHandleArc {
110        info!(target: LOG_TEST, "Setting new client with config");
111        let mut client_builder = Client::builder(db).await.expect("Failed to build client");
112        client_builder.with_module_inits(self.client_init.clone());
113        client_builder.with_primary_module_kind(self.primary_module_kind.clone());
114        if let Some(admin_creds) = admin_creds {
115            client_builder.set_admin_creds(admin_creds);
116        }
117        let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
118            .await
119            .unwrap();
120        client_builder
121            .join(
122                PlainRootSecretStrategy::to_root_secret(&client_secret),
123                client_config,
124                None,
125            )
126            .await
127            .map(Arc::new)
128            .expect("Failed to build client")
129    }
130
131    /// Return first invite code for gateways
132    pub fn invite_code(&self) -> InviteCode {
133        self.configs[&PeerId::from(0)].get_invite_code(None)
134    }
135
136    ///  Return the federation id
137    pub fn id(&self) -> FederationId {
138        self.configs[&PeerId::from(0)]
139            .consensus
140            .to_client_config(&self.server_init)
141            .unwrap()
142            .global
143            .calculate_federation_id()
144    }
145
146    /// Connects a gateway to this `FederationTest`
147    pub async fn connect_gateway(&self, gw: &Gateway) {
148        gw.handle_connect_federation(ConnectFedPayload {
149            invite_code: self.invite_code().to_string(),
150            use_tor: Some(false),
151            recover: Some(false),
152        })
153        .await
154        .expect("Failed to connect federation");
155    }
156
157    /// Return all online PeerIds
158    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
159        // we can assume this ordering since peers are started in ascending order
160        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
161    }
162
163    /// Returns true if the federation is running in a degraded state
164    pub fn is_degraded(&self) -> bool {
165        self.num_offline > 0
166    }
167}
168
/// Builder struct for creating a `FederationTest`.
#[derive(Clone, Debug)]
pub struct FederationTestBuilder {
    /// Total number of peers to generate configs for
    num_peers: u16,
    /// Number of peers (the ones with the highest ids) that are not started
    num_offline: u16,
    /// First port of the range used by the peers; each peer uses three
    /// consecutive ports (p2p, api, ui)
    base_port: u16,
    /// Module kind configured as the primary module of created clients
    primary_module_kind: ModuleKind,
    /// Version hash passed to config generation
    version_hash: String,
    /// Config generation parameters of the server modules
    modules: ServerModuleConfigGenParamsRegistry,
    /// Server-side module registry
    server_init: ServerModuleInitRegistry,
    /// Client-side module registry handed to the resulting `FederationTest`
    client_init: ClientModuleInitRegistry,
}
181
182impl FederationTestBuilder {
183    pub fn new(
184        params: ServerModuleConfigGenParamsRegistry,
185        server_init: ServerModuleInitRegistry,
186        client_init: ClientModuleInitRegistry,
187        primary_module_kind: ModuleKind,
188        num_offline: u16,
189    ) -> FederationTestBuilder {
190        let num_peers = 4;
191        Self {
192            num_peers,
193            num_offline,
194            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
195                .expect("Failed to allocate a port range"),
196            primary_module_kind,
197            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
198            modules: params,
199            server_init,
200            client_init,
201        }
202    }
203
204    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
205        self.num_peers = num_peers;
206        self
207    }
208
209    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
210        self.num_offline = num_offline;
211        self
212    }
213
214    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
215        self.base_port = base_port;
216        self
217    }
218
219    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
220        self.primary_module_kind = primary_module_kind;
221        self
222    }
223
224    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
225        self.version_hash = version_hash;
226        self
227    }
228
229    pub async fn build(self) -> FederationTest {
230        let num_offline = self.num_offline;
231        assert!(
232            self.num_peers > 3 * self.num_offline,
233            "too many peers offline ({num_offline}) to reach consensus"
234        );
235        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
236        let params =
237            local_config_gen_params(&peers, self.base_port).expect("Generates local config");
238
239        let configs = ServerConfig::trusted_dealer_gen(
240            self.modules,
241            &params,
242            &self.server_init,
243            &self.version_hash,
244        );
245
246        let task_group = TaskGroup::new();
247        for (peer_id, cfg) in configs.clone() {
248            let peer_port = self.base_port + u16::from(peer_id) * 3;
249
250            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
251            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
252            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
253
254            if u16::from(peer_id) >= self.num_peers - self.num_offline {
255                continue;
256            }
257
258            let instances = cfg.consensus.iter_module_instances();
259            let decoders = self.server_init.available_decoders(instances).unwrap();
260            let db = Database::new(MemDatabase::new(), decoders);
261            let module_init_registry = self.server_init.clone();
262            let subgroup = task_group.make_subgroup();
263            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().into_path();
264            let code_version_str = env!("CARGO_PKG_VERSION");
265
266            let connector = TlsTcpConnector::new(
267                cfg.tls_config(),
268                p2p_bind,
269                cfg.local.p2p_endpoints.clone(),
270                cfg.local.identity,
271            )
272            .await
273            .into_dyn();
274
275            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
276
277            let connections = ReconnectP2PConnections::new(
278                cfg.local.identity,
279                connector,
280                &task_group,
281                p2p_status_senders,
282            )
283            .into_dyn();
284
285            task_group.spawn("fedimintd", move |_| async move {
286                Box::pin(consensus::run(
287                    connections,
288                    p2p_status_receivers,
289                    api_bind,
290                    api_bind,
291                    cfg.clone(),
292                    db.clone(),
293                    module_init_registry,
294                    &subgroup,
295                    fedimint_server::net::api::ApiSecrets::default(),
296                    checkpoint_dir,
297                    code_version_str.to_string(),
298                    ui_bind,
299                    None,
300                ))
301                .await
302                .expect("Could not initialise consensus");
303            });
304        }
305
306        for (peer_id, config) in configs.clone() {
307            if u16::from(peer_id) >= self.num_peers - self.num_offline {
308                continue;
309            }
310
311            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
312            // defaulting to Tcp variant.
313            let api = DynGlobalApi::new_admin(
314                peer_id,
315                config.consensus.api_endpoints()[&peer_id].url.clone(),
316                &None,
317            )
318            .await
319            .unwrap();
320
321            while let Err(e) = api
322                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
323                .await
324            {
325                sleep_in_test(
326                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
327                    Duration::from_millis(500),
328                )
329                .await;
330            }
331        }
332
333        FederationTest {
334            configs,
335            server_init: self.server_init,
336            client_init: self.client_init,
337            primary_module_kind: self.primary_module_kind,
338            _task: task_group,
339            num_peers: self.num_peers,
340            num_offline: self.num_offline,
341        }
342    }
343}