fedimint_testing_core/
db.rs

1use std::collections::BTreeMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::{env, fs, io};
6
7use anyhow::{Context, bail, format_err};
8use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
9use fedimint_client::module_init::DynClientModuleInit;
10use fedimint_client::sm::executor::{
11    ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
12};
13use fedimint_client_module::module::ClientModule;
14use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
15use fedimint_core::core::OperationId;
16use fedimint_core::db::{
17    Database, DatabaseVersion, DbMigrationFn, IDatabaseTransactionOpsCoreTyped, apply_migrations,
18};
19use fedimint_core::module::CommonModuleInit;
20use fedimint_core::module::registry::ModuleDecoderRegistry;
21use fedimint_core::task::block_in_place;
22use fedimint_logging::LOG_TEST;
23use fedimint_rocksdb::RocksDb;
24use fedimint_server::consensus::db::ServerDbMigrationContext;
25use fedimint_server::core::DynServerModuleInit;
26use futures::future::BoxFuture;
27use futures::{FutureExt, StreamExt};
28use rand::RngCore;
29use rand::rngs::OsRng;
30use tempfile::TempDir;
31use tracing::{debug, trace};
32
33use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;
34
/// Returns the project root: the closest ancestor of the current working
/// directory (the working directory itself included) that contains a
/// `Cargo.lock` file.
///
/// ```rust
/// match fedimint_testing_core::db::get_project_root() {
///     Ok(p) => println!("Current project root is {:?}", p),
///     Err(e) => println!("Error obtaining project root {:?}", e),
/// };
/// ```
///
/// # Errors
///
/// Fails if the current directory cannot be determined, if probing for
/// `Cargo.lock` fails, or with [`ErrorKind::NotFound`] when no ancestor
/// contains a `Cargo.lock` file.
pub fn get_project_root() -> io::Result<PathBuf> {
    let cwd = env::current_dir()?;

    // `ancestors()` yields `cwd` first and then each parent up to the root,
    // so the first hit is the closest `Cargo.lock`.
    for dir in cwd.ancestors() {
        if dir.join("Cargo.lock").try_exists()? {
            return Ok(dir.to_path_buf());
        }
    }

    Err(io::Error::new(
        ErrorKind::NotFound,
        "Ran out of places to find Cargo.lock",
    ))
}
56
57/// Opens the backup database in the `snapshot_dir`. If the `is_isolated` flag
58/// is set, the database will be opened as an isolated database with
59/// `TEST_MODULE_INSTANCE_ID` as the prefix.
60async fn open_snapshot_db(
61    decoders: ModuleDecoderRegistry,
62    snapshot_dir: &Path,
63    is_isolated: bool,
64) -> anyhow::Result<Database> {
65    if is_isolated {
66        Ok(Database::new(
67            RocksDb::open(snapshot_dir)
68                .await
69                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
70            decoders,
71        )
72        .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
73        .0)
74    } else {
75        Ok(Database::new(
76            RocksDb::open(snapshot_dir)
77                .await
78                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
79            decoders,
80        ))
81    }
82}
83
84/// Creates a backup database in the `snapshot_dir` according to the
85/// `FM_PREPARE_DB_MIGRATION_SNAPSHOTS`, since we do not want to re-create a
86/// backup database every time we run the tests.
87async fn create_snapshot<'a, F>(
88    snapshot_dir: PathBuf,
89    decoders: ModuleDecoderRegistry,
90    is_isolated: bool,
91    prepare_fn: F,
92) -> anyhow::Result<()>
93where
94    F: FnOnce(Database) -> BoxFuture<'a, ()>,
95{
96    match (
97        std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
98            .map(|s| s.to_string_lossy().into_owned())
99            .as_deref(),
100        snapshot_dir.exists(),
101    ) {
102        (Some("force"), true) => {
103            tokio::fs::remove_dir_all(&snapshot_dir).await?;
104            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
105            prepare_fn(db).await;
106        }
107        (Some(_), true) => {
108            bail!(
109                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists already exists. Set to 'force' to overwrite.",
110                snapshot_dir.display()
111            );
112        }
113        (Some(_), false) => {
114            debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
115            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
116            prepare_fn(db).await;
117        }
118        (None, true) => {
119            debug!(dir = %snapshot_dir.display(), "Snapshot dir already exist. Nothing to do.");
120        }
121        (None, false) => {
122            bail!(
123                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} doest not exist.",
124                snapshot_dir.display()
125            );
126        }
127    }
128    Ok(())
129}
130
131/// Creates the database backup for `snapshot_name`
132/// to `db/migrations`. Then this function will execute the provided
133/// `prepare_fn` which is expected to populate the database with the appropriate
134/// data for testing a migration. If the snapshot directory already exists,
135/// this function will do nothing.
136pub async fn snapshot_db_migrations_with_decoders<'a, F>(
137    snapshot_name: &str,
138    prepare_fn: F,
139    decoders: ModuleDecoderRegistry,
140) -> anyhow::Result<()>
141where
142    F: Fn(Database) -> BoxFuture<'a, ()>,
143{
144    let project_root = get_project_root().unwrap();
145    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
146    create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
147}
148
149/// Creates the database backup directory for a server module by appending the
150/// `snapshot_name` to `db/migrations`. Then this function will execute the
151/// provided `prepare_fn` which is expected to populate the database with the
152/// appropriate data for testing a migration.
153pub async fn snapshot_db_migrations<'a, F, I>(
154    snapshot_name: &str,
155    prepare_fn: F,
156) -> anyhow::Result<()>
157where
158    F: Fn(Database) -> BoxFuture<'a, ()>,
159    I: CommonModuleInit,
160{
161    let project_root = get_project_root().unwrap();
162    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
163
164    let decoders =
165        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
166    create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
167}
168
169/// Create the database backup directory for a client module.
170/// Two prepare functions are taken as parameters. `data_prepare` is expected to
171/// create any data that the client module uses and is stored in the isolated
172/// namespace. `state_machine_prepare` creates client state machine data that
173/// can be used for testing state machine migrations. This is created in the
174/// global namespace.
175pub async fn snapshot_db_migrations_client<'a, F, S, I>(
176    snapshot_name: &str,
177    data_prepare: F,
178    state_machine_prepare: S,
179) -> anyhow::Result<()>
180where
181    F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
182    S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
183    I: CommonModuleInit,
184{
185    let project_root = get_project_root().unwrap();
186    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
187
188    let decoders =
189        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
190
191    let snapshot_fn = |db: Database| {
192        async move {
193            let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
194            data_prepare(isolated_db).await;
195
196            let (active_states, inactive_states) = state_machine_prepare();
197            let mut global_dbtx = db.begin_transaction().await;
198
199            for state in active_states {
200                global_dbtx
201                    .insert_new_entry(
202                        &ActiveStateKeyBytes {
203                            operation_id: OperationId::new_random(),
204                            module_instance_id: TEST_MODULE_INSTANCE_ID,
205                            state,
206                        },
207                        &ActiveStateMeta {
208                            created_at: fedimint_core::time::now(),
209                        },
210                    )
211                    .await;
212            }
213
214            for state in inactive_states {
215                global_dbtx
216                    .insert_new_entry(
217                        &InactiveStateKeyBytes {
218                            operation_id: OperationId::new_random(),
219                            module_instance_id: TEST_MODULE_INSTANCE_ID,
220                            state,
221                        },
222                        &InactiveStateMeta {
223                            created_at: fedimint_core::time::now(),
224                            exited_at: fedimint_core::time::now(),
225                        },
226                    )
227                    .await;
228            }
229
230            global_dbtx.commit_tx().await;
231        }
232        .boxed()
233    };
234
235    create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
236}
237
/// Builds an `N`-byte array whose element at index `i` is `i % 10`, i.e. the
/// digits `0..=9` repeated.
const fn cycling_digits<const N: usize>() -> [u8; N] {
    const DIGITS: [u8; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

    let mut bytes = [0u8; N];
    let mut i = 0;
    while i < N {
        bytes[i] = DIGITS[i % 10];
        i += 1;
    }
    bytes
}

/// 64-character fixture string: the 32-character digit run
/// `0123456789…01` written twice.
pub const STRING_64: &str = concat!(
    "01234567890123456789012345678901",
    "01234567890123456789012345678901",
);
/// 8-byte fixture: `0, 1, …, 7`.
pub const BYTE_8: [u8; 8] = cycling_digits();
/// 20-byte fixture: the digits `0..=9` repeated twice.
pub const BYTE_20: [u8; 20] = cycling_digits();
/// 32-byte fixture: the digits `0..=9` repeated, truncated to 32 bytes.
pub const BYTE_32: [u8; 32] = cycling_digits();
/// 33-byte fixture: the digits `0..=9` repeated, truncated to 33 bytes.
pub const BYTE_33: [u8; 33] = cycling_digits();
/// Module instance id used for every module in the migration test helpers.
pub const TEST_MODULE_INSTANCE_ID: u16 = 0;
249
250/// Retrieves a temporary database from the database backup directory.
251/// The first folder that starts with `db_prefix` will return as a temporary
252/// database.
253async fn get_temp_database(
254    db_prefix: &str,
255    decoders: &ModuleDecoderRegistry,
256) -> anyhow::Result<(Database, TempDir)> {
257    let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
258    if snapshot_dirs.exists() {
259        for file in fs::read_dir(&snapshot_dirs)
260            .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
261            .flatten()
262        {
263            let name = file
264                .file_name()
265                .into_string()
266                .map_err(|_e| format_err!("Invalid path name"))?;
267            if name.starts_with(db_prefix) && !name.ends_with("lock") {
268                let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
269
270                let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
271                    .await
272                    .with_context(|| {
273                        format!("Opening temp db for {name}. Copying to {temp_path}")
274                    })?;
275                return Ok(temp_db);
276            }
277        }
278    }
279
280    Err(anyhow::anyhow!(
281        "No database with prefix {db_prefix} in backup directory"
282    ))
283}
284
285/// Validates the database migrations. `decoders` need to be
286/// passed in as an argument since this is module agnostic. First
287/// applies all defined migrations to the database then executes the `validate``
288/// function which should confirm the database migrations were successful.
289pub async fn validate_migrations_global<F, Fut, C>(
290    validate: F,
291    ctx: C,
292    db_prefix: &str,
293    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
294    decoders: ModuleDecoderRegistry,
295) -> anyhow::Result<()>
296where
297    F: Fn(Database) -> Fut,
298    Fut: futures::Future<Output = anyhow::Result<()>>,
299    C: Clone,
300{
301    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
302    apply_migrations(&db, ctx, db_prefix.to_string(), migrations, None, None)
303        .await
304        .context("Error applying migrations to temp database")?;
305
306    validate(db)
307        .await
308        .with_context(|| format!("Validating {db_prefix}"))?;
309    Ok(())
310}
311
312/// Validates the database migrations for a server module. First applies all
313/// database migrations to the module, then calls the `validate` which should
314/// confirm the database migrations were successful.
315pub async fn validate_migrations_server<F, Fut>(
316    module: DynServerModuleInit,
317    db_prefix: &str,
318    validate: F,
319) -> anyhow::Result<()>
320where
321    F: Fn(Database) -> Fut,
322    Fut: futures::Future<Output = anyhow::Result<()>>,
323{
324    let decoders = ModuleDecoderRegistry::from_iter([(
325        TEST_MODULE_INSTANCE_ID,
326        module.module_kind(),
327        module.decoder(),
328    )]);
329    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
330    apply_migrations(
331        &db,
332        Arc::new(ServerDbMigrationContext) as Arc<_>,
333        module.module_kind().to_string(),
334        module.get_database_migrations(),
335        Some(TEST_MODULE_INSTANCE_ID),
336        None,
337    )
338    .await
339    .context("Error applying migrations to temp database")?;
340
341    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
342    validate(module_db)
343        .await
344        .with_context(|| format!("Validating {db_prefix}"))?;
345
346    Ok(())
347}
348
349/// Validates the database migrations for the core client. First applies all
350/// database migrations to the core client. Then calls the `validate` function,
351/// including the new `active_states` and `inactive_states`, and is expected to
352/// confirm the database migrations were successful.
353pub async fn validate_migrations_core_client<F, Fut>(
354    db_prefix: &str,
355    validate: F,
356) -> anyhow::Result<()>
357where
358    F: Fn(Database) -> Fut,
359    Fut: futures::Future<Output = anyhow::Result<()>>,
360{
361    let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default()).await?;
362    let mut dbtx = db.begin_transaction().await;
363    apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
364        .await
365        .context("Error applying core client migrations to temp database")?;
366    dbtx.commit_tx_result().await?;
367
368    validate(db)
369        .await
370        .with_context(|| format!("Validating {db_prefix}"))?;
371
372    Ok(())
373}
374
375/// Validates the database migrations for a client module. First applies all
376/// database migrations to the module, including the state machine migrations.
377/// Then calls the `validate` function, including the new `active_states` and
378/// `inactive_states`, and is expected to confirm the database migrations were
379/// successful.
380pub async fn validate_migrations_client<F, Fut, T>(
381    module: DynClientModuleInit,
382    db_prefix: &str,
383    validate: F,
384) -> anyhow::Result<()>
385where
386    F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
387    Fut: futures::Future<Output = anyhow::Result<()>>,
388    T: ClientModule,
389{
390    let decoders = ModuleDecoderRegistry::from_iter([(
391        TEST_MODULE_INSTANCE_ID,
392        module.as_common().module_kind(),
393        T::decoder(),
394    )]);
395    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
396    apply_migrations_client_module(
397        &db,
398        module.as_common().module_kind().to_string(),
399        module.get_database_migrations(),
400        TEST_MODULE_INSTANCE_ID,
401    )
402    .await
403    .context("Error applying migrations to temp database")?;
404
405    let mut global_dbtx = db.begin_transaction_nc().await;
406    let active_states = global_dbtx
407        .find_by_prefix(&ActiveStateKeyPrefix)
408        .await
409        .filter_map(|(state, _)| async move {
410            state.0.state.as_any().downcast_ref::<T::States>().cloned()
411        })
412        .collect::<Vec<_>>()
413        .await;
414
415    let inactive_states = global_dbtx
416        .find_by_prefix(&InactiveStateKeyPrefix)
417        .await
418        .filter_map(|(state, _)| async move {
419            state.0.state.as_any().downcast_ref::<T::States>().cloned()
420        })
421        .collect::<Vec<_>>()
422        .await;
423
424    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
425    validate(module_db, active_states, inactive_states)
426        .await
427        .with_context(|| format!("Validating {db_prefix}"))?;
428
429    Ok(())
430}
431
432/// Open a temporary database located at `temp_path` and copy the contents from
433/// the folder `src_dir` to the temporary database's path.
434async fn open_temp_db_and_copy(
435    temp_path: &str,
436    src_dir: &Path,
437    decoders: ModuleDecoderRegistry,
438) -> anyhow::Result<(Database, TempDir)> {
439    // First copy the contents from src_dir to the path where the database will be
440    // opened
441    let tmp_dir = block_in_place(|| -> anyhow::Result<TempDir> {
442        let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
443        copy_directory_blocking(src_dir, tmp_dir.path())
444            .context("Error copying database to temporary directory")?;
445
446        Ok(tmp_dir)
447    })?;
448
449    Ok((
450        Database::new(RocksDb::open(&tmp_dir).await?, decoders),
451        tmp_dir,
452    ))
453}
454
455/// Helper function that recursively copies all contents from
456/// `src` to `dst`.
457pub fn copy_directory_blocking(src: &Path, dst: &Path) -> io::Result<()> {
458    trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");
459
460    // Create the destination directory if it doesn't exist
461    fs::create_dir_all(dst)?;
462
463    for entry in fs::read_dir(src)? {
464        let entry = entry?;
465        let path = entry.path();
466        if path.is_dir() {
467            copy_directory_blocking(&path, &dst.join(entry.file_name()))?;
468        } else {
469            let dst_path = dst.join(entry.file_name());
470            trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
471            fs::copy(&path, &dst_path)?;
472        }
473    }
474
475    Ok(())
476}
477
#[cfg(test)]
mod fedimint_migration_tests {
    use anyhow::ensure;
    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::module::CoreConsensusVersion;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_logging::TracingSetup;

    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};

    /// Populates `db` with version 0 client data.
    ///
    /// The data is neither real nor semantically correct; it only exists to
    /// give coverage when the database is read by future code versions. Do
    /// not update this function when database keys/values change. Instead,
    /// add a new function that creates a new database backup to test against.
    async fn create_client_db_with_v0_data(db: Database) {
        let config_key = ClientConfigKeyV0 {
            id: FederationId::dummy(),
        };
        let config_value = ClientConfigV0 {
            global: GlobalClientConfigV0 {
                api_endpoints: Default::default(),
                consensus_version: CoreConsensusVersion::new(0, 0),
                meta: Default::default(),
            },
            modules: Default::default(),
        };

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&config_key, &config_value).await;
        dbtx.commit_tx().await;
    }

    /// Prepares the `fedimint-client` snapshot with the version 0 data.
    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-client",
            |db| Box::pin(create_client_db_with_v0_data(db)),
            ModuleDecoderRegistry::default(),
        )
        .await
    }

    /// Applies the core client migrations to the `fedimint-client` snapshot
    /// and checks that a value is stored under `ClientConfigKey` afterwards.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_client_db_migrations() -> anyhow::Result<()> {
        let _ = TracingSetup::default().init();

        validate_migrations_core_client("fedimint-client", |db| async move {
            let mut dbtx = db.begin_transaction_nc().await;
            // Checks that client config migrated to ClientConfig with broadcast_public_keys
            let migrated_config = dbtx.get_value(&ClientConfigKey).await;
            ensure!(
                migrated_config.is_some(),
                "Client config migration to v0 failed"
            );

            Ok(())
        })
        .await
    }
}