1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{env, fs, io};
6
7use anyhow::{Context, bail, format_err};
8use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
9use fedimint_client::module_init::DynClientModuleInit;
10use fedimint_client::sm::executor::{
11 ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
12};
13use fedimint_client_module::module::ClientModule;
14use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
15use fedimint_core::core::OperationId;
16use fedimint_core::db::{
17 Database, DatabaseVersion, DbMigrationFn, IDatabaseTransactionOpsCoreTyped, apply_migrations,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::task::block_in_place;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::consensus::db::ServerDbMigrationContext;
25use fedimint_server::core::DynServerModuleInit;
26use futures::future::BoxFuture;
27use futures::{FutureExt, StreamExt};
28use rand::RngCore;
29use rand::rngs::OsRng;
30use tempfile::TempDir;
31use tracing::{debug, trace};
32
33use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
34
/// Locates the project root by walking up from the current working directory.
///
/// The root is the closest ancestor (the working directory itself included)
/// that contains a `Cargo.lock` file.
///
/// # Errors
///
/// Returns an error if the working directory cannot be determined, if probing
/// an ancestor for `Cargo.lock` fails, or an [`ErrorKind::NotFound`] error if
/// no ancestor contains a `Cargo.lock`.
pub fn get_project_root() -> io::Result<PathBuf> {
    let cwd = env::current_dir()?;

    for dir in cwd.ancestors() {
        if dir.join("Cargo.lock").try_exists()? {
            return Ok(dir.to_path_buf());
        }
    }

    // The marker searched for above is `Cargo.lock`, so name that file here.
    Err(io::Error::new(
        ErrorKind::NotFound,
        "Ran out of places to find Cargo.lock",
    ))
}
56
57async fn open_snapshot_db(
61 decoders: ModuleDecoderRegistry,
62 snapshot_dir: &Path,
63 is_isolated: bool,
64) -> anyhow::Result<Database> {
65 if is_isolated {
66 Ok(Database::new(
67 RocksDb::open(snapshot_dir)
68 .await
69 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
70 decoders,
71 )
72 .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
73 .0)
74 } else {
75 Ok(Database::new(
76 RocksDb::open(snapshot_dir)
77 .await
78 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
79 decoders,
80 ))
81 }
82}
83
84async fn create_snapshot<'a, F>(
88 snapshot_dir: PathBuf,
89 decoders: ModuleDecoderRegistry,
90 is_isolated: bool,
91 prepare_fn: F,
92) -> anyhow::Result<()>
93where
94 F: FnOnce(Database) -> BoxFuture<'a, ()>,
95{
96 match (
97 std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
98 .map(|s| s.to_string_lossy().into_owned())
99 .as_deref(),
100 snapshot_dir.exists(),
101 ) {
102 (Some("force"), true) => {
103 tokio::fs::remove_dir_all(&snapshot_dir).await?;
104 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
105 prepare_fn(db).await;
106 }
107 (Some(_), true) => {
108 bail!(
109 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
110 snapshot_dir.display()
111 );
112 }
113 (Some(_), false) => {
114 debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
115 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
116 prepare_fn(db).await;
117 }
118 (None, true) => {
119 debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
120 }
121 (None, false) => {
122 bail!(
123 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
124 snapshot_dir.display()
125 );
126 }
127 }
128 Ok(())
129}
130
131pub async fn snapshot_db_migrations_with_decoders<'a, F>(
137 snapshot_name: &str,
138 prepare_fn: F,
139 decoders: ModuleDecoderRegistry,
140) -> anyhow::Result<()>
141where
142 F: Fn(Database) -> BoxFuture<'a, ()>,
143{
144 let project_root = get_project_root().unwrap();
145 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
146 create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
147}
148
149pub async fn snapshot_db_migrations<'a, F, I>(
154 snapshot_name: &str,
155 prepare_fn: F,
156) -> anyhow::Result<()>
157where
158 F: Fn(Database) -> BoxFuture<'a, ()>,
159 I: CommonModuleInit,
160{
161 let project_root = get_project_root().unwrap();
162 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
163
164 let decoders =
165 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
166 create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
167}
168
169pub async fn snapshot_db_migrations_client<'a, F, S, I>(
176 snapshot_name: &str,
177 data_prepare: F,
178 state_machine_prepare: S,
179) -> anyhow::Result<()>
180where
181 F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
182 S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
183 I: CommonModuleInit,
184{
185 let project_root = get_project_root().unwrap();
186 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
187
188 let decoders =
189 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
190
191 let snapshot_fn = |db: Database| {
192 async move {
193 let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
194 data_prepare(isolated_db).await;
195
196 let (active_states, inactive_states) = state_machine_prepare();
197 let mut global_dbtx = db.begin_transaction().await;
198
199 for state in active_states {
200 global_dbtx
201 .insert_new_entry(
202 &ActiveStateKeyBytes {
203 operation_id: OperationId::new_random(),
204 module_instance_id: TEST_MODULE_INSTANCE_ID,
205 state,
206 },
207 &ActiveStateMeta {
208 created_at: fedimint_core::time::now(),
209 },
210 )
211 .await;
212 }
213
214 for state in inactive_states {
215 global_dbtx
216 .insert_new_entry(
217 &InactiveStateKeyBytes {
218 operation_id: OperationId::new_random(),
219 module_instance_id: TEST_MODULE_INSTANCE_ID,
220 state,
221 },
222 &InactiveStateMeta {
223 created_at: fedimint_core::time::now(),
224 exited_at: fedimint_core::time::now(),
225 },
226 )
227 .await;
228 }
229
230 global_dbtx.commit_tx().await;
231 }
232 .boxed()
233 };
234
235 create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
236}
237
/// A fixed 64-character string of ASCII digits: the 32-character pattern
/// `0123456789…01` written twice.
pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";

/// Eight fixed bytes counting up from 0 to 7.
pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];

/// Twenty fixed bytes; the value at index `i` is `i % 10`.
pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

/// Thirty-two fixed bytes; the value at index `i` is `i % 10`.
pub const BYTE_32: [u8; 32] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
];

/// Thirty-three fixed bytes; the value at index `i` is `i % 10`.
pub const BYTE_33: [u8; 33] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
    2,
];

/// Module instance id used throughout this file for decoder registries,
/// database prefixes and state machine keys.
pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
249
250async fn get_temp_database(
254 db_prefix: &str,
255 decoders: &ModuleDecoderRegistry,
256) -> anyhow::Result<(Database, TempDir)> {
257 let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
258 if snapshot_dirs.exists() {
259 for file in fs::read_dir(&snapshot_dirs)
260 .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
261 .flatten()
262 {
263 let name = file
264 .file_name()
265 .into_string()
266 .map_err(|_e| format_err!("Invalid path name"))?;
267 if name.starts_with(db_prefix) && !name.ends_with("lock") {
268 let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
269
270 let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
271 .await
272 .with_context(|| {
273 format!("Opening temp db for {name}. Copying to {temp_path}")
274 })?;
275 return Ok(temp_db);
276 }
277 }
278 }
279
280 Err(anyhow::anyhow!(
281 "No database with prefix {db_prefix} in backup directory"
282 ))
283}
284
285pub async fn validate_migrations_global<F, Fut, C>(
290 validate: F,
291 ctx: C,
292 db_prefix: &str,
293 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
294 decoders: ModuleDecoderRegistry,
295) -> anyhow::Result<()>
296where
297 F: Fn(Database) -> Fut,
298 Fut: futures::Future<Output = anyhow::Result<()>>,
299 C: Clone,
300{
301 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
302 apply_migrations(&db, ctx, db_prefix.to_string(), migrations, None, None)
303 .await
304 .context("Error applying migrations to temp database")?;
305
306 validate(db)
307 .await
308 .with_context(|| format!("Validating {db_prefix}"))?;
309 Ok(())
310}
311
312pub async fn validate_migrations_server<F, Fut>(
316 module: DynServerModuleInit,
317 db_prefix: &str,
318 validate: F,
319) -> anyhow::Result<()>
320where
321 F: Fn(Database) -> Fut,
322 Fut: futures::Future<Output = anyhow::Result<()>>,
323{
324 let decoders = ModuleDecoderRegistry::from_iter([(
325 TEST_MODULE_INSTANCE_ID,
326 module.module_kind(),
327 module.decoder(),
328 )]);
329 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
330 apply_migrations(
331 &db,
332 Arc::new(ServerDbMigrationContext) as Arc<_>,
333 module.module_kind().to_string(),
334 module.get_database_migrations(),
335 Some(TEST_MODULE_INSTANCE_ID),
336 None,
337 )
338 .await
339 .context("Error applying migrations to temp database")?;
340
341 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
342 validate(module_db)
343 .await
344 .with_context(|| format!("Validating {db_prefix}"))?;
345
346 Ok(())
347}
348
349pub async fn validate_migrations_core_client<F, Fut>(
354 db_prefix: &str,
355 validate: F,
356) -> anyhow::Result<()>
357where
358 F: Fn(Database) -> Fut,
359 Fut: futures::Future<Output = anyhow::Result<()>>,
360{
361 let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default()).await?;
362 let mut dbtx = db.begin_transaction().await;
363 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
364 .await
365 .context("Error applying core client migrations to temp database")?;
366 dbtx.commit_tx_result().await?;
367
368 validate(db)
369 .await
370 .with_context(|| format!("Validating {db_prefix}"))?;
371
372 Ok(())
373}
374
375pub async fn validate_migrations_client<F, Fut, T>(
381 module: DynClientModuleInit,
382 db_prefix: &str,
383 validate: F,
384) -> anyhow::Result<()>
385where
386 F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
387 Fut: futures::Future<Output = anyhow::Result<()>>,
388 T: ClientModule,
389{
390 let decoders = ModuleDecoderRegistry::from_iter([(
391 TEST_MODULE_INSTANCE_ID,
392 module.as_common().module_kind(),
393 T::decoder(),
394 )]);
395 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
396 apply_migrations_client_module(
397 &db,
398 module.as_common().module_kind().to_string(),
399 module.get_database_migrations(),
400 TEST_MODULE_INSTANCE_ID,
401 )
402 .await
403 .context("Error applying migrations to temp database")?;
404
405 let mut global_dbtx = db.begin_transaction_nc().await;
406 let active_states = global_dbtx
407 .find_by_prefix(&ActiveStateKeyPrefix)
408 .await
409 .filter_map(|(state, _)| async move {
410 state.0.state.as_any().downcast_ref::<T::States>().cloned()
411 })
412 .collect::<Vec<_>>()
413 .await;
414
415 let inactive_states = global_dbtx
416 .find_by_prefix(&InactiveStateKeyPrefix)
417 .await
418 .filter_map(|(state, _)| async move {
419 state.0.state.as_any().downcast_ref::<T::States>().cloned()
420 })
421 .collect::<Vec<_>>()
422 .await;
423
424 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
425 validate(module_db, active_states, inactive_states)
426 .await
427 .with_context(|| format!("Validating {db_prefix}"))?;
428
429 Ok(())
430}
431
432async fn open_temp_db_and_copy(
435 temp_path: &str,
436 src_dir: &Path,
437 decoders: ModuleDecoderRegistry,
438) -> anyhow::Result<(Database, TempDir)> {
439 let tmp_dir = block_in_place(|| -> anyhow::Result<TempDir> {
442 let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
443 copy_directory_blocking(src_dir, tmp_dir.path())
444 .context("Error copying database to temporary directory")?;
445
446 Ok(tmp_dir)
447 })?;
448
449 Ok((
450 Database::new(RocksDb::open(&tmp_dir).await?, decoders),
451 tmp_dir,
452 ))
453}
454
455pub fn copy_directory_blocking(src: &Path, dst: &Path) -> io::Result<()> {
458 trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
459
460 fs::create_dir_all(dst)?;
462
463 for entry in fs::read_dir(src)? {
464 let entry = entry?;
465 let path = entry.path();
466 if path.is_dir() {
467 copy_directory_blocking(&path, &dst.join(entry.file_name()))?;
468 } else {
469 let dst_path = dst.join(entry.file_name());
470 trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
471 fs::copy(&path, &dst_path)?;
472 }
473 }
474
475 Ok(())
476}
477
#[cfg(test)]
mod fedimint_migration_tests {
    use anyhow::ensure;
    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::module::CoreConsensusVersion;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_logging::TracingSetup;

    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};

    /// Writes a single `ClientConfigV0` entry, keyed by a `ClientConfigKeyV0`
    /// for the dummy federation id, into `db` and commits it.
    async fn create_client_db_with_v0_data(db: Database) {
        let v0_key = ClientConfigKeyV0 {
            id: FederationId::dummy(),
        };
        let v0_config = ClientConfigV0 {
            global: GlobalClientConfigV0 {
                api_endpoints: Default::default(),
                consensus_version: CoreConsensusVersion::new(0, 0),
                meta: Default::default(),
            },
            modules: Default::default(),
        };

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&v0_key, &v0_config).await;
        dbtx.commit_tx().await;
    }

    /// Prepares the `fedimint-client` snapshot from the v0 fixture data.
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-client",
            |db| Box::pin(async move { create_client_db_with_v0_data(db).await }),
            ModuleDecoderRegistry::default(),
        )
        .await
    }

    /// Runs the core client migrations on the `fedimint-client` snapshot and
    /// checks that a value is stored under `ClientConfigKey` afterwards.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_client_db_migrations() -> anyhow::Result<()> {
        let _ = TracingSetup::default().init();

        validate_migrations_core_client("fedimint-client", |db| async move {
            let mut dbtx = db.begin_transaction_nc().await;
            let migrated_config = dbtx.get_value(&ClientConfigKey).await;
            ensure!(
                migrated_config.is_some(),
                "Client config migration to v0 failed"
            );

            Ok(())
        })
        .await
    }
}