1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{env, fs, io};
6
7use anyhow::{Context, bail, format_err};
8use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
9use fedimint_client::module_init::DynClientModuleInit;
10use fedimint_client::sm::executor::{
11 ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
12};
13use fedimint_client_module::module::ClientModule;
14use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
15use fedimint_core::core::OperationId;
16use fedimint_core::db::{
17 Database, DatabaseVersion, DbMigrationFn, IDatabaseTransactionOpsCoreTyped, apply_migrations,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::task::block_in_place;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::consensus::db::ServerDbMigrationContext;
25use fedimint_server::core::DynServerModuleInit;
26use futures::future::BoxFuture;
27use futures::{FutureExt, StreamExt};
28use rand::RngCore;
29use rand::rngs::OsRng;
30use tempfile::TempDir;
31use tracing::{debug, trace};
32
33use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
34
/// Locates the project root by walking up from the current working directory.
///
/// The root is the closest directory (starting with the current directory
/// itself, then each of its ancestors) that contains a `Cargo.lock` file.
///
/// # Errors
///
/// Returns an error if the current directory cannot be determined, if probing
/// for `Cargo.lock` fails, or, with [`ErrorKind::NotFound`], if neither the
/// current directory nor any ancestor contains a `Cargo.lock`.
pub fn get_project_root() -> io::Result<PathBuf> {
    let cwd = env::current_dir()?;

    for dir in cwd.ancestors() {
        if dir.join("Cargo.lock").try_exists()? {
            return Ok(dir.to_path_buf());
        }
    }

    Err(io::Error::new(
        ErrorKind::NotFound,
        // The search above is for `Cargo.lock`, so name that file here.
        "Ran out of places to find Cargo.lock",
    ))
}
56
57async fn open_snapshot_db(
61 decoders: ModuleDecoderRegistry,
62 snapshot_dir: &Path,
63 is_isolated: bool,
64) -> anyhow::Result<Database> {
65 if is_isolated {
66 Ok(Database::new(
67 RocksDb::open(snapshot_dir)
68 .await
69 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
70 decoders,
71 )
72 .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
73 .0)
74 } else {
75 Ok(Database::new(
76 RocksDb::open(snapshot_dir)
77 .await
78 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
79 decoders,
80 ))
81 }
82}
83
84async fn create_snapshot<'a, F>(
88 snapshot_dir: PathBuf,
89 decoders: ModuleDecoderRegistry,
90 is_isolated: bool,
91 prepare_fn: F,
92) -> anyhow::Result<()>
93where
94 F: FnOnce(Database) -> BoxFuture<'a, ()>,
95{
96 match (
97 std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
98 .map(|s| s.to_string_lossy().into_owned())
99 .as_deref(),
100 snapshot_dir.exists(),
101 ) {
102 (Some("force"), true) => {
103 tokio::fs::remove_dir_all(&snapshot_dir).await?;
104 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
105 prepare_fn(db).await;
106 }
107 (Some(_), true) => {
108 bail!(
109 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
110 snapshot_dir.display()
111 );
112 }
113 (Some(_), false) => {
114 debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
115 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
116 prepare_fn(db).await;
117 }
118 (None, true) => {
119 debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
120 }
121 (None, false) => {
122 bail!(
123 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
124 snapshot_dir.display()
125 );
126 }
127 }
128 Ok(())
129}
130
131pub async fn snapshot_db_migrations_with_decoders<'a, F>(
137 snapshot_name: &str,
138 prepare_fn: F,
139 decoders: ModuleDecoderRegistry,
140) -> anyhow::Result<()>
141where
142 F: Fn(Database) -> BoxFuture<'a, ()>,
143{
144 let project_root = get_project_root().unwrap();
145 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
146 create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
147}
148
149pub async fn snapshot_db_migrations<'a, F, I>(
154 snapshot_name: &str,
155 prepare_fn: F,
156) -> anyhow::Result<()>
157where
158 F: Fn(Database) -> BoxFuture<'a, ()>,
159 I: CommonModuleInit,
160{
161 let project_root = get_project_root().unwrap();
162 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
163
164 let decoders =
165 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
166 create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
167}
168
169pub async fn snapshot_db_migrations_client<'a, F, S, I>(
176 snapshot_name: &str,
177 data_prepare: F,
178 state_machine_prepare: S,
179) -> anyhow::Result<()>
180where
181 F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
182 S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
183 I: CommonModuleInit,
184{
185 let project_root = get_project_root().unwrap();
186 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
187
188 let decoders =
189 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
190
191 let snapshot_fn = |db: Database| {
192 async move {
193 let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
194 data_prepare(isolated_db).await;
195
196 let (active_states, inactive_states) = state_machine_prepare();
197 let mut global_dbtx = db.begin_transaction().await;
198
199 for state in active_states {
200 global_dbtx
201 .insert_new_entry(
202 &ActiveStateKeyBytes {
203 operation_id: OperationId::new_random(),
204 module_instance_id: TEST_MODULE_INSTANCE_ID,
205 state,
206 },
207 &ActiveStateMeta {
208 created_at: fedimint_core::time::now(),
209 },
210 )
211 .await;
212 }
213
214 for state in inactive_states {
215 global_dbtx
216 .insert_new_entry(
217 &InactiveStateKeyBytes {
218 operation_id: OperationId::new_random(),
219 module_instance_id: TEST_MODULE_INSTANCE_ID,
220 state,
221 },
222 &InactiveStateMeta {
223 created_at: fedimint_core::time::now(),
224 exited_at: fedimint_core::time::now(),
225 },
226 )
227 .await;
228 }
229
230 global_dbtx.commit_tx().await;
231 }
232 .boxed()
233 };
234
235 create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
236}
237
/// Fixed 64-character string of decimal digits for use as test data.
pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";
/// Fixed 8-byte array (`0..=7`) for use as test data.
pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
/// Fixed 20-byte array (digits `0..=9` repeated) for use as test data.
pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
/// Fixed 32-byte array (digits `0..=9` repeated) for use as test data.
pub const BYTE_32: [u8; 32] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
];
/// Fixed 33-byte array (digits `0..=9` repeated) for use as test data.
pub const BYTE_33: [u8; 33] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
    2,
];
/// Module instance id under which the helpers in this file register decoders
/// and prefix module databases.
pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
249
250async fn get_temp_database(
254 db_prefix: &str,
255 decoders: &ModuleDecoderRegistry,
256) -> anyhow::Result<(Database, TempDir)> {
257 let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
258 if snapshot_dirs.exists() {
259 for file in fs::read_dir(&snapshot_dirs)
260 .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
261 .flatten()
262 {
263 let name = file
264 .file_name()
265 .into_string()
266 .map_err(|_e| format_err!("Invalid path name"))?;
267 if name.starts_with(db_prefix) {
268 let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
269 let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
270 .await
271 .with_context(|| {
272 format!("Opening temp db for {name}. Copying to {temp_path}")
273 })?;
274 return Ok(temp_db);
275 }
276 }
277 }
278
279 Err(anyhow::anyhow!(
280 "No database with prefix {db_prefix} in backup directory"
281 ))
282}
283
284pub async fn validate_migrations_global<F, Fut, C>(
289 validate: F,
290 ctx: C,
291 db_prefix: &str,
292 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
293 decoders: ModuleDecoderRegistry,
294) -> anyhow::Result<()>
295where
296 F: Fn(Database) -> Fut,
297 Fut: futures::Future<Output = anyhow::Result<()>>,
298 C: Clone,
299{
300 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
301 apply_migrations(&db, ctx, db_prefix.to_string(), migrations, None, None)
302 .await
303 .context("Error applying migrations to temp database")?;
304
305 validate(db)
306 .await
307 .with_context(|| format!("Validating {db_prefix}"))?;
308 Ok(())
309}
310
311pub async fn validate_migrations_server<F, Fut>(
315 module: DynServerModuleInit,
316 db_prefix: &str,
317 validate: F,
318) -> anyhow::Result<()>
319where
320 F: Fn(Database) -> Fut,
321 Fut: futures::Future<Output = anyhow::Result<()>>,
322{
323 let decoders = ModuleDecoderRegistry::from_iter([(
324 TEST_MODULE_INSTANCE_ID,
325 module.module_kind(),
326 module.decoder(),
327 )]);
328 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
329 apply_migrations(
330 &db,
331 Arc::new(ServerDbMigrationContext) as Arc<_>,
332 module.module_kind().to_string(),
333 module.get_database_migrations(),
334 Some(TEST_MODULE_INSTANCE_ID),
335 None,
336 )
337 .await
338 .context("Error applying migrations to temp database")?;
339
340 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
341 validate(module_db)
342 .await
343 .with_context(|| format!("Validating {db_prefix}"))?;
344
345 Ok(())
346}
347
348pub async fn validate_migrations_core_client<F, Fut>(
353 db_prefix: &str,
354 validate: F,
355) -> anyhow::Result<()>
356where
357 F: Fn(Database) -> Fut,
358 Fut: futures::Future<Output = anyhow::Result<()>>,
359{
360 let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default()).await?;
361 let mut dbtx = db.begin_transaction().await;
362 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
363 .await
364 .context("Error applying core client migrations to temp database")?;
365 dbtx.commit_tx_result().await?;
366
367 validate(db)
368 .await
369 .with_context(|| format!("Validating {db_prefix}"))?;
370
371 Ok(())
372}
373
374pub async fn validate_migrations_client<F, Fut, T>(
380 module: DynClientModuleInit,
381 db_prefix: &str,
382 validate: F,
383) -> anyhow::Result<()>
384where
385 F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
386 Fut: futures::Future<Output = anyhow::Result<()>>,
387 T: ClientModule,
388{
389 let decoders = ModuleDecoderRegistry::from_iter([(
390 TEST_MODULE_INSTANCE_ID,
391 module.as_common().module_kind(),
392 T::decoder(),
393 )]);
394 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
395 apply_migrations_client_module(
396 &db,
397 module.as_common().module_kind().to_string(),
398 module.get_database_migrations(),
399 TEST_MODULE_INSTANCE_ID,
400 )
401 .await
402 .context("Error applying migrations to temp database")?;
403
404 let mut global_dbtx = db.begin_transaction_nc().await;
405 let active_states = global_dbtx
406 .find_by_prefix(&ActiveStateKeyPrefix)
407 .await
408 .filter_map(|(state, _)| async move {
409 state.0.state.as_any().downcast_ref::<T::States>().cloned()
410 })
411 .collect::<Vec<_>>()
412 .await;
413
414 let inactive_states = global_dbtx
415 .find_by_prefix(&InactiveStateKeyPrefix)
416 .await
417 .filter_map(|(state, _)| async move {
418 state.0.state.as_any().downcast_ref::<T::States>().cloned()
419 })
420 .collect::<Vec<_>>()
421 .await;
422
423 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
424 validate(module_db, active_states, inactive_states)
425 .await
426 .with_context(|| format!("Validating {db_prefix}"))?;
427
428 Ok(())
429}
430
431async fn open_temp_db_and_copy(
434 temp_path: &str,
435 src_dir: &Path,
436 decoders: ModuleDecoderRegistry,
437) -> anyhow::Result<(Database, TempDir)> {
438 let tmp_dir = block_in_place(|| -> anyhow::Result<TempDir> {
441 let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
442 copy_directory_blocking(src_dir, tmp_dir.path())
443 .context("Error copying database to temporary directory")?;
444
445 Ok(tmp_dir)
446 })?;
447
448 Ok((
449 Database::new(RocksDb::open(&tmp_dir).await?, decoders),
450 tmp_dir,
451 ))
452}
453
454pub fn copy_directory_blocking(src: &Path, dst: &Path) -> io::Result<()> {
457 trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
458
459 fs::create_dir_all(dst)?;
461
462 for entry in fs::read_dir(src)? {
463 let entry = entry?;
464 let path = entry.path();
465 if path.is_dir() {
466 copy_directory_blocking(&path, &dst.join(entry.file_name()))?;
467 } else {
468 let dst_path = dst.join(entry.file_name());
469 trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
470 fs::copy(&path, &dst_path)?;
471 }
472 }
473
474 Ok(())
475}
476
#[cfg(test)]
mod fedimint_migration_tests {
    use anyhow::ensure;
    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::module::CoreConsensusVersion;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_logging::TracingSetup;

    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};

    /// Writes one `ClientConfigV0` entry, keyed by a `ClientConfigKeyV0` with
    /// a dummy federation id, into `db` and commits it.
    async fn create_client_db_with_v0_data(db: Database) {
        let key_v0 = ClientConfigKeyV0 {
            id: FederationId::dummy(),
        };
        let config_v0 = ClientConfigV0 {
            global: GlobalClientConfigV0 {
                api_endpoints: Default::default(),
                consensus_version: CoreConsensusVersion::new(0, 0),
                meta: Default::default(),
            },
            modules: Default::default(),
        };

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key_v0, &config_v0).await;
        dbtx.commit_tx().await;
    }

    /// Creates the `fedimint-client` snapshot from the v0 data above.
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-client",
            |db| Box::pin(create_client_db_with_v0_data(db)),
            ModuleDecoderRegistry::default(),
        )
        .await
    }

    /// Migrates the `fedimint-client` snapshot and checks that a value is
    /// stored under `ClientConfigKey` afterwards.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_client_db_migrations() -> anyhow::Result<()> {
        let _ = TracingSetup::default().init();

        validate_migrations_core_client("fedimint-client", |db| async move {
            let mut dbtx = db.begin_transaction_nc().await;
            let migrated_config = dbtx.get_value(&ClientConfigKey).await;
            ensure!(
                migrated_config.is_some(),
                "Client config migration to v0 failed"
            );

            Ok(())
        })
        .await
    }
}