fedimint_testing/
federation.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use fedimint_api_client::api::{DynGlobalApi, FederationApiExt};
6use fedimint_client::module_init::ClientModuleInitRegistry;
7use fedimint_client::{Client, ClientHandleArc, RootSecret};
8use fedimint_client_module::AdminCreds;
9use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy};
10use fedimint_core::PeerId;
11use fedimint_core::config::{ClientConfig, FederationId, ServerModuleConfigGenParamsRegistry};
12use fedimint_core::core::ModuleKind;
13use fedimint_core::db::Database;
14use fedimint_core::db::mem_impl::MemDatabase;
15use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::module::{ApiAuth, ApiRequestErased};
18use fedimint_core::net::peers::IP2PConnections;
19use fedimint_core::task::{TaskGroup, block_in_place, sleep_in_test};
20use fedimint_gateway_common::ConnectFedPayload;
21use fedimint_gateway_server::Gateway;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::config::ServerConfig;
25use fedimint_server::core::ServerModuleInitRegistry;
26use fedimint_server::net::api::ApiSecrets;
27use fedimint_server::net::p2p::{ReconnectP2PConnections, p2p_status_channels};
28use fedimint_server::net::p2p_connector::{IP2PConnector, TlsTcpConnector};
29use fedimint_server::{ConnectionLimits, consensus};
30use fedimint_server_core::bitcoin_rpc::DynServerBitcoinRpc;
31use fedimint_testing_core::config::local_config_gen_params;
32use tracing::info;
33
34/// Test fixture for a running fedimint federation
35#[derive(Clone)]
36pub struct FederationTest {
37    configs: BTreeMap<PeerId, ServerConfig>,
38    server_init: ServerModuleInitRegistry,
39    client_init: ClientModuleInitRegistry,
40    primary_module_kind: ModuleKind,
41    _task: TaskGroup,
42    num_peers: u16,
43    num_offline: u16,
44}
45
46impl FederationTest {
47    /// Create two clients, useful for send/receive tests
48    pub async fn two_clients(&self) -> (ClientHandleArc, ClientHandleArc) {
49        (self.new_client().await, self.new_client().await)
50    }
51
52    /// Create a client connected to this fed
53    pub async fn new_client(&self) -> ClientHandleArc {
54        let client_config = self.configs[&PeerId::from(0)]
55            .consensus
56            .to_client_config(&self.server_init)
57            .unwrap();
58
59        self.new_client_with(client_config, MemDatabase::new().into(), None)
60            .await
61    }
62
63    /// Create a client connected to this fed but using RocksDB instead of MemDB
64    pub async fn new_client_rocksdb(&self) -> ClientHandleArc {
65        let client_config = self.configs[&PeerId::from(0)]
66            .consensus
67            .to_client_config(&self.server_init)
68            .unwrap();
69
70        self.new_client_with(
71            client_config,
72            RocksDb::open(tempfile::tempdir().expect("Couldn't create temp dir"))
73                .await
74                .expect("Couldn't open DB")
75                .into(),
76            None,
77        )
78        .await
79    }
80
81    /// Create a new admin api for the given PeerId
82    pub async fn new_admin_api(&self, peer_id: PeerId) -> anyhow::Result<DynGlobalApi> {
83        let config = self.configs.get(&peer_id).expect("peer to have config");
84
85        DynGlobalApi::new_admin(
86            peer_id,
87            config.consensus.api_endpoints()[&peer_id].url.clone(),
88            &None,
89        )
90        .await
91    }
92
93    /// Create a new admin client connected to this fed
94    pub async fn new_admin_client(&self, peer_id: PeerId, auth: ApiAuth) -> ClientHandleArc {
95        let client_config = self.configs[&PeerId::from(0)]
96            .consensus
97            .to_client_config(&self.server_init)
98            .unwrap();
99
100        let admin_creds = AdminCreds { peer_id, auth };
101
102        self.new_client_with(client_config, MemDatabase::new().into(), Some(admin_creds))
103            .await
104    }
105
106    pub async fn new_client_with(
107        &self,
108        client_config: ClientConfig,
109        db: Database,
110        admin_creds: Option<AdminCreds>,
111    ) -> ClientHandleArc {
112        info!(target: LOG_TEST, "Setting new client with config");
113        let mut client_builder = Client::builder(db).await.expect("Failed to build client");
114        client_builder.with_module_inits(self.client_init.clone());
115        client_builder.with_primary_module_kind(self.primary_module_kind.clone());
116        if let Some(admin_creds) = admin_creds {
117            client_builder.set_admin_creds(admin_creds);
118        }
119        let client_secret = Client::load_or_generate_client_secret(client_builder.db_no_decoders())
120            .await
121            .unwrap();
122        client_builder
123            .preview_with_existing_config(client_config, None)
124            .await
125            .expect("Preview failed")
126            .join(RootSecret::StandardDoubleDerive(
127                PlainRootSecretStrategy::to_root_secret(&client_secret),
128            ))
129            .await
130            .map(Arc::new)
131            .expect("Failed to build client")
132    }
133
134    /// Return first invite code for gateways
135    pub fn invite_code(&self) -> InviteCode {
136        self.configs[&PeerId::from(0)].get_invite_code(None)
137    }
138
139    ///  Return the federation id
140    pub fn id(&self) -> FederationId {
141        self.configs[&PeerId::from(0)]
142            .consensus
143            .to_client_config(&self.server_init)
144            .unwrap()
145            .global
146            .calculate_federation_id()
147    }
148
149    /// Connects a gateway to this `FederationTest`
150    pub async fn connect_gateway(&self, gw: &Gateway) {
151        gw.handle_connect_federation(ConnectFedPayload {
152            invite_code: self.invite_code().to_string(),
153            use_tor: Some(false),
154            recover: Some(false),
155        })
156        .await
157        .expect("Failed to connect federation");
158    }
159
160    /// Return all online PeerIds
161    pub fn online_peer_ids(&self) -> impl Iterator<Item = PeerId> + use<> {
162        // we can assume this ordering since peers are started in ascending order
163        (0..(self.num_peers - self.num_offline)).map(PeerId::from)
164    }
165
166    /// Returns true if the federation is running in a degraded state
167    pub fn is_degraded(&self) -> bool {
168        self.num_offline > 0
169    }
170}
171
172/// Builder struct for creating a `FederationTest`.
173#[derive(Clone, Debug)]
174pub struct FederationTestBuilder {
175    num_peers: u16,
176    num_offline: u16,
177    base_port: u16,
178    primary_module_kind: ModuleKind,
179    version_hash: String,
180    modules: ServerModuleConfigGenParamsRegistry,
181    server_init: ServerModuleInitRegistry,
182    client_init: ClientModuleInitRegistry,
183    bitcoin_rpc_connection: DynServerBitcoinRpc,
184}
185
186impl FederationTestBuilder {
187    pub fn new(
188        params: ServerModuleConfigGenParamsRegistry,
189        server_init: ServerModuleInitRegistry,
190        client_init: ClientModuleInitRegistry,
191        primary_module_kind: ModuleKind,
192        num_offline: u16,
193        bitcoin_rpc_connection: DynServerBitcoinRpc,
194    ) -> FederationTestBuilder {
195        let num_peers = 4;
196        Self {
197            num_peers,
198            num_offline,
199            base_port: block_in_place(|| fedimint_portalloc::port_alloc(num_peers * 3))
200                .expect("Failed to allocate a port range"),
201            primary_module_kind,
202            version_hash: "fedimint-testing-dummy-version-hash".to_owned(),
203            modules: params,
204            server_init,
205            client_init,
206            bitcoin_rpc_connection,
207        }
208    }
209
210    pub fn num_peers(mut self, num_peers: u16) -> FederationTestBuilder {
211        self.num_peers = num_peers;
212        self
213    }
214
215    pub fn num_offline(mut self, num_offline: u16) -> FederationTestBuilder {
216        self.num_offline = num_offline;
217        self
218    }
219
220    pub fn base_port(mut self, base_port: u16) -> FederationTestBuilder {
221        self.base_port = base_port;
222        self
223    }
224
225    pub fn primary_module_kind(mut self, primary_module_kind: ModuleKind) -> FederationTestBuilder {
226        self.primary_module_kind = primary_module_kind;
227        self
228    }
229
230    pub fn version_hash(mut self, version_hash: String) -> FederationTestBuilder {
231        self.version_hash = version_hash;
232        self
233    }
234
235    #[allow(clippy::too_many_lines)]
236    pub async fn build(self) -> FederationTest {
237        let num_offline = self.num_offline;
238        assert!(
239            self.num_peers > 3 * self.num_offline,
240            "too many peers offline ({num_offline}) to reach consensus"
241        );
242        let peers = (0..self.num_peers).map(PeerId::from).collect::<Vec<_>>();
243        let params =
244            local_config_gen_params(&peers, self.base_port).expect("Generates local config");
245
246        let configs = ServerConfig::trusted_dealer_gen(
247            self.modules,
248            &params,
249            &self.server_init,
250            &self.version_hash,
251        );
252
253        let task_group = TaskGroup::new();
254        for (peer_id, cfg) in configs.clone() {
255            let peer_port = self.base_port + u16::from(peer_id) * 3;
256
257            let p2p_bind = format!("127.0.0.1:{peer_port}").parse().unwrap();
258            let api_bind = format!("127.0.0.1:{}", peer_port + 1).parse().unwrap();
259            let ui_bind = format!("127.0.0.1:{}", peer_port + 2).parse().unwrap();
260
261            if u16::from(peer_id) >= self.num_peers - self.num_offline {
262                continue;
263            }
264
265            let instances = cfg.consensus.iter_module_instances();
266            let decoders = self.server_init.available_decoders(instances).unwrap();
267            let db = Database::new(MemDatabase::new(), decoders);
268            let module_init_registry = self.server_init.clone();
269            let subgroup = task_group.make_subgroup();
270            let checkpoint_dir = tempfile::Builder::new().tempdir().unwrap().keep();
271            let code_version_str = env!("CARGO_PKG_VERSION");
272
273            let connector = TlsTcpConnector::new(
274                cfg.tls_config(),
275                p2p_bind,
276                cfg.local.p2p_endpoints.clone(),
277                cfg.local.identity,
278            )
279            .await
280            .into_dyn();
281
282            let (p2p_status_senders, p2p_status_receivers) = p2p_status_channels(connector.peers());
283
284            let connections = ReconnectP2PConnections::new(
285                cfg.local.identity,
286                connector,
287                &task_group,
288                p2p_status_senders,
289            )
290            .into_dyn();
291
292            let bitcoin_rpc_connection = self.bitcoin_rpc_connection.clone();
293
294            task_group.spawn("fedimintd", move |_| async move {
295                Box::pin(consensus::run(
296                    connections,
297                    p2p_status_receivers,
298                    api_bind,
299                    None,
300                    vec![],
301                    cfg.clone(),
302                    db.clone(),
303                    module_init_registry,
304                    &subgroup,
305                    ApiSecrets::default(),
306                    checkpoint_dir,
307                    code_version_str.to_string(),
308                    bitcoin_rpc_connection,
309                    ui_bind,
310                    Box::new(|_| axum::Router::new()),
311                    1,
312                    ConnectionLimits {
313                        max_connections: 1000,
314                        max_requests_per_connection: 100,
315                    },
316                ))
317                .await
318                .expect("Could not initialise consensus");
319            });
320        }
321
322        for (peer_id, config) in configs.clone() {
323            if u16::from(peer_id) >= self.num_peers - self.num_offline {
324                continue;
325            }
326
327            // FIXME: (@leonardo) Currently there is no support for Tor while testing,
328            // defaulting to Tcp variant.
329            let api = DynGlobalApi::new_admin(
330                peer_id,
331                config.consensus.api_endpoints()[&peer_id].url.clone(),
332                &None,
333            )
334            .await
335            .unwrap();
336
337            while let Err(e) = api
338                .request_admin_no_auth::<u64>(SESSION_COUNT_ENDPOINT, ApiRequestErased::default())
339                .await
340            {
341                sleep_in_test(
342                    format!("Waiting for api of peer {peer_id} to come online: {e}"),
343                    Duration::from_millis(500),
344                )
345                .await;
346            }
347        }
348
349        FederationTest {
350            configs,
351            server_init: self.server_init,
352            client_init: self.client_init,
353            primary_module_kind: self.primary_module_kind,
354            _task: task_group,
355            num_peers: self.num_peers,
356            num_offline: self.num_offline,
357        }
358    }
359}