1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::{env, fs, io};
5
6use anyhow::{Context, bail, format_err};
7use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
8use fedimint_client::module_init::DynClientModuleInit;
9use fedimint_client::sm::executor::{
10 ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
11};
12use fedimint_client_module::module::ClientModule;
13use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
14use fedimint_core::core::OperationId;
15use fedimint_core::db::{
16 CoreMigrationFn, Database, DatabaseVersion, IDatabaseTransactionOpsCoreTyped, apply_migrations,
17 apply_migrations_server,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_logging::LOG_TEST;
22use fedimint_rocksdb::RocksDb;
23use fedimint_server::core::DynServerModuleInit;
24use futures::future::BoxFuture;
25use futures::{FutureExt, StreamExt};
26use rand::RngCore;
27use rand::rngs::OsRng;
28use tempfile::TempDir;
29use tracing::{debug, trace};
30
31use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
32
/// Walks up from the current working directory until a directory containing
/// `Cargo.lock` is found, and returns that directory as the project root.
///
/// # Errors
/// Returns an [`ErrorKind::NotFound`] error if no ancestor of the current
/// directory contains a `Cargo.lock` file.
pub fn get_project_root() -> io::Result<PathBuf> {
    let path = env::current_dir()?;
    let path_ancestors = path.as_path().ancestors();

    for path in path_ancestors {
        if path.join("Cargo.lock").try_exists()? {
            return Ok(PathBuf::from(path));
        }
    }
    // Fixed: the message previously said "Cargo.toml" although the loop
    // actually looks for "Cargo.lock".
    Err(io::Error::new(
        ErrorKind::NotFound,
        "Ran out of places to find Cargo.lock",
    ))
}
54
55fn open_snapshot_db(
59 decoders: ModuleDecoderRegistry,
60 snapshot_dir: &Path,
61 is_isolated: bool,
62) -> anyhow::Result<Database> {
63 if is_isolated {
64 Ok(Database::new(
65 RocksDb::open(snapshot_dir)
66 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
67 decoders,
68 )
69 .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
70 .0)
71 } else {
72 Ok(Database::new(
73 RocksDb::open(snapshot_dir)
74 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
75 decoders,
76 ))
77 }
78}
79
80async fn create_snapshot<'a, F>(
84 snapshot_dir: PathBuf,
85 decoders: ModuleDecoderRegistry,
86 is_isolated: bool,
87 prepare_fn: F,
88) -> anyhow::Result<()>
89where
90 F: FnOnce(Database) -> BoxFuture<'a, ()>,
91{
92 match (
93 std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
94 .map(|s| s.to_string_lossy().into_owned())
95 .as_deref(),
96 snapshot_dir.exists(),
97 ) {
98 (Some("force"), true) => {
99 tokio::fs::remove_dir_all(&snapshot_dir).await?;
100 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated)?;
101 prepare_fn(db).await;
102 }
103 (Some(_), true) => {
104 bail!(
105 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
106 snapshot_dir.display()
107 );
108 }
109 (Some(_), false) => {
110 debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
111 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated)?;
112 prepare_fn(db).await;
113 }
114 (None, true) => {
115 debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
116 }
117 (None, false) => {
118 bail!(
119 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
120 snapshot_dir.display()
121 );
122 }
123 }
124 Ok(())
125}
126
127pub async fn snapshot_db_migrations_with_decoders<'a, F>(
133 snapshot_name: &str,
134 prepare_fn: F,
135 decoders: ModuleDecoderRegistry,
136) -> anyhow::Result<()>
137where
138 F: Fn(Database) -> BoxFuture<'a, ()>,
139{
140 let project_root = get_project_root().unwrap();
141 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
142 create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
143}
144
145pub async fn snapshot_db_migrations<'a, F, I>(
150 snapshot_name: &str,
151 prepare_fn: F,
152) -> anyhow::Result<()>
153where
154 F: Fn(Database) -> BoxFuture<'a, ()>,
155 I: CommonModuleInit,
156{
157 let project_root = get_project_root().unwrap();
158 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
159
160 let decoders =
161 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
162 create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
163}
164
165pub async fn snapshot_db_migrations_client<'a, F, S, I>(
172 snapshot_name: &str,
173 data_prepare: F,
174 state_machine_prepare: S,
175) -> anyhow::Result<()>
176where
177 F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
178 S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
179 I: CommonModuleInit,
180{
181 let project_root = get_project_root().unwrap();
182 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
183
184 let decoders =
185 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
186
187 let snapshot_fn = |db: Database| {
188 async move {
189 let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
190 data_prepare(isolated_db).await;
191
192 let (active_states, inactive_states) = state_machine_prepare();
193 let mut global_dbtx = db.begin_transaction().await;
194
195 for state in active_states {
196 global_dbtx
197 .insert_new_entry(
198 &ActiveStateKeyBytes {
199 operation_id: OperationId::new_random(),
200 module_instance_id: TEST_MODULE_INSTANCE_ID,
201 state,
202 },
203 &ActiveStateMeta {
204 created_at: fedimint_core::time::now(),
205 },
206 )
207 .await;
208 }
209
210 for state in inactive_states {
211 global_dbtx
212 .insert_new_entry(
213 &InactiveStateKeyBytes {
214 operation_id: OperationId::new_random(),
215 module_instance_id: TEST_MODULE_INSTANCE_ID,
216 state,
217 },
218 &InactiveStateMeta {
219 created_at: fedimint_core::time::now(),
220 exited_at: fedimint_core::time::now(),
221 },
222 )
223 .await;
224 }
225
226 global_dbtx.commit_tx().await;
227 }
228 .boxed()
229 };
230
231 create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
232}
233
/// 64-character ASCII digit string used as fixture data in snapshots.
pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";
/// 8-byte fixture value.
pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
/// 20-byte fixture value.
pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
/// 32-byte fixture value.
pub const BYTE_32: [u8; 32] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
];
/// 33-byte fixture value.
pub const BYTE_33: [u8; 33] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
    2,
];
/// Module instance id used for isolating module data in migration tests.
pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
245
246fn get_temp_database(
250 db_prefix: &str,
251 decoders: &ModuleDecoderRegistry,
252) -> anyhow::Result<(Database, TempDir)> {
253 let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
254 if snapshot_dirs.exists() {
255 for file in fs::read_dir(&snapshot_dirs)
256 .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
257 .flatten()
258 {
259 let name = file
260 .file_name()
261 .into_string()
262 .map_err(|_e| format_err!("Invalid path name"))?;
263 if name.starts_with(db_prefix) {
264 let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
265 let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
266 .with_context(|| {
267 format!("Opening temp db for {name}. Copying to {temp_path}")
268 })?;
269 return Ok(temp_db);
270 }
271 }
272 }
273
274 Err(anyhow::anyhow!(
275 "No database with prefix {db_prefix} in backup directory"
276 ))
277}
278
279pub async fn validate_migrations_global<F, Fut>(
284 validate: F,
285 db_prefix: &str,
286 migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
287 decoders: ModuleDecoderRegistry,
288) -> anyhow::Result<()>
289where
290 F: Fn(Database) -> Fut,
291 Fut: futures::Future<Output = anyhow::Result<()>>,
292{
293 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders)?;
294 apply_migrations_server(&db, db_prefix.to_string(), migrations)
295 .await
296 .context("Error applying migrations to temp database")?;
297
298 validate(db)
299 .await
300 .with_context(|| format!("Validating {db_prefix}"))?;
301 Ok(())
302}
303
304pub async fn validate_migrations_server<F, Fut>(
308 module: DynServerModuleInit,
309 db_prefix: &str,
310 validate: F,
311) -> anyhow::Result<()>
312where
313 F: Fn(Database) -> Fut,
314 Fut: futures::Future<Output = anyhow::Result<()>>,
315{
316 let decoders = ModuleDecoderRegistry::from_iter([(
317 TEST_MODULE_INSTANCE_ID,
318 module.module_kind(),
319 module.decoder(),
320 )]);
321 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders)?;
322 apply_migrations(
323 &db,
324 module.module_kind().to_string(),
325 module.get_database_migrations(),
326 Some(TEST_MODULE_INSTANCE_ID),
327 None,
328 )
329 .await
330 .context("Error applying migrations to temp database")?;
331
332 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
333 validate(module_db)
334 .await
335 .with_context(|| format!("Validating {db_prefix}"))?;
336
337 Ok(())
338}
339
340pub async fn validate_migrations_core_client<F, Fut>(
345 db_prefix: &str,
346 validate: F,
347) -> anyhow::Result<()>
348where
349 F: Fn(Database) -> Fut,
350 Fut: futures::Future<Output = anyhow::Result<()>>,
351{
352 let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default())?;
353 let mut dbtx = db.begin_transaction().await;
354 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
355 .await
356 .context("Error applying core client migrations to temp database")?;
357 dbtx.commit_tx_result().await?;
358
359 validate(db)
360 .await
361 .with_context(|| format!("Validating {db_prefix}"))?;
362
363 Ok(())
364}
365
366pub async fn validate_migrations_client<F, Fut, T>(
372 module: DynClientModuleInit,
373 db_prefix: &str,
374 validate: F,
375) -> anyhow::Result<()>
376where
377 F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
378 Fut: futures::Future<Output = anyhow::Result<()>>,
379 T: ClientModule,
380{
381 let decoders = ModuleDecoderRegistry::from_iter([(
382 TEST_MODULE_INSTANCE_ID,
383 module.as_common().module_kind(),
384 T::decoder(),
385 )]);
386 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders)?;
387 apply_migrations_client_module(
388 &db,
389 module.as_common().module_kind().to_string(),
390 module.get_database_migrations(),
391 TEST_MODULE_INSTANCE_ID,
392 )
393 .await
394 .context("Error applying migrations to temp database")?;
395
396 let mut global_dbtx = db.begin_transaction_nc().await;
397 let active_states = global_dbtx
398 .find_by_prefix(&ActiveStateKeyPrefix)
399 .await
400 .filter_map(|(state, _)| async move {
401 state.0.state.as_any().downcast_ref::<T::States>().cloned()
402 })
403 .collect::<Vec<_>>()
404 .await;
405
406 let inactive_states = global_dbtx
407 .find_by_prefix(&InactiveStateKeyPrefix)
408 .await
409 .filter_map(|(state, _)| async move {
410 state.0.state.as_any().downcast_ref::<T::States>().cloned()
411 })
412 .collect::<Vec<_>>()
413 .await;
414
415 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
416 validate(module_db, active_states, inactive_states)
417 .await
418 .with_context(|| format!("Validating {db_prefix}"))?;
419
420 Ok(())
421}
422
423fn open_temp_db_and_copy(
426 temp_path: &str,
427 src_dir: &Path,
428 decoders: ModuleDecoderRegistry,
429) -> anyhow::Result<(Database, TempDir)> {
430 let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
433 copy_directory(src_dir, tmp_dir.path())
434 .context("Error copying database to temporary directory")?;
435
436 Ok((Database::new(RocksDb::open(&tmp_dir)?, decoders), tmp_dir))
437}
438
439pub fn copy_directory(src: &Path, dst: &Path) -> io::Result<()> {
442 trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
443
444 fs::create_dir_all(dst)?;
446
447 for entry in fs::read_dir(src)? {
448 let entry = entry?;
449 let path = entry.path();
450 if path.is_dir() {
451 copy_directory(&path, &dst.join(entry.file_name()))?;
452 } else {
453 let dst_path = dst.join(entry.file_name());
454 trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
455 fs::copy(&path, &dst_path)?;
456 }
457 }
458
459 Ok(())
460}
461
#[cfg(test)]
mod fedimint_migration_tests {
    use anyhow::ensure;
    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::module::CoreConsensusVersion;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_logging::TracingSetup;

    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};
    /// Populates `db` with a v0 client config entry (dummy federation id,
    /// default endpoints/meta/modules), i.e. the oldest client database layout
    /// that the core client migrations must be able to upgrade from.
    async fn create_client_db_with_v0_data(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        let federation_id = FederationId::dummy();

        let client_config_v0 = ClientConfigV0 {
            global: GlobalClientConfigV0 {
                api_endpoints: Default::default(),
                consensus_version: CoreConsensusVersion::new(0, 0),
                meta: Default::default(),
            },
            modules: Default::default(),
        };

        let client_config_key_v0 = ClientConfigKeyV0 { id: federation_id };

        dbtx.insert_new_entry(&client_config_key_v0, &client_config_v0)
            .await;

        dbtx.commit_tx().await;
    }

    /// Writes the "fedimint-client" v0 snapshot to `db/migrations` (a no-op
    /// unless the snapshot-preparation env var is set; see `create_snapshot`).
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-client",
            |db| {
                Box::pin(async {
                    create_client_db_with_v0_data(db).await;
                })
            },
            ModuleDecoderRegistry::default(),
        )
        .await
    }

    /// Applies the core client migrations to a copy of the "fedimint-client"
    /// snapshot and checks the v0 config was migrated to the current key.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_client_db_migrations() -> anyhow::Result<()> {
        // Ignore the error if tracing was already initialized by another test.
        let _ = TracingSetup::default().init();

        validate_migrations_core_client("fedimint-client", |db| async move {
            let mut dbtx = db.begin_transaction_nc().await;
            ensure!(
                dbtx.get_value(&ClientConfigKey).await.is_some(),
                "Client config migration to v0 failed"
            );

            Ok(())
        })
        .await?;

        Ok(())
    }
}