fedimint_testing_core/
db.rs

1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::{env, fs, io};
5
6use anyhow::{Context, bail, format_err};
7use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
8use fedimint_client::module_init::DynClientModuleInit;
9use fedimint_client::sm::executor::{
10    ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
11};
12use fedimint_client_module::module::ClientModule;
13use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
14use fedimint_core::core::OperationId;
15use fedimint_core::db::{
16    CoreMigrationFn, Database, DatabaseVersion, IDatabaseTransactionOpsCoreTyped, apply_migrations,
17    apply_migrations_server,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_logging::LOG_TEST;
22use fedimint_rocksdb::RocksDb;
23use fedimint_server::core::DynServerModuleInit;
24use futures::future::BoxFuture;
25use futures::{FutureExt, StreamExt};
26use rand::RngCore;
27use rand::rngs::OsRng;
28use tempfile::TempDir;
29use tracing::{debug, trace};
30
31use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
32
33/// Get the project root (relative to closest Cargo.lock file)
34/// ```rust
35/// match fedimint_testing_core::db::get_project_root() {
36///     Ok(p) => println!("Current project root is {:?}", p),
37///     Err(e) => println!("Error obtaining project root {:?}", e),
38/// };
39/// ```
40pub fn get_project_root() -> io::Result<PathBuf> {
41    let path = env::current_dir()?;
42    let path_ancestors = path.as_path().ancestors();
43
44    for path in path_ancestors {
45        if path.join("Cargo.lock").try_exists()? {
46            return Ok(PathBuf::from(path));
47        }
48    }
49    Err(io::Error::new(
50        ErrorKind::NotFound,
51        "Ran out of places to find Cargo.toml",
52    ))
53}
54
55/// Opens the backup database in the `snapshot_dir`. If the `is_isolated` flag
56/// is set, the database will be opened as an isolated database with
57/// `TEST_MODULE_INSTANCE_ID` as the prefix.
58fn open_snapshot_db(
59    decoders: ModuleDecoderRegistry,
60    snapshot_dir: &Path,
61    is_isolated: bool,
62) -> anyhow::Result<Database> {
63    if is_isolated {
64        Ok(Database::new(
65            RocksDb::open(snapshot_dir)
66                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
67            decoders,
68        )
69        .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
70        .0)
71    } else {
72        Ok(Database::new(
73            RocksDb::open(snapshot_dir)
74                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
75            decoders,
76        ))
77    }
78}
79
80/// Creates a backup database in the `snapshot_dir` according to the
81/// `FM_PREPARE_DB_MIGRATION_SNAPSHOTS`, since we do not want to re-create a
82/// backup database every time we run the tests.
83async fn create_snapshot<'a, F>(
84    snapshot_dir: PathBuf,
85    decoders: ModuleDecoderRegistry,
86    is_isolated: bool,
87    prepare_fn: F,
88) -> anyhow::Result<()>
89where
90    F: FnOnce(Database) -> BoxFuture<'a, ()>,
91{
92    match (
93        std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
94            .map(|s| s.to_string_lossy().into_owned())
95            .as_deref(),
96        snapshot_dir.exists(),
97    ) {
98        (Some("force"), true) => {
99            tokio::fs::remove_dir_all(&snapshot_dir).await?;
100            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated)?;
101            prepare_fn(db).await;
102        }
103        (Some(_), true) => {
104            bail!(
105                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
106                snapshot_dir.display()
107            );
108        }
109        (Some(_), false) => {
110            debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
111            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated)?;
112            prepare_fn(db).await;
113        }
114        (None, true) => {
115            debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
116        }
117        (None, false) => {
118            bail!(
119                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
120                snapshot_dir.display()
121            );
122        }
123    }
124    Ok(())
125}
126
127/// Creates the database backup for `snapshot_name`
128/// to `db/migrations`. Then this function will execute the provided
129/// `prepare_fn` which is expected to populate the database with the appropriate
130/// data for testing a migration. If the snapshot directory already exists,
131/// this function will do nothing.
132pub async fn snapshot_db_migrations_with_decoders<'a, F>(
133    snapshot_name: &str,
134    prepare_fn: F,
135    decoders: ModuleDecoderRegistry,
136) -> anyhow::Result<()>
137where
138    F: Fn(Database) -> BoxFuture<'a, ()>,
139{
140    let project_root = get_project_root().unwrap();
141    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
142    create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
143}
144
145/// Creates the database backup directory for a server module by appending the
146/// `snapshot_name` to `db/migrations`. Then this function will execute the
147/// provided `prepare_fn` which is expected to populate the database with the
148/// appropriate data for testing a migration.
149pub async fn snapshot_db_migrations<'a, F, I>(
150    snapshot_name: &str,
151    prepare_fn: F,
152) -> anyhow::Result<()>
153where
154    F: Fn(Database) -> BoxFuture<'a, ()>,
155    I: CommonModuleInit,
156{
157    let project_root = get_project_root().unwrap();
158    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
159
160    let decoders =
161        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
162    create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
163}
164
165/// Create the database backup directory for a client module.
166/// Two prepare functions are taken as parameters. `data_prepare` is expected to
167/// create any data that the client module uses and is stored in the isolated
168/// namespace. `state_machine_prepare` creates client state machine data that
169/// can be used for testing state machine migrations. This is created in the
170/// global namespace.
171pub async fn snapshot_db_migrations_client<'a, F, S, I>(
172    snapshot_name: &str,
173    data_prepare: F,
174    state_machine_prepare: S,
175) -> anyhow::Result<()>
176where
177    F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
178    S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
179    I: CommonModuleInit,
180{
181    let project_root = get_project_root().unwrap();
182    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
183
184    let decoders =
185        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
186
187    let snapshot_fn = |db: Database| {
188        async move {
189            let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
190            data_prepare(isolated_db).await;
191
192            let (active_states, inactive_states) = state_machine_prepare();
193            let mut global_dbtx = db.begin_transaction().await;
194
195            for state in active_states {
196                global_dbtx
197                    .insert_new_entry(
198                        &ActiveStateKeyBytes {
199                            operation_id: OperationId::new_random(),
200                            module_instance_id: TEST_MODULE_INSTANCE_ID,
201                            state,
202                        },
203                        &ActiveStateMeta {
204                            created_at: fedimint_core::time::now(),
205                        },
206                    )
207                    .await;
208            }
209
210            for state in inactive_states {
211                global_dbtx
212                    .insert_new_entry(
213                        &InactiveStateKeyBytes {
214                            operation_id: OperationId::new_random(),
215                            module_instance_id: TEST_MODULE_INSTANCE_ID,
216                            state,
217                        },
218                        &InactiveStateMeta {
219                            created_at: fedimint_core::time::now(),
220                            exited_at: fedimint_core::time::now(),
221                        },
222                    )
223                    .await;
224            }
225
226            global_dbtx.commit_tx().await;
227        }
228        .boxed()
229    };
230
231    create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
232}
233
234pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";
235pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
236pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
237pub const BYTE_32: [u8; 32] = [
238    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
239];
240pub const BYTE_33: [u8; 33] = [
241    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
242    2,
243];
244pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
245
246/// Retrieves a temporary database from the database backup directory.
247/// The first folder that starts with `db_prefix` will return as a temporary
248/// database.
249fn get_temp_database(
250    db_prefix: &str,
251    decoders: &ModuleDecoderRegistry,
252) -> anyhow::Result<(Database, TempDir)> {
253    let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
254    if snapshot_dirs.exists() {
255        for file in fs::read_dir(&snapshot_dirs)
256            .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
257            .flatten()
258        {
259            let name = file
260                .file_name()
261                .into_string()
262                .map_err(|_e| format_err!("Invalid path name"))?;
263            if name.starts_with(db_prefix) {
264                let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
265                let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
266                    .with_context(|| {
267                        format!("Opening temp db for {name}. Copying to {temp_path}")
268                    })?;
269                return Ok(temp_db);
270            }
271        }
272    }
273
274    Err(anyhow::anyhow!(
275        "No database with prefix {db_prefix} in backup directory"
276    ))
277}
278
279/// Validates the database migrations. `decoders` need to be
280/// passed in as an argument since this is module agnostic. First
281/// applies all defined migrations to the database then executes the `validate``
282/// function which should confirm the database migrations were successful.
283pub async fn validate_migrations_global<F, Fut>(
284    validate: F,
285    db_prefix: &str,
286    migrations: BTreeMap<DatabaseVersion, CoreMigrationFn>,
287    decoders: ModuleDecoderRegistry,
288) -> anyhow::Result<()>
289where
290    F: Fn(Database) -> Fut,
291    Fut: futures::Future<Output = anyhow::Result<()>>,
292{
293    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders)?;
294    apply_migrations_server(&db, db_prefix.to_string(), migrations)
295        .await
296        .context("Error applying migrations to temp database")?;
297
298    validate(db)
299        .await
300        .with_context(|| format!("Validating {db_prefix}"))?;
301    Ok(())
302}
303
304/// Validates the database migrations for a server module. First applies all
305/// database migrations to the module, then calls the `validate` which should
306/// confirm the database migrations were successful.
307pub async fn validate_migrations_server<F, Fut>(
308    module: DynServerModuleInit,
309    db_prefix: &str,
310    validate: F,
311) -> anyhow::Result<()>
312where
313    F: Fn(Database) -> Fut,
314    Fut: futures::Future<Output = anyhow::Result<()>>,
315{
316    let decoders = ModuleDecoderRegistry::from_iter([(
317        TEST_MODULE_INSTANCE_ID,
318        module.module_kind(),
319        module.decoder(),
320    )]);
321    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders)?;
322    apply_migrations(
323        &db,
324        module.module_kind().to_string(),
325        module.get_database_migrations(),
326        Some(TEST_MODULE_INSTANCE_ID),
327        None,
328    )
329    .await
330    .context("Error applying migrations to temp database")?;
331
332    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
333    validate(module_db)
334        .await
335        .with_context(|| format!("Validating {db_prefix}"))?;
336
337    Ok(())
338}
339
340/// Validates the database migrations for the core client. First applies all
341/// database migrations to the core client. Then calls the `validate` function,
342/// including the new `active_states` and `inactive_states`, and is expected to
343/// confirm the database migrations were successful.
344pub async fn validate_migrations_core_client<F, Fut>(
345    db_prefix: &str,
346    validate: F,
347) -> anyhow::Result<()>
348where
349    F: Fn(Database) -> Fut,
350    Fut: futures::Future<Output = anyhow::Result<()>>,
351{
352    let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default())?;
353    let mut dbtx = db.begin_transaction().await;
354    apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
355        .await
356        .context("Error applying core client migrations to temp database")?;
357    dbtx.commit_tx_result().await?;
358
359    validate(db)
360        .await
361        .with_context(|| format!("Validating {db_prefix}"))?;
362
363    Ok(())
364}
365
366/// Validates the database migrations for a client module. First applies all
367/// database migrations to the module, including the state machine migrations.
368/// Then calls the `validate` function, including the new `active_states` and
369/// `inactive_states`, and is expected to confirm the database migrations were
370/// successful.
371pub async fn validate_migrations_client<F, Fut, T>(
372    module: DynClientModuleInit,
373    db_prefix: &str,
374    validate: F,
375) -> anyhow::Result<()>
376where
377    F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
378    Fut: futures::Future<Output = anyhow::Result<()>>,
379    T: ClientModule,
380{
381    let decoders = ModuleDecoderRegistry::from_iter([(
382        TEST_MODULE_INSTANCE_ID,
383        module.as_common().module_kind(),
384        T::decoder(),
385    )]);
386    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders)?;
387    apply_migrations_client_module(
388        &db,
389        module.as_common().module_kind().to_string(),
390        module.get_database_migrations(),
391        TEST_MODULE_INSTANCE_ID,
392    )
393    .await
394    .context("Error applying migrations to temp database")?;
395
396    let mut global_dbtx = db.begin_transaction_nc().await;
397    let active_states = global_dbtx
398        .find_by_prefix(&ActiveStateKeyPrefix)
399        .await
400        .filter_map(|(state, _)| async move {
401            state.0.state.as_any().downcast_ref::<T::States>().cloned()
402        })
403        .collect::<Vec<_>>()
404        .await;
405
406    let inactive_states = global_dbtx
407        .find_by_prefix(&InactiveStateKeyPrefix)
408        .await
409        .filter_map(|(state, _)| async move {
410            state.0.state.as_any().downcast_ref::<T::States>().cloned()
411        })
412        .collect::<Vec<_>>()
413        .await;
414
415    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
416    validate(module_db, active_states, inactive_states)
417        .await
418        .with_context(|| format!("Validating {db_prefix}"))?;
419
420    Ok(())
421}
422
423/// Open a temporary database located at `temp_path` and copy the contents from
424/// the folder `src_dir` to the temporary database's path.
425fn open_temp_db_and_copy(
426    temp_path: &str,
427    src_dir: &Path,
428    decoders: ModuleDecoderRegistry,
429) -> anyhow::Result<(Database, TempDir)> {
430    // First copy the contents from src_dir to the path where the database will be
431    // opened
432    let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
433    copy_directory(src_dir, tmp_dir.path())
434        .context("Error copying database to temporary directory")?;
435
436    Ok((Database::new(RocksDb::open(&tmp_dir)?, decoders), tmp_dir))
437}
438
439/// Helper function that recursively copies all contents from
440/// `src` to `dst`.
441pub fn copy_directory(src: &Path, dst: &Path) -> io::Result<()> {
442    trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
443
444    // Create the destination directory if it doesn't exist
445    fs::create_dir_all(dst)?;
446
447    for entry in fs::read_dir(src)? {
448        let entry = entry?;
449        let path = entry.path();
450        if path.is_dir() {
451            copy_directory(&path, &dst.join(entry.file_name()))?;
452        } else {
453            let dst_path = dst.join(entry.file_name());
454            trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
455            fs::copy(&path, &dst_path)?;
456        }
457    }
458
459    Ok(())
460}
461
462#[cfg(test)]
463mod fedimint_migration_tests {
464    use anyhow::ensure;
465    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
466    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
467    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
468    use fedimint_core::module::CoreConsensusVersion;
469    use fedimint_core::module::registry::ModuleDecoderRegistry;
470    use fedimint_logging::TracingSetup;
471
472    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};
473    /// Create a client database with version 0 data. The database produced is
474    /// not intended to be real data or semantically correct. It is only
475    /// intended to provide coverage when reading the database
476    /// in future code versions. This function should not be updated when
477    /// database keys/values change - instead a new function should be added
478    /// that creates a new database backup that can be tested.
479    async fn create_client_db_with_v0_data(db: Database) {
480        let mut dbtx = db.begin_transaction().await;
481
482        let federation_id = FederationId::dummy();
483
484        let client_config_v0 = ClientConfigV0 {
485            global: GlobalClientConfigV0 {
486                api_endpoints: Default::default(),
487                consensus_version: CoreConsensusVersion::new(0, 0),
488                meta: Default::default(),
489            },
490            modules: Default::default(),
491        };
492
493        let client_config_key_v0 = ClientConfigKeyV0 { id: federation_id };
494
495        dbtx.insert_new_entry(&client_config_key_v0, &client_config_v0)
496            .await;
497
498        dbtx.commit_tx().await;
499    }
500
501    #[tokio::test(flavor = "multi_thread")]
502    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
503        snapshot_db_migrations_with_decoders(
504            "fedimint-client",
505            |db| {
506                Box::pin(async {
507                    create_client_db_with_v0_data(db).await;
508                })
509            },
510            ModuleDecoderRegistry::default(),
511        )
512        .await
513    }
514
515    #[tokio::test(flavor = "multi_thread")]
516    async fn test_client_db_migrations() -> anyhow::Result<()> {
517        let _ = TracingSetup::default().init();
518
519        validate_migrations_core_client("fedimint-client", |db| async move {
520            let mut dbtx = db.begin_transaction_nc().await;
521            // Checks that client config migrated to ClientConfig with broadcast_public_keys
522            ensure!(
523                dbtx.get_value(&ClientConfigKey).await.is_some(),
524                "Client config migration to v0 failed"
525            );
526
527            Ok(())
528        })
529        .await?;
530
531        Ok(())
532    }
533}