1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{env, fs, io};
6
7use anyhow::{Context, bail, format_err};
8use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
9use fedimint_client::module_init::DynClientModuleInit;
10use fedimint_client::sm::executor::{
11 ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
12};
13use fedimint_client_module::module::ClientModule;
14use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
15use fedimint_core::core::OperationId;
16use fedimint_core::db::{
17 Database, DatabaseVersion, DbMigrationFn, IDatabaseTransactionOpsCoreTyped, apply_migrations,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::task::block_in_place;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::consensus::db::ServerDbMigrationContext;
25use fedimint_server::core::DynServerModuleInit;
26use futures::future::BoxFuture;
27use futures::{FutureExt, StreamExt};
28use rand::RngCore;
29use rand::rngs::OsRng;
30use tempfile::TempDir;
31use tracing::{debug, trace};
32
33use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
34
35pub fn get_project_root() -> io::Result<PathBuf> {
43 let path = env::current_dir()?;
44 let path_ancestors = path.as_path().ancestors();
45
46 for path in path_ancestors {
47 if path.join("Cargo.lock").try_exists()? {
48 return Ok(PathBuf::from(path));
49 }
50 }
51 Err(io::Error::new(
52 ErrorKind::NotFound,
53 "Ran out of places to find Cargo.toml",
54 ))
55}
56
57async fn open_snapshot_db(
61 decoders: ModuleDecoderRegistry,
62 snapshot_dir: &Path,
63 is_isolated: bool,
64) -> anyhow::Result<Database> {
65 if is_isolated {
66 Ok(Database::new(
67 RocksDb::build(snapshot_dir)
68 .open()
69 .await
70 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
71 decoders,
72 )
73 .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
74 .0)
75 } else {
76 Ok(Database::new(
77 RocksDb::build(snapshot_dir)
78 .open()
79 .await
80 .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
81 decoders,
82 ))
83 }
84}
85
86async fn create_snapshot<'a, F>(
90 snapshot_dir: PathBuf,
91 decoders: ModuleDecoderRegistry,
92 is_isolated: bool,
93 prepare_fn: F,
94) -> anyhow::Result<()>
95where
96 F: FnOnce(Database) -> BoxFuture<'a, ()>,
97{
98 match (
99 std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
100 .map(|s| s.to_string_lossy().into_owned())
101 .as_deref(),
102 snapshot_dir.exists(),
103 ) {
104 (Some("force"), true) => {
105 tokio::fs::remove_dir_all(&snapshot_dir).await?;
106 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
107 prepare_fn(db).await;
108 }
109 (Some(_), true) => {
110 bail!(
111 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
112 snapshot_dir.display()
113 );
114 }
115 (Some(_), false) => {
116 debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
117 let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
118 prepare_fn(db).await;
119 }
120 (None, true) => {
121 debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
122 }
123 (None, false) => {
124 bail!(
125 "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
126 snapshot_dir.display()
127 );
128 }
129 }
130 Ok(())
131}
132
133pub async fn snapshot_db_migrations_with_decoders<'a, F>(
139 snapshot_name: &str,
140 prepare_fn: F,
141 decoders: ModuleDecoderRegistry,
142) -> anyhow::Result<()>
143where
144 F: Fn(Database) -> BoxFuture<'a, ()>,
145{
146 let project_root = get_project_root().unwrap();
147 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
148 create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
149}
150
151pub async fn snapshot_db_migrations<'a, F, I>(
156 snapshot_name: &str,
157 prepare_fn: F,
158) -> anyhow::Result<()>
159where
160 F: Fn(Database) -> BoxFuture<'a, ()>,
161 I: CommonModuleInit,
162{
163 let project_root = get_project_root().unwrap();
164 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
165
166 let decoders =
167 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
168 create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
169}
170
171pub async fn snapshot_db_migrations_client<'a, F, S, I>(
178 snapshot_name: &str,
179 data_prepare: F,
180 state_machine_prepare: S,
181) -> anyhow::Result<()>
182where
183 F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
184 S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
185 I: CommonModuleInit,
186{
187 let project_root = get_project_root().unwrap();
188 let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
189
190 let decoders =
191 ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
192
193 let snapshot_fn = |db: Database| {
194 async move {
195 let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
196 data_prepare(isolated_db).await;
197
198 let (active_states, inactive_states) = state_machine_prepare();
199 let mut global_dbtx = db.begin_transaction().await;
200
201 for state in active_states {
202 global_dbtx
203 .insert_new_entry(
204 &ActiveStateKeyBytes {
205 operation_id: OperationId::new_random(),
206 module_instance_id: TEST_MODULE_INSTANCE_ID,
207 state,
208 },
209 &ActiveStateMeta {
210 created_at: fedimint_core::time::now(),
211 },
212 )
213 .await;
214 }
215
216 for state in inactive_states {
217 global_dbtx
218 .insert_new_entry(
219 &InactiveStateKeyBytes {
220 operation_id: OperationId::new_random(),
221 module_instance_id: TEST_MODULE_INSTANCE_ID,
222 state,
223 },
224 &InactiveStateMeta {
225 created_at: fedimint_core::time::now(),
226 exited_at: fedimint_core::time::now(),
227 },
228 )
229 .await;
230 }
231
232 global_dbtx.commit_tx().await;
233 }
234 .boxed()
235 };
236
237 create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
238}
239
240pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";
241pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
242pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
243pub const BYTE_32: [u8; 32] = [
244 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
245];
246pub const BYTE_33: [u8; 33] = [
247 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
248 2,
249];
250pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
251
252async fn get_temp_database(
256 db_prefix: &str,
257 decoders: &ModuleDecoderRegistry,
258) -> anyhow::Result<(Database, TempDir)> {
259 let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
260 if snapshot_dirs.exists() {
261 for file in fs::read_dir(&snapshot_dirs)
262 .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
263 .flatten()
264 {
265 let name = file
266 .file_name()
267 .into_string()
268 .map_err(|_e| format_err!("Invalid path name"))?;
269 if name.starts_with(db_prefix) && !name.ends_with("lock") {
270 let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
271
272 let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
273 .await
274 .with_context(|| {
275 format!("Opening temp db for {name}. Copying to {temp_path}")
276 })?;
277 return Ok(temp_db);
278 }
279 }
280 }
281
282 Err(anyhow::anyhow!(
283 "No database with prefix {db_prefix} in backup directory"
284 ))
285}
286
287pub async fn validate_migrations_global<F, Fut, C>(
292 validate: F,
293 ctx: C,
294 db_prefix: &str,
295 migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
296 decoders: ModuleDecoderRegistry,
297) -> anyhow::Result<()>
298where
299 F: Fn(Database) -> Fut,
300 Fut: futures::Future<Output = anyhow::Result<()>>,
301 C: Clone,
302{
303 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
304 apply_migrations(&db, ctx, db_prefix.to_string(), migrations, None, None)
305 .await
306 .context("Error applying migrations to temp database")?;
307
308 validate(db)
309 .await
310 .with_context(|| format!("Validating {db_prefix}"))?;
311 Ok(())
312}
313
314pub async fn validate_migrations_server<F, Fut>(
318 module: DynServerModuleInit,
319 db_prefix: &str,
320 validate: F,
321) -> anyhow::Result<()>
322where
323 F: Fn(Database) -> Fut,
324 Fut: futures::Future<Output = anyhow::Result<()>>,
325{
326 let decoders = ModuleDecoderRegistry::from_iter([(
327 TEST_MODULE_INSTANCE_ID,
328 module.module_kind(),
329 module.decoder(),
330 )]);
331 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
332 apply_migrations(
333 &db,
334 Arc::new(ServerDbMigrationContext) as Arc<_>,
335 module.module_kind().to_string(),
336 module.get_database_migrations(),
337 Some(TEST_MODULE_INSTANCE_ID),
338 None,
339 )
340 .await
341 .context("Error applying migrations to temp database")?;
342
343 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
344 validate(module_db)
345 .await
346 .with_context(|| format!("Validating {db_prefix}"))?;
347
348 Ok(())
349}
350
351pub async fn validate_migrations_core_client<F, Fut>(
356 db_prefix: &str,
357 validate: F,
358) -> anyhow::Result<()>
359where
360 F: Fn(Database) -> Fut,
361 Fut: futures::Future<Output = anyhow::Result<()>>,
362{
363 let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default()).await?;
364 let mut dbtx = db.begin_transaction().await;
365 apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
366 .await
367 .context("Error applying core client migrations to temp database")?;
368 dbtx.commit_tx_result().await?;
369
370 validate(db)
371 .await
372 .with_context(|| format!("Validating {db_prefix}"))?;
373
374 Ok(())
375}
376
377pub async fn validate_migrations_client<F, Fut, T>(
383 module: DynClientModuleInit,
384 db_prefix: &str,
385 validate: F,
386) -> anyhow::Result<()>
387where
388 F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
389 Fut: futures::Future<Output = anyhow::Result<()>>,
390 T: ClientModule,
391{
392 let decoders = ModuleDecoderRegistry::from_iter([(
393 TEST_MODULE_INSTANCE_ID,
394 module.as_common().module_kind(),
395 T::decoder(),
396 )]);
397 let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
398 apply_migrations_client_module(
399 &db,
400 module.as_common().module_kind().to_string(),
401 module.get_database_migrations(),
402 TEST_MODULE_INSTANCE_ID,
403 )
404 .await
405 .context("Error applying migrations to temp database")?;
406
407 let mut global_dbtx = db.begin_transaction_nc().await;
408 let active_states = global_dbtx
409 .find_by_prefix(&ActiveStateKeyPrefix)
410 .await
411 .filter_map(|(state, _)| async move {
412 state.0.state.as_any().downcast_ref::<T::States>().cloned()
413 })
414 .collect::<Vec<_>>()
415 .await;
416
417 let inactive_states = global_dbtx
418 .find_by_prefix(&InactiveStateKeyPrefix)
419 .await
420 .filter_map(|(state, _)| async move {
421 state.0.state.as_any().downcast_ref::<T::States>().cloned()
422 })
423 .collect::<Vec<_>>()
424 .await;
425
426 let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
427 validate(module_db, active_states, inactive_states)
428 .await
429 .with_context(|| format!("Validating {db_prefix}"))?;
430
431 Ok(())
432}
433
434async fn open_temp_db_and_copy(
437 temp_path: &str,
438 src_dir: &Path,
439 decoders: ModuleDecoderRegistry,
440) -> anyhow::Result<(Database, TempDir)> {
441 let tmp_dir = block_in_place(|| -> anyhow::Result<TempDir> {
444 let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
445 copy_directory_blocking(src_dir, tmp_dir.path())
446 .context("Error copying database to temporary directory")?;
447
448 Ok(tmp_dir)
449 })?;
450
451 Ok((
452 Database::new(RocksDb::build(&tmp_dir).open().await?, decoders),
453 tmp_dir,
454 ))
455}
456
457pub fn copy_directory_blocking(src: &Path, dst: &Path) -> io::Result<()> {
460 trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
461
462 fs::create_dir_all(dst)?;
464
465 for entry in fs::read_dir(src)? {
466 let entry = entry?;
467 let path = entry.path();
468 if path.is_dir() {
469 copy_directory_blocking(&path, &dst.join(entry.file_name()))?;
470 } else {
471 let dst_path = dst.join(entry.file_name());
472 trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
473 fs::copy(&path, &dst_path)?;
474 }
475 }
476
477 Ok(())
478}
479
480#[cfg(test)]
481mod fedimint_migration_tests {
482 use anyhow::ensure;
483 use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
484 use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
485 use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
486 use fedimint_core::module::CoreConsensusVersion;
487 use fedimint_core::module::registry::ModuleDecoderRegistry;
488 use fedimint_logging::TracingSetup;
489
490 use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};
491 async fn create_client_db_with_v0_data(db: Database) {
498 let mut dbtx = db.begin_transaction().await;
499
500 let federation_id = FederationId::dummy();
501
502 let client_config_v0 = ClientConfigV0 {
503 global: GlobalClientConfigV0 {
504 api_endpoints: Default::default(),
505 consensus_version: CoreConsensusVersion::new(0, 0),
506 meta: Default::default(),
507 },
508 modules: Default::default(),
509 };
510
511 let client_config_key_v0 = ClientConfigKeyV0 { id: federation_id };
512
513 dbtx.insert_new_entry(&client_config_key_v0, &client_config_v0)
514 .await;
515
516 dbtx.commit_tx().await;
517 }
518
519 #[tokio::test(flavor = "multi_thread")]
520 async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
521 snapshot_db_migrations_with_decoders(
522 "fedimint-client",
523 |db| {
524 Box::pin(async {
525 create_client_db_with_v0_data(db).await;
526 })
527 },
528 ModuleDecoderRegistry::default(),
529 )
530 .await
531 }
532
533 #[tokio::test(flavor = "multi_thread")]
534 async fn test_client_db_migrations() -> anyhow::Result<()> {
535 let _ = TracingSetup::default().init();
536
537 validate_migrations_core_client("fedimint-client", |db| async move {
538 let mut dbtx = db.begin_transaction_nc().await;
539 ensure!(
541 dbtx.get_value(&ClientConfigKey).await.is_some(),
542 "Client config migration to v0 failed"
543 );
544
545 Ok(())
546 })
547 .await?;
548
549 Ok(())
550 }
551}