// fedimint_testing_core/db.rs

use std::collections::BTreeMap;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{env, fs, io};

use anyhow::{Context, bail, format_err};
use fedimint_client::db::{apply_migrations_client_module, apply_migrations_core_client_dbtx};
use fedimint_client::module_init::DynClientModuleInit;
use fedimint_client::sm::executor::{
    ActiveStateKeyBytes, ActiveStateKeyPrefix, InactiveStateKeyBytes, InactiveStateKeyPrefix,
};
use fedimint_client_module::module::ClientModule;
use fedimint_client_module::sm::{ActiveStateMeta, InactiveStateMeta};
use fedimint_core::core::OperationId;
use fedimint_core::db::{
    Database, DatabaseVersion, DbMigrationFn, IDatabaseTransactionOpsCoreTyped, apply_migrations,
};
use fedimint_core::module::CommonModuleInit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::task::block_in_place;
use fedimint_logging::LOG_TEST;
use fedimint_rocksdb::RocksDb;
use fedimint_server::consensus::db::ServerDbMigrationContext;
use fedimint_server::core::DynServerModuleInit;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use rand::RngCore;
use rand::rngs::OsRng;
use tempfile::TempDir;
use tracing::{debug, trace};

use crate::envs::FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV;

/// Get the project root (relative to closest Cargo.lock file)
/// ```rust
/// match fedimint_testing_core::db::get_project_root() {
///     Ok(p) => println!("Current project root is {:?}", p),
///     Err(e) => println!("Error obtaining project root {:?}", e),
/// };
/// ```
pub fn get_project_root() -> io::Result<PathBuf> {
    let path = env::current_dir()?;
    let path_ancestors = path.as_path().ancestors();

    for path in path_ancestors {
        if path.join("Cargo.lock").try_exists()? {
            return Ok(PathBuf::from(path));
        }
    }
    Err(io::Error::new(
        ErrorKind::NotFound,
        "Ran out of places to find Cargo.lock",
    ))
}

/// Opens the backup database in the `snapshot_dir`. If the `is_isolated` flag
/// is set, the database will be opened as an isolated database with
/// `TEST_MODULE_INSTANCE_ID` as the prefix.
async fn open_snapshot_db(
    decoders: ModuleDecoderRegistry,
    snapshot_dir: &Path,
    is_isolated: bool,
) -> anyhow::Result<Database> {
    if is_isolated {
        Ok(Database::new(
            RocksDb::open(snapshot_dir)
                .await
                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
            decoders,
        )
        .with_prefix_module_id(TEST_MODULE_INSTANCE_ID)
        .0)
    } else {
        Ok(Database::new(
            RocksDb::open(snapshot_dir)
                .await
                .with_context(|| format!("Preparing snapshot in {}", snapshot_dir.display()))?,
            decoders,
        ))
    }
}

/// Creates a backup database in `snapshot_dir` according to the
/// `FM_PREPARE_DB_MIGRATION_SNAPSHOTS` environment variable: when unset, an
/// existing snapshot is left untouched; when set, a missing snapshot is
/// created; when set to `force`, an existing snapshot is re-created. This
/// avoids re-creating the backup database on every test run.
async fn create_snapshot<'a, F>(
    snapshot_dir: PathBuf,
    decoders: ModuleDecoderRegistry,
    is_isolated: bool,
    prepare_fn: F,
) -> anyhow::Result<()>
where
    F: FnOnce(Database) -> BoxFuture<'a, ()>,
{
    match (
        std::env::var_os(FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV)
            .map(|s| s.to_string_lossy().into_owned())
            .as_deref(),
        snapshot_dir.exists(),
    ) {
        (Some("force"), true) => {
            tokio::fs::remove_dir_all(&snapshot_dir).await?;
            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
            prepare_fn(db).await;
        }
        (Some(_), true) => {
            bail!(
                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} set, but {} already exists. Set to 'force' to overwrite.",
                snapshot_dir.display()
            );
        }
        (Some(_), false) => {
            debug!(dir = %snapshot_dir.display(), "Snapshot dir does not exist. Creating.");
            let db = open_snapshot_db(decoders, &snapshot_dir, is_isolated).await?;
            prepare_fn(db).await;
        }
        (None, true) => {
            debug!(dir = %snapshot_dir.display(), "Snapshot dir already exists. Nothing to do.");
        }
        (None, false) => {
            bail!(
                "{FM_PREPARE_DB_MIGRATION_SNAPSHOTS_ENV} not set, but {} does not exist.",
                snapshot_dir.display()
            );
        }
    }
    Ok(())
}

/// Creates the database backup directory by appending the `snapshot_name`
/// to `db/migrations`. Then this function will execute the provided
/// `prepare_fn`, which is expected to populate the database with the
/// appropriate data for testing a migration. If the snapshot directory
/// already exists, this function will do nothing.
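///
/// A minimal usage sketch (not from the original docs), mirroring the
/// core-client snapshot test at the bottom of this file; the populate logic
/// is left as a placeholder.
///
/// ```ignore
/// snapshot_db_migrations_with_decoders(
///     "fedimint-client",
///     |db| {
///         Box::pin(async move {
///             // populate `db` with old-version data here
///         })
///     },
///     ModuleDecoderRegistry::default(),
/// )
/// .await?;
/// ```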
pub async fn snapshot_db_migrations_with_decoders<'a, F>(
    snapshot_name: &str,
    prepare_fn: F,
    decoders: ModuleDecoderRegistry,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> BoxFuture<'a, ()>,
{
    let project_root = get_project_root().unwrap();
    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);
    create_snapshot(snapshot_dir, decoders, false, prepare_fn).await
}

/// Creates the database backup directory for a server module by appending the
/// `snapshot_name` to `db/migrations`. Then this function will execute the
/// provided `prepare_fn` which is expected to populate the database with the
/// appropriate data for testing a migration.
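///
/// A hedged sketch; `DummyCommonInit` stands in for a concrete
/// [`CommonModuleInit`] implementation and is not defined in this crate.
///
/// ```ignore
/// snapshot_db_migrations::<_, DummyCommonInit>("dummy-server-v0", |db| {
///     Box::pin(async move {
///         // write the module's current key/value pairs into the isolated `db`
///     })
/// })
/// .await?;
/// ```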
pub async fn snapshot_db_migrations<'a, F, I>(
    snapshot_name: &str,
    prepare_fn: F,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> BoxFuture<'a, ()>,
    I: CommonModuleInit,
{
    let project_root = get_project_root().unwrap();
    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);

    let decoders =
        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);
    create_snapshot(snapshot_dir, decoders, true, prepare_fn).await
}

/// Creates the database backup directory for a client module.
/// Two prepare functions are taken as parameters. `data_prepare` is expected
/// to create any data that the client module uses; it is stored in the
/// isolated module namespace. `state_machine_prepare` creates client state
/// machine data that can be used for testing state machine migrations; it is
/// created in the global namespace.
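///
/// A sketch assuming `DummyCommonInit` is a concrete [`CommonModuleInit`];
/// the state machine byte vectors would normally be produced by encoding
/// real client states.
///
/// ```ignore
/// snapshot_db_migrations_client::<_, _, DummyCommonInit>(
///     "dummy-client-v0",
///     |db| {
///         Box::pin(async move {
///             // populate the isolated module namespace in `db`
///         })
///     },
///     || (vec![/* active state bytes */], vec![/* inactive state bytes */]),
/// )
/// .await?;
/// ```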
pub async fn snapshot_db_migrations_client<'a, F, S, I>(
    snapshot_name: &str,
    data_prepare: F,
    state_machine_prepare: S,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> BoxFuture<'a, ()> + Send + Sync,
    S: Fn() -> (Vec<Vec<u8>>, Vec<Vec<u8>>) + Send + Sync,
    I: CommonModuleInit,
{
    let project_root = get_project_root().unwrap();
    let snapshot_dir = project_root.join("db/migrations").join(snapshot_name);

    let decoders =
        ModuleDecoderRegistry::from_iter([(TEST_MODULE_INSTANCE_ID, I::KIND, I::decoder())]);

    let snapshot_fn = |db: Database| {
        async move {
            let isolated_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
            data_prepare(isolated_db).await;

            let (active_states, inactive_states) = state_machine_prepare();
            let mut global_dbtx = db.begin_transaction().await;

            for state in active_states {
                global_dbtx
                    .insert_new_entry(
                        &ActiveStateKeyBytes {
                            operation_id: OperationId::new_random(),
                            module_instance_id: TEST_MODULE_INSTANCE_ID,
                            state,
                        },
                        &ActiveStateMeta {
                            created_at: fedimint_core::time::now(),
                        },
                    )
                    .await;
            }

            for state in inactive_states {
                global_dbtx
                    .insert_new_entry(
                        &InactiveStateKeyBytes {
                            operation_id: OperationId::new_random(),
                            module_instance_id: TEST_MODULE_INSTANCE_ID,
                            state,
                        },
                        &InactiveStateMeta {
                            created_at: fedimint_core::time::now(),
                            exited_at: fedimint_core::time::now(),
                        },
                    )
                    .await;
            }

            global_dbtx.commit_tx().await;
        }
        .boxed()
    };

    create_snapshot(snapshot_dir, decoders, false, snapshot_fn).await
}

pub const STRING_64: &str = "0123456789012345678901234567890101234567890123456789012345678901";
pub const BYTE_8: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
pub const BYTE_20: [u8; 20] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
pub const BYTE_32: [u8; 32] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
];
pub const BYTE_33: [u8; 33] = [
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
    2,
];
pub const TEST_MODULE_INSTANCE_ID: u16 = 0;

/// Retrieves a temporary database from the database backup directory.
/// The first folder that starts with `db_prefix` will be copied into a
/// temporary directory and returned as a temporary database.
async fn get_temp_database(
    db_prefix: &str,
    decoders: &ModuleDecoderRegistry,
) -> anyhow::Result<(Database, TempDir)> {
    let snapshot_dirs = get_project_root().unwrap().join("db/migrations");
    if snapshot_dirs.exists() {
        for file in fs::read_dir(&snapshot_dirs)
            .with_context(|| format!("Reading dir content: {}", snapshot_dirs.display()))?
            .flatten()
        {
            let name = file
                .file_name()
                .into_string()
                .map_err(|_e| format_err!("Invalid path name"))?;
            if name.starts_with(db_prefix) {
                let temp_path = format!("{}-{}", name.as_str(), OsRng.next_u64());
                let temp_db = open_temp_db_and_copy(&temp_path, &file.path(), decoders.clone())
                    .await
                    .with_context(|| {
                        format!("Opening temp db for {name}. Copying to {temp_path}")
                    })?;
                return Ok(temp_db);
            }
        }
    }

    Err(anyhow::anyhow!(
        "No database with prefix {db_prefix} in backup directory"
    ))
}

/// Validates the database migrations. `decoders` needs to be
/// passed in as an argument since this is module agnostic. First applies all
/// defined migrations to the database, then executes the `validate`
/// function, which should confirm the database migrations were successful.
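///
/// A minimal sketch with an empty migration map and a unit migration
/// context; a real caller passes its own migration map, context type and
/// decoders.
///
/// ```ignore
/// use std::collections::BTreeMap;
///
/// validate_migrations_global(
///     |db| async move {
///         // read back the expected entries from `db` here
///         Ok(())
///     },
///     (),
///     "fedimint-client",
///     BTreeMap::new(),
///     ModuleDecoderRegistry::default(),
/// )
/// .await?;
/// ```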
pub async fn validate_migrations_global<F, Fut, C>(
    validate: F,
    ctx: C,
    db_prefix: &str,
    migrations: BTreeMap<DatabaseVersion, DbMigrationFn<C>>,
    decoders: ModuleDecoderRegistry,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> Fut,
    Fut: futures::Future<Output = anyhow::Result<()>>,
    C: Clone,
{
    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
    apply_migrations(&db, ctx, db_prefix.to_string(), migrations, None, None)
        .await
        .context("Error applying migrations to temp database")?;

    validate(db)
        .await
        .with_context(|| format!("Validating {db_prefix}"))?;
    Ok(())
}

/// Validates the database migrations for a server module. First applies all
/// database migrations to the module, then calls the `validate` function,
/// which should confirm the database migrations were successful.
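///
/// A sketch assuming `DummyServerInit` implements the server-side module init
/// trait and converts into a [`DynServerModuleInit`] (conversion assumed).
///
/// ```ignore
/// let module: DynServerModuleInit = DummyServerInit.into(); // assumed conversion
/// validate_migrations_server(module, "dummy-server-v0", |db| async move {
///     // assert that the migrated module entries are present in `db`
///     Ok(())
/// })
/// .await?;
/// ```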
pub async fn validate_migrations_server<F, Fut>(
    module: DynServerModuleInit,
    db_prefix: &str,
    validate: F,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> Fut,
    Fut: futures::Future<Output = anyhow::Result<()>>,
{
    let decoders = ModuleDecoderRegistry::from_iter([(
        TEST_MODULE_INSTANCE_ID,
        module.module_kind(),
        module.decoder(),
    )]);
    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
    apply_migrations(
        &db,
        Arc::new(ServerDbMigrationContext) as Arc<_>,
        module.module_kind().to_string(),
        module.get_database_migrations(),
        Some(TEST_MODULE_INSTANCE_ID),
        None,
    )
    .await
    .context("Error applying migrations to temp database")?;

    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
    validate(module_db)
        .await
        .with_context(|| format!("Validating {db_prefix}"))?;

    Ok(())
}

/// Validates the database migrations for the core client. First applies all
/// database migrations to the core client, then calls the `validate`
/// function, which is expected to confirm the database migrations were
/// successful.
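///
/// A minimal sketch mirroring the core-client migration test at the bottom
/// of this file.
///
/// ```ignore
/// validate_migrations_core_client("fedimint-client", |db| async move {
///     let mut dbtx = db.begin_transaction_nc().await;
///     // check migrated keys here, e.g. that the current client config decodes
///     Ok(())
/// })
/// .await?;
/// ```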
pub async fn validate_migrations_core_client<F, Fut>(
    db_prefix: &str,
    validate: F,
) -> anyhow::Result<()>
where
    F: Fn(Database) -> Fut,
    Fut: futures::Future<Output = anyhow::Result<()>>,
{
    let (db, _tmp_dir) = get_temp_database(db_prefix, &ModuleDecoderRegistry::default()).await?;
    let mut dbtx = db.begin_transaction().await;
    apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), db_prefix.to_string())
        .await
        .context("Error applying core client migrations to temp database")?;
    dbtx.commit_tx_result().await?;

    validate(db)
        .await
        .with_context(|| format!("Validating {db_prefix}"))?;

    Ok(())
}

/// Validates the database migrations for a client module. First applies all
/// database migrations to the module, including the state machine migrations.
/// Then calls the `validate` function with the new `active_states` and
/// `inactive_states`, which is expected to confirm the database migrations
/// were successful.
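///
/// A sketch assuming `DummyClientInit`/`DummyClientModule` are concrete client
/// module types (the conversion into [`DynClientModuleInit`] is assumed); the
/// closure receives the migrated module database plus the decoded active and
/// inactive states.
///
/// ```ignore
/// let module: DynClientModuleInit = DummyClientInit.into(); // assumed conversion
/// validate_migrations_client::<_, _, DummyClientModule>(
///     module,
///     "dummy-client-v0",
///     |db, active_states, inactive_states| async move {
///         // assert on `db`, `active_states` and `inactive_states`
///         Ok(())
///     },
/// )
/// .await?;
/// ```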
pub async fn validate_migrations_client<F, Fut, T>(
    module: DynClientModuleInit,
    db_prefix: &str,
    validate: F,
) -> anyhow::Result<()>
where
    F: Fn(Database, Vec<T::States>, Vec<T::States>) -> Fut,
    Fut: futures::Future<Output = anyhow::Result<()>>,
    T: ClientModule,
{
    let decoders = ModuleDecoderRegistry::from_iter([(
        TEST_MODULE_INSTANCE_ID,
        module.as_common().module_kind(),
        T::decoder(),
    )]);
    let (db, _tmp_dir) = get_temp_database(db_prefix, &decoders).await?;
    apply_migrations_client_module(
        &db,
        module.as_common().module_kind().to_string(),
        module.get_database_migrations(),
        TEST_MODULE_INSTANCE_ID,
    )
    .await
    .context("Error applying migrations to temp database")?;

    let mut global_dbtx = db.begin_transaction_nc().await;
    let active_states = global_dbtx
        .find_by_prefix(&ActiveStateKeyPrefix)
        .await
        .filter_map(|(state, _)| async move {
            state.0.state.as_any().downcast_ref::<T::States>().cloned()
        })
        .collect::<Vec<_>>()
        .await;

    let inactive_states = global_dbtx
        .find_by_prefix(&InactiveStateKeyPrefix)
        .await
        .filter_map(|(state, _)| async move {
            state.0.state.as_any().downcast_ref::<T::States>().cloned()
        })
        .collect::<Vec<_>>()
        .await;

    let module_db = db.with_prefix_module_id(TEST_MODULE_INSTANCE_ID).0;
    validate(module_db, active_states, inactive_states)
        .await
        .with_context(|| format!("Validating {db_prefix}"))?;

    Ok(())
}

/// Open a temporary database located at `temp_path` and copy the contents from
/// the folder `src_dir` to the temporary database's path.
async fn open_temp_db_and_copy(
    temp_path: &str,
    src_dir: &Path,
    decoders: ModuleDecoderRegistry,
) -> anyhow::Result<(Database, TempDir)> {
    // First copy the contents from src_dir to the path where the database will be
    // opened
    let tmp_dir = block_in_place(|| -> anyhow::Result<TempDir> {
        let tmp_dir = tempfile::Builder::new().prefix(temp_path).tempdir()?;
        copy_directory_blocking(src_dir, tmp_dir.path())
            .context("Error copying database to temporary directory")?;

        Ok(tmp_dir)
    })?;

    Ok((
        Database::new(RocksDb::open(&tmp_dir).await?, decoders),
        tmp_dir,
    ))
}

/// Helper function that recursively copies all contents from
/// `src` to `dst`.
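///
/// A small sketch copying one directory tree into another; the paths are
/// illustrative only.
///
/// ```ignore
/// use std::path::Path;
///
/// copy_directory_blocking(Path::new("/tmp/src-db"), Path::new("/tmp/dst-db"))?;
/// ```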
pub fn copy_directory_blocking(src: &Path, dst: &Path) -> io::Result<()> {
    trace!(target: LOG_TEST, src = %src.display(), dst = %dst.display(), "Copy dir");

    // Create the destination directory if it doesn't exist
    fs::create_dir_all(dst)?;

    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let path = entry.path();
        if path.is_dir() {
            copy_directory_blocking(&path, &dst.join(entry.file_name()))?;
        } else {
            let dst_path = dst.join(entry.file_name());
            trace!(target: LOG_TEST, src = %path.display(), dst = %dst_path.display(), "Copy file");
            fs::copy(&path, &dst_path)?;
        }
    }

    Ok(())
}

#[cfg(test)]
mod fedimint_migration_tests {
    use anyhow::ensure;
    use fedimint_client::db::{ClientConfigKey, ClientConfigKeyV0};
    use fedimint_core::config::{ClientConfigV0, FederationId, GlobalClientConfigV0};
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::module::CoreConsensusVersion;
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_logging::TracingSetup;

    use crate::db::{snapshot_db_migrations_with_decoders, validate_migrations_core_client};

    /// Create a client database with version 0 data. The database produced is
    /// not intended to be real data or semantically correct. It is only
    /// intended to provide coverage when reading the database
    /// in future code versions. This function should not be updated when
    /// database keys/values change - instead a new function should be added
    /// that creates a new database backup that can be tested.
    async fn create_client_db_with_v0_data(db: Database) {
        let mut dbtx = db.begin_transaction().await;

        let federation_id = FederationId::dummy();

        let client_config_v0 = ClientConfigV0 {
            global: GlobalClientConfigV0 {
                api_endpoints: Default::default(),
                consensus_version: CoreConsensusVersion::new(0, 0),
                meta: Default::default(),
            },
            modules: Default::default(),
        };

        let client_config_key_v0 = ClientConfigKeyV0 { id: federation_id };

        dbtx.insert_new_entry(&client_config_key_v0, &client_config_v0)
            .await;

        dbtx.commit_tx().await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn snapshot_client_db_migrations() -> anyhow::Result<()> {
        snapshot_db_migrations_with_decoders(
            "fedimint-client",
            |db| {
                Box::pin(async {
                    create_client_db_with_v0_data(db).await;
                })
            },
            ModuleDecoderRegistry::default(),
        )
        .await
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_client_db_migrations() -> anyhow::Result<()> {
        let _ = TracingSetup::default().init();

        validate_migrations_core_client("fedimint-client", |db| async move {
            let mut dbtx = db.begin_transaction_nc().await;
            // Checks that the client config migrated to ClientConfig with broadcast_public_keys
            ensure!(
                dbtx.get_value(&ClientConfigKey).await.is_some(),
                "Client config migration from v0 failed"
            );

            Ok(())
        })
        .await?;

        Ok(())
    }
}