fedimint_testing_core/
db.rs

1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{env, fs, io};
6
7use anyhow::{Context, bail, format_err};
8use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
9use fedimint_client::module_init::DynClientModuleInit;
10use fedimint_client::sm::executor::{
11    ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
12};
13use fedimint_client_module::module::ClientModule;
14use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
15use fedimint_core::core::OperationId;
16use fedimint_core::db::{
17    Database, DatabaseVersion, DbMigrationFn, IDatabaseTransactionOpsCoreTyped, apply_migrations,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::task::block_in_place;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::consensus::db::ServerDbMigrationContext;
25use fedimint_server::core::DynServerModuleInit;
26use futures::future::BoxFuture;
27use futures::{FutureExt, StreamExt};
28use rand::RngCore;
29use rand::rngs::OsRng;
30use tempfile::TempDir;
31use tracing::{debug, trace};
32
33use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
34
/// Get the project root (relative to closest Cargo.lock file)
/// ```rust
/// match fedimint_testing_core::db::get_project_root() {
///     Ok(p) => println!("Current project root is {:?}", p),
///     Err(e) => println!("Error obtaining project root {:?}", e),
/// };
/// ```
pub fn get_project_root() -> io::Result<PathBuf> {
    let path = env::current_dir()?;

    // Walk up from the current directory until we find the directory
    // containing `Cargo.lock` (the workspace root).
    for ancestor in path.as_path().ancestors() {
        if ancestor.join("Cargo.lock").try_exists()? {
            return Ok(ancestor.to_path_buf());
        }
    }

    // Bug fix: the message previously said "Cargo.toml", but the search
    // above looks for `Cargo.lock`.
    Err(io::Error::new(
        ErrorKind::NotFound,
        "Ran out of places to find Cargo.lock",
    ))
}
56
57/// Opens the backup database in the `snapshot_dir`. If the `is_isolated` flag
58/// is set, the database will be opened as an isolated database with
59/// `TEST_MODULE_INSTANCE_ID` as the prefix.
60async fn open_snapshot_db(
61    decoders: ModuleDecoderRegistry,
62    snapshot_dir: &Path,
63    is_isolated: bool,
64) -> anyhow::Result<Database> {
65    if is_isolated {
66        Ok(Database::new(
67            RocksDb::build(snapshot_dir)
68                .open()
69                .await
70                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
71            decoders,
72        )
73        .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
74        .0)
75    } else {
76        Ok(Database::new(
77            RocksDb::build(snapshot_dir)
78                .open()
79                .await
80                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
81            decoders,
82        ))
83    }
84}
85
86/// Creates a backup database in the `snapshot_dir` according to the
87/// `FM_PREPARE_DB_MIGRATION_SNAPSHOTS`, since we do not want to re-create a
88/// backup database every time we run the tests.
89async fn create_snapshot<'a, F>(
90    snapshot_dir: PathBuf,
91    decoders: ModuleDecoderRegistry,
92    is_isolated: bool,
93    prepare_fn: F,
94) -> anyhow::Result<()>
95where
96    F: FnOnce(Database) -> BoxFuture<'a, ()>,
97{
98    match (
99        std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
100            .map(|s| s.to_string_lossy().into_owned())
101            .as_deref(),
102        snapshot_dir.exists(),
103    ) {
104        (Some("force"), true) => {
105            tokio::fs::remove_dir_all(&snapshot_dir).await?;
106            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
107            prepare_fn(db).await;
108        }
109        (Some(_), true) => {
110            bail!(
111                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
112                snapshot_dir.display()
113            );
114        }
115        (Some(_), false) => {
116            debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
117            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
118            prepare_fn(db).await;
119        }
120        (None, true) => {
121            debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
122        }
123        (None, false) => {
124            bail!(
125                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
126                snapshot_dir.display()
127            );
128        }
129    }
130    Ok(())
131}
132
133/// Creates the database backup for `snapshot_name`
134/// to `db/migrations`. Then this function will execute the provided
135/// `prepare_fn` which is expected to populate the database with the appropriate
136/// data for testing a migration. If the snapshot directory already exists,
137/// this function will do nothing.
138pub async fn snapshot_db_migrations_with_decoders<'a, F>(
139    snapshot_name: &str,
140    prepare_fn: F,
141    decoders: ModuleDecoderRegistry,
142) -> anyhow::Result<()>
143where
144    F: Fn(Database) -> BoxFuture<'a, ()>,
145{
146    let project_root = get_project_root().unwrap();
147    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
148    create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
149}
150
151/// Creates the database backup directory for a server module by appending the
152/// `snapshot_name` to `db/migrations`. Then this function will execute the
153/// provided `prepare_fn` which is expected to populate the database with the
154/// appropriate data for testing a migration.
155pub async fn snapshot_db_migrations<'a, F, I>(
156    snapshot_name: &str,
157    prepare_fn: F,
158) -> anyhow::Result<()>
159where
160    F: Fn(Database) -> BoxFuture<'a, ()>,
161    I: CommonModuleInit,
162{
163    let project_root = get_project_root().unwrap();
164    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
165
166    let decoders =
167        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
168    create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
169}
170
171/// Create the database backup directory for a client module.
172/// Two prepare functions are taken as parameters. `data_prepare` is expected to
173/// create any data that the client module uses and is stored in the isolated
174/// namespace. `state_machine_prepare` creates client state machine data that
175/// can be used for testing state machine migrations. This is created in the
176/// global namespace.
177pub async fn snapshot_db_migrations_client<'a, F, S, I>(
178    snapshot_name: &str,
179    data_prepare: F,
180    state_machine_prepare: S,
181) -> anyhow::Result<()>
182where
183    F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
184    S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
185    I: CommonModuleInit,
186{
187    let project_root = get_project_root().unwrap();
188    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
189
190    let decoders =
191        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
192
193    let snapshot_fn = |db: Database| {
194        async move {
195            let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
196            data_prepare(isolated_db).await;
197
198            let (active_states, inactive_states) = state_machine_prepare();
199            let mut global_dbtx = db.begin_transaction().await;
200
201            for state in active_states {
202                global_dbtx
203                    .insert_new_entry(
204                        &ActiveStateKeyBytes {
205                            operation_id: OperationId::new_random(),
206                            module_instance_id: TEST_MODULE_INSTANCE_ID,
207                            state,
208                        },
209                        &ActiveStateMeta {
210                            created_at: fedimint_core::time::now(),
211                        },
212                    )
213                    .await;
214            }
215
216            for state in inactive_states {
217                global_dbtx
218                    .insert_new_entry(
219                        &InactiveStateKeyBytes {
220                            operation_id: OperationId::new_random(),
221                            module_instance_id: TEST_MODULE_INSTANCE_ID,
222                            state,
223                        },
224                        &InactiveStateMeta {
225                            created_at: fedimint_core::time::now(),
226                            exited_at: fedimint_core::time::now(),
227                        },
228                    )
229                    .await;
230            }
231
232            global_dbtx.commit_tx().await;
233        }
234        .boxed()
235    };
236
237    create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
238}
239
// Deterministic fixture values used when populating snapshot databases.

/// 64-character numeric string fixture.
pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";
/// 8-byte fixture.
pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
/// 20-byte fixture.
pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
/// 32-byte fixture.
pub const BYTE_32: [u8; 32] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
];
/// 33-byte fixture.
pub const BYTE_33: [u8; 33] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
    2,
];
/// Module instance id used to isolate module data in these tests.
pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
251
252/// Retrieves a temporary database from the database backup directory.
253/// The first folder that starts with `db_prefix` will return as a temporary
254/// database.
255async fn get_temp_database(
256    db_prefix: &str,
257    decoders: &ModuleDecoderRegistry,
258) -> anyhow::Result<(Database, TempDir)> {
259    let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
260    if snapshot_dirs.exists() {
261        for file in fs::read_dir(&snapshot_dirs)
262            .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
263            .flatten()
264        {
265            let name = file
266                .file_name()
267                .into_string()
268                .map_err(|_e| format_err!("Invalid path name"))?;
269            if name.starts_with(db_prefix) && !name.ends_with("lock") {
270                let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
271
272                let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
273                    .await
274                    .with_context(|| {
275                        format!("Opening temp db for {name}. Copying to {temp_path}")
276                    })?;
277                return Ok(temp_db);
278            }
279        }
280    }
281
282    Err(anyhow::anyhow!(
283        "No database with prefix {db_prefix} in backup directory"
284    ))
285}
286
/// Validates the database migrations. `decoders` need to be
/// passed in as an argument since this is module agnostic. First
/// applies all defined migrations to the database then executes the `validate`
/// function which should confirm the database migrations were successful.
pub async fn validate_migrations_global<F, Fut, C>(
    validate: F,
    ctx: C,
    db_prefix: &str,
    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
    decoders: ModuleDecoderRegistry,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> Fut,
    Fut: futures::Future<Output = anyhow::Result<()>>,
    C: Clone,
{
    // Work on a throwaway copy of the snapshot; `_tmp_dir` keeps the
    // temporary directory alive for as long as the database handle exists.
    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
    apply_migrations(&db, ctx, db_prefix.to_string(), migrations, None, None)
        .await
        .context("Error applying migrations to temp database")?;

    validate(db)
        .await
        .with_context(|| format!("Validating {db_prefix}"))?;
    Ok(())
}
313
314/// Validates the database migrations for a server module. First applies all
315/// database migrations to the module, then calls the `validate` which should
316/// confirm the database migrations were successful.
317pub async fn validate_migrations_server<F, Fut>(
318    module: DynServerModuleInit,
319    db_prefix: &str,
320    validate: F,
321) -> anyhow::Result<()>
322where
323    F: Fn(Database) -> Fut,
324    Fut: futures::Future<Output = anyhow::Result<()>>,
325{
326    let decoders = ModuleDecoderRegistry::from_iter([(
327        TEST_MODULE_INSTANCE_ID,
328        module.module_kind(),
329        module.decoder(),
330    )]);
331    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
332    apply_migrations(
333        &db,
334        Arc::new(ServerDbMigrationContext) as Arc<_>,
335        module.module_kind().to_string(),
336        module.get_database_migrations(),
337        Some(TEST_MODULE_INSTANCE_ID),
338        None,
339    )
340    .await
341    .context("Error applying migrations to temp database")?;
342
343    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
344    validate(module_db)
345        .await
346        .with_context(|| format!("Validating {db_prefix}"))?;
347
348    Ok(())
349}
350
351/// Validates the database migrations for the core client. First applies all
352/// database migrations to the core client. Then calls the `validate` function,
353/// including the new `active_states` and `inactive_states`, and is expected to
354/// confirm the database migrations were successful.
355pub async fn validate_migrations_core_client<F, Fut>(
356    db_prefix: &str,
357    validate: F,
358) -> anyhow::Result<()>
359where
360    F: Fn(Database) -> Fut,
361    Fut: futures::Future<Output = anyhow::Result<()>>,
362{
363    let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default()).await?;
364    let mut dbtx = db.begin_transaction().await;
365    apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
366        .await
367        .context("Error applying core client migrations to temp database")?;
368    dbtx.commit_tx_result().await?;
369
370    validate(db)
371        .await
372        .with_context(|| format!("Validating {db_prefix}"))?;
373
374    Ok(())
375}
376
377/// Validates the database migrations for a client module. First applies all
378/// database migrations to the module, including the state machine migrations.
379/// Then calls the `validate` function, including the new `active_states` and
380/// `inactive_states`, and is expected to confirm the database migrations were
381/// successful.
382pub async fn validate_migrations_client<F, Fut, T>(
383    module: DynClientModuleInit,
384    db_prefix: &str,
385    validate: F,
386) -> anyhow::Result<()>
387where
388    F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
389    Fut: futures::Future<Output = anyhow::Result<()>>,
390    T: ClientModule,
391{
392    let decoders = ModuleDecoderRegistry::from_iter([(
393        TEST_MODULE_INSTANCE_ID,
394        module.as_common().module_kind(),
395        T::decoder(),
396    )]);
397    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
398    apply_migrations_client_module(
399        &db,
400        module.as_common().module_kind().to_string(),
401        module.get_database_migrations(),
402        TEST_MODULE_INSTANCE_ID,
403    )
404    .await
405    .context("Error applying migrations to temp database")?;
406
407    let mut global_dbtx = db.begin_transaction_nc().await;
408    let active_states = global_dbtx
409        .find_by_prefix(&ActiveStateKeyPrefix)
410        .await
411        .filter_map(|(state, _)| async move {
412            state.0.state.as_any().downcast_ref::<T::States>().cloned()
413        })
414        .collect::<Vec<_>>()
415        .await;
416
417    let inactive_states = global_dbtx
418        .find_by_prefix(&InactiveStateKeyPrefix)
419        .await
420        .filter_map(|(state, _)| async move {
421            state.0.state.as_any().downcast_ref::<T::States>().cloned()
422        })
423        .collect::<Vec<_>>()
424        .await;
425
426    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
427    validate(module_db, active_states, inactive_states)
428        .await
429        .with_context(|| format!("Validating {db_prefix}"))?;
430
431    Ok(())
432}
433
434/// Open a temporary database located at `temp_path` and copy the contents from
435/// the folder `src_dir` to the temporary database's path.
436async fn open_temp_db_and_copy(
437    temp_path: &str,
438    src_dir: &Path,
439    decoders: ModuleDecoderRegistry,
440) -> anyhow::Result<(Database, TempDir)> {
441    // First copy the contents from src_dir to the path where the database will be
442    // opened
443    let tmp_dir = block_in_place(|| -> anyhow::Result<TempDir> {
444        let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
445        copy_directory_blocking(src_dir, tmp_dir.path())
446            .context("Error copying database to temporary directory")?;
447
448        Ok(tmp_dir)
449    })?;
450
451    Ok((
452        Database::new(RocksDb::build(&tmp_dir).open().await?, decoders),
453        tmp_dir,
454    ))
455}
456
457/// Helper function that recursively copies all contents from
458/// `src` to `dst`.
459pub fn copy_directory_blocking(src: &Path, dst: &Path) -> io::Result<()> {
460    trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
461
462    // Create the destination directory if it doesn't exist
463    fs::create_dir_all(dst)?;
464
465    for entry in fs::read_dir(src)? {
466        let entry = entry?;
467        let path = entry.path();
468        if path.is_dir() {
469            copy_directory_blocking(&path, &dst.join(entry.file_name()))?;
470        } else {
471            let dst_path = dst.join(entry.file_name());
472            trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
473            fs::copy(&path, &dst_path)?;
474        }
475    }
476
477    Ok(())
478}
479
#[cfg(test)]
mod fedimint_migration_tests {
    use anyhow::ensure;
    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::module::CoreConsensusVersion;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_logging::TracingSetup;

    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};
    /// Create a client database with version 0 data. The database produced is
    /// not intended to be real data or semantically correct. It is only
    /// intended to provide coverage when reading the database
    /// in future code versions. This function should not be updated when
    /// database keys/values change - instead a new function should be added
    /// that creates a new database backup that can be tested.
    async fn create_client_db_with_v0_data(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        // Placeholder id; the data only needs to round-trip, not be valid.
        let federation_id = FederationId::dummy();

        let client_config_v0 = ClientConfigV0 {
            global: GlobalClientConfigV0 {
                api_endpoints: Default::default(),
                consensus_version: CoreConsensusVersion::new(0, 0),
                meta: Default::default(),
            },
            modules: Default::default(),
        };

        let client_config_key_v0 = ClientConfigKeyV0 { id: federation_id };

        dbtx.insert_new_entry(&client_config_key_v0, &client_config_v0)
            .await;

        dbtx.commit_tx().await;
    }

    /// Writes the v0 client snapshot under `db/migrations/fedimint-client`;
    /// controlled by the snapshot env var (see `create_snapshot`).
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-client",
            |db| {
                Box::pin(async {
                    create_client_db_with_v0_data(db).await;
                })
            },
            ModuleDecoderRegistry::default(),
        )
        .await
    }

    /// Runs the core client migrations against the stored v0 snapshot and
    /// checks the migrated config is readable under the current key.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_client_db_migrations() -> anyhow::Result<()> {
        // Ignore the error: tracing may already be initialized by another test.
        let _ = TracingSetup::default().init();

        validate_migrations_core_client("fedimint-client", |db| async move {
            let mut dbtx = db.begin_transaction_nc().await;
            // Checks that client config migrated to ClientConfig with broadcast_public_keys
            ensure!(
                dbtx.get_value(&ClientConfigKey).await.is_some(),
                "Client config migration to v0 failed"
            );

            Ok(())
        })
        .await?;

        Ok(())
    }
}