fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
20use fedimint_gateway_common::ConnectFedPayload;
21use fedimint_gateway_server::Gateway;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::config::ServerConfig;
25use fedimint_server::consensus;
26use fedimint_server::core::ServerModuleInitRegistry;
27use fedimint_server::net::api::ApiSecrets;
28use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
29use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
30use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
31use fedimint_testing_core::config::local_config_gen_params;
32use tracing::info;
33
34/// Test fixture for a running fedimint federation
35#[derive(Clone)]
36pub struct FederationTest {
37    configs: BTreeMap<PeerId, ServerConfig>,
38    server_init: ServerModuleInitRegistry,
39    client_init: ClientModuleInitRegistry,
40    primary_module_kind: ModuleKind,
41    _task: TaskGroup,
42    num_peers: u16,
43    num_offline: u16,
44}
45
46impl FederationTest {
47    /// Create two clients, useful for send/receive tests
48    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
49        (self.new_client().await, self.new_client().await)
50    }
51
52    /// Create a client connected to this fed
53    pub async fn new_client(&self) -> ClientHandleArc {
54        let client_config = self.configs[&PeerId::from(0)]
55            .consensus
56            .to_client_config(&self.server_init)
57            .unwrap();
58
59        self.new_client_with(client_config, MemDatabase::new().into(), None)
60            .await
61    }
62
63    /// Create a client connected to this fed but using RocksDB instead of MemDB
64    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
65        let client_config = self.configs[&PeerId::from(0)]
66            .consensus
67            .to_client_config(&self.server_init)
68            .unwrap();
69
70        self.new_client_with(
71            client_config,
72            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
73                .await
74                .expect("Couldn't open DB")
75                .into(),
76            None,
77        )
78        .await
79    }
80
81    /// Create a new admin api for the given PeerId
82    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
83        let config = self.configs.get(&peer_id).expect("peer to have config");
84
85        DynGlobalApi::new_admin(
86            peer_id,
87            config.consensus.api_endpoints()[&peer_id].url.clone(),
88            &None,
89        )
90        .await
91    }
92
93    /// Create a new admin client connected to this fed
94    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
95        let client_config = self.configs[&PeerId::from(0)]
96            .consensus
97            .to_client_config(&self.server_init)
98            .unwrap();
99
100        let admin_creds = AdminCreds { peer_id, auth };
101
102        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
103            .await
104    }
105
106    pub async fn new_client_with(
107        &self,
108        client_config: ClientConfig,
109        db: Database,
110        admin_creds: Option<AdminCreds>,
111    ) -> ClientHandleArc {
112        info!(target: LOG_TEST, "Setting new client with config");
113        let mut client_builder = Client::builder(db).await.expect("Failed to build client");
114        client_builder.with_module_inits(self.client_init.clone());
115        client_builder.with_primary_module_kind(self.primary_module_kind.clone());
116        if let Some(admin_creds) = admin_creds {
117            client_builder.set_admin_creds(admin_creds);
118        }
119        let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
120            .await
121            .unwrap();
122        client_builder
123            .join(
124                PlainRootSecretStrategy::to_root_secret(&client_secret),
125                client_config,
126                None,
127            )
128            .await
129            .map(Arc::new)
130            .expect("Failed to build client")
131    }
132
133    /// Return first invite code for gateways
134    pub fn invite_code(&self) -> InviteCode {
135        self.configs[&PeerId::from(0)].get_invite_code(None)
136    }
137
138    ///  Return the federation id
139    pub fn id(&self) -> FederationId {
140        self.configs[&PeerId::from(0)]
141            .consensus
142            .to_client_config(&self.server_init)
143            .unwrap()
144            .global
145            .calculate_federation_id()
146    }
147
148    /// Connects a gateway to this `FederationTest`
149    pub async fn connect_gateway(&self, gw: &Gateway) {
150        gw.handle_connect_federation(ConnectFedPayload {
151            invite_code: self.invite_code().to_string(),
152            use_tor: Some(false),
153            recover: Some(false),
154        })
155        .await
156        .expect("Failed to connect federation");
157    }
158
159    /// Return all online PeerIds
160    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
161        // we can assume this ordering since peers are started in ascending order
162        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
163    }
164
165    /// Returns true if the federation is running in a degraded state
166    pub fn is_degraded(&self) -> bool {
167        self.num_offline > 0
168    }
169}
170
171/// Builder struct for creating a `FederationTest`.
172#[derive(Clone, Debug)]
173pub struct FederationTestBuilder {
174    num_peers: u16,
175    num_offline: u16,
176    base_port: u16,
177    primary_module_kind: ModuleKind,
178    version_hash: String,
179    modules: ServerModuleConfigGenParamsRegistry,
180    server_init: ServerModuleInitRegistry,
181    client_init: ClientModuleInitRegistry,
182    bitcoin_rpc_connection: DynServerBitcoinRpc,
183}
184
185impl FederationTestBuilder {
186    pub fn new(
187        params: ServerModuleConfigGenParamsRegistry,
188        server_init: ServerModuleInitRegistry,
189        client_init: ClientModuleInitRegistry,
190        primary_module_kind: ModuleKind,
191        num_offline: u16,
192        bitcoin_rpc_connection: DynServerBitcoinRpc,
193    ) -> FederationTestBuilder {
194        let num_peers = 4;
195        Self {
196            num_peers,
197            num_offline,
198            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
199                .expect("Failed to allocate a port range"),
200            primary_module_kind,
201            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
202            modules: params,
203            server_init,
204            client_init,
205            bitcoin_rpc_connection,
206        }
207    }
208
209    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
210        self.num_peers = num_peers;
211        self
212    }
213
214    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
215        self.num_offline = num_offline;
216        self
217    }
218
219    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
220        self.base_port = base_port;
221        self
222    }
223
224    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
225        self.primary_module_kind = primary_module_kind;
226        self
227    }
228
229    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
230        self.version_hash = version_hash;
231        self
232    }
233
234    #[allow(clippy::too_many_lines)]
235    pub async fn build(self) -> FederationTest {
236        let num_offline = self.num_offline;
237        assert!(
238            self.num_peers > 3 * self.num_offline,
239            "too many peers offline ({num_offline}) to reach consensus"
240        );
241        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
242        let params =
243            local_config_gen_params(&peers, self.base_port).expect("Generates local config");
244
245        let configs = ServerConfig::trusted_dealer_gen(
246            self.modules,
247            &params,
248            &self.server_init,
249            &self.version_hash,
250        );
251
252        let task_group = TaskGroup::new();
253        for (peer_id, cfg) in configs.clone() {
254            let peer_port = self.base_port + u16::from(peer_id) * 3;
255
256            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
257            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
258            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
259
260            if u16::from(peer_id) >= self.num_peers - self.num_offline {
261                continue;
262            }
263
264            let instances = cfg.consensus.iter_module_instances();
265            let decoders = self.server_init.available_decoders(instances).unwrap();
266            let db = Database::new(MemDatabase::new(), decoders);
267            let module_init_registry = self.server_init.clone();
268            let subgroup = task_group.make_subgroup();
269            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().into_path();
270            let code_version_str = env!("CARGO_PKG_VERSION");
271
272            let connector = TlsTcpConnector::new(
273                cfg.tls_config(),
274                p2p_bind,
275                cfg.local.p2p_endpoints.clone(),
276                cfg.local.identity,
277            )
278            .await
279            .into_dyn();
280
281            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
282
283            let connections = ReconnectP2PConnections::new(
284                cfg.local.identity,
285                connector,
286                &task_group,
287                p2p_status_senders,
288            )
289            .into_dyn();
290
291            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
292
293            task_group.spawn("fedimintd", move |_| async move {
294                Box::pin(consensus::run(
295                    connections,
296                    p2p_status_receivers,
297                    api_bind,
298                    cfg.clone(),
299                    db.clone(),
300                    module_init_registry,
301                    &subgroup,
302                    ApiSecrets::default(),
303                    checkpoint_dir,
304                    code_version_str.to_string(),
305                    bitcoin_rpc_connection,
306                    ui_bind,
307                    Box::new(|_| axum::Router::new()),
308                    1,
309                ))
310                .await
311                .expect("Could not initialise consensus");
312            });
313        }
314
315        for (peer_id, config) in configs.clone() {
316            if u16::from(peer_id) >= self.num_peers - self.num_offline {
317                continue;
318            }
319
320            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
321            // defaulting to Tcp variant.
322            let api = DynGlobalApi::new_admin(
323                peer_id,
324                config.consensus.api_endpoints()[&peer_id].url.clone(),
325                &None,
326            )
327            .await
328            .unwrap();
329
330            while let Err(e) = api
331                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
332                .await
333            {
334                sleep_in_test(
335                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
336                    Duration::from_millis(500),
337                )
338                .await;
339            }
340        }
341
342        FederationTest {
343            configs,
344            server_init: self.server_init,
345            client_init: self.client_init,
346            primary_module_kind: self.primary_module_kind,
347            _task: task_group,
348            num_peers: self.num_peers,
349            num_offline: self.num_offline,
350        }
351    }
352}