diff --git a/src/parallel_io.c b/src/parallel_io.c index d80a1080caf5f015668ac4dd59be73b6e397a5c1..a542a5ff6f7f18d4e616abf355120844652075f2 100644 --- a/src/parallel_io.c +++ b/src/parallel_io.c @@ -55,56 +55,28 @@ #define HDF5_PARALLEL_IO_MAX_BYTES 2000000000LL /** - * @brief Reads a data array from a given HDF5 group. + * @brief Reads a chunk of data from an open HDF5 dataset * - * @param grp The group from which to read. - * @param prop The #io_props of the field to read. - * @param N The number of particles on that rank. - * @param N_total The total number of particles. - * @param offset The offset in the array on disk for this rank. + * @param h_data The HDF5 dataset to write to. + * @param h_plist_id the parallel HDF5 properties. + * @param props The #io_props of the field to read. + * @param N The number of particles to write. + * @param offset Offset in the array where this mpi task starts writing. * @param internal_units The #unit_system used internally. - * @param ic_units The #unit_system used in the ICs. + * @param ic_units The #unit_system used in the snapshots. */ -void readArray(hid_t grp, const struct io_props prop, size_t N, - long long N_total, long long offset, - const struct unit_system* internal_units, - const struct unit_system* ic_units) { - - const size_t typeSize = io_sizeof_type(prop.type); - const size_t copySize = typeSize * prop.dimension; - const size_t num_elements = N * prop.dimension; +void readArray_chunk(hid_t h_data, hid_t h_plist_id, + const struct io_props props, size_t N, long long offset, + const struct unit_system* internal_units, + const struct unit_system* ic_units) { - /* Can't handle reads of more than 2GB */ - if (N * prop.dimension * typeSize > HDF5_PARALLEL_IO_MAX_BYTES) - error("Dataset too large to be read in one pass!"); - - /* Check whether the dataspace exists or not */ - const htri_t exist = H5Lexists(grp, prop.name, 0); - if (exist < 0) { - error("Error while checking the existence of data set '%s'.", prop.name); - } else if (exist == 0) { - if (prop.importance == COMPULSORY) { - error("Compulsory data set '%s' not present in the file.", prop.name); - } else { - for (size_t i = 0; i < N; ++i) - memset(prop.field + i * prop.partSize, 0, copySize); - return; - } - } - - /* message("Reading %s '%s' array...", */ - /* prop.importance == COMPULSORY ? "compulsory" : "optional ", */ - /* prop.name); */ - - /* Open data space in file */ - const hid_t h_data = H5Dopen2(grp, prop.name, H5P_DEFAULT); - if (h_data < 0) error("Error while opening data space '%s'.", prop.name); + const size_t typeSize = io_sizeof_type(props.type); + const size_t copySize = typeSize * props.dimension; + const size_t num_elements = N * props.dimension; - /* Check data type */ - const hid_t h_type = H5Dget_type(h_data); - if (h_type < 0) error("Unable to retrieve data type from the file"); - /* if (!H5Tequal(h_type, hdf5_type(type))) */ - /* error("Non-matching types between the code and the file"); */ + /* Can't handle writes of more than 2GB */ + if (N * props.dimension * typeSize > HDF5_PARALLEL_IO_MAX_BYTES) + error("Dataset too large to be written in one pass!"); /* Allocate temporary buffer */ void* temp = malloc(num_elements * typeSize); @@ -113,10 +85,10 @@ void readArray(hid_t grp, const struct io_props prop, size_t N, /* Prepare information for hyper-slab */ hsize_t shape[2], offsets[2]; int rank; - if (prop.dimension > 1) { + if (props.dimension > 1) { rank = 2; shape[0] = N; - shape[1] = prop.dimension; + shape[1] = props.dimension; offsets[0] = offset; offsets[1] = 0; } else { @@ -134,27 +106,23 @@ void readArray(hid_t grp, const struct io_props prop, size_t N, const hid_t h_filespace = H5Dget_space(h_data); H5Sselect_hyperslab(h_filespace, H5S_SELECT_SET, offsets, NULL, shape, NULL); - /* Set collective reading properties */ - const hid_t h_plist_id = H5Pcreate(H5P_DATASET_XFER); - H5Pset_dxpl_mpio(h_plist_id, H5FD_MPIO_COLLECTIVE); - /* Read HDF5 dataspace in temporary buffer */ /* Dirty version that happens to work for vectors but should be improved */ /* Using HDF5 dataspaces would be better */ - const hid_t h_err = H5Dread(h_data, io_hdf5_type(prop.type), h_memspace, + const hid_t h_err = H5Dread(h_data, io_hdf5_type(props.type), h_memspace, h_filespace, h_plist_id, temp); if (h_err < 0) { - error("Error while reading data array '%s'.", prop.name); + error("Error while reading data array '%s'.", props.name); } /* Unit conversion if necessary */ const double factor = - units_conversion_factor(ic_units, internal_units, prop.units); - if (factor != 1. && exist != 0) { + units_conversion_factor(ic_units, internal_units, props.units); + if (factor != 1.) { /* message("Converting ! factor=%e", factor); */ - if (io_is_double_precision(prop.type)) { + if (io_is_double_precision(props.type)) { double* temp_d = temp; for (size_t i = 0; i < num_elements; ++i) temp_d[i] *= factor; } else { @@ -166,13 +134,98 @@ void readArray(hid_t grp, const struct io_props prop, size_t N, /* Copy temporary buffer to particle data */ char* temp_c = temp; for (size_t i = 0; i < N; ++i) - memcpy(prop.field + i * prop.partSize, &temp_c[i * copySize], copySize); + memcpy(props.field + i * props.partSize, &temp_c[i * copySize], copySize); /* Free and close everything */ free(temp); - H5Pclose(h_plist_id); H5Sclose(h_filespace); H5Sclose(h_memspace); +} + +/** + * @brief Reads a data array from a given HDF5 group. + * + * @param grp The group from which to read. + * @param prop The #io_props of the field to read. + * @param N The number of particles on that rank. + * @param N_total The total number of particles. + * @param mpi_rank The MPI rank of this node. + * @param offset The offset in the array on disk for this rank. + * @param internal_units The #unit_system used internally. + * @param ic_units The #unit_system used in the ICs. + */ +void readArray(hid_t grp, struct io_props props, size_t N, long long N_total, + int mpi_rank, long long offset, + const struct unit_system* internal_units, + const struct unit_system* ic_units) { + + const size_t typeSize = io_sizeof_type(props.type); + const size_t copySize = typeSize * props.dimension; + + /* Check whether the dataspace exists or not */ + const htri_t exist = H5Lexists(grp, props.name, 0); + if (exist < 0) { + error("Error while checking the existence of data set '%s'.", props.name); + } else if (exist == 0) { + if (props.importance == COMPULSORY) { + error("Compulsory data set '%s' not present in the file.", props.name); + } else { + for (size_t i = 0; i < N; ++i) + memset(props.field + i * props.partSize, 0, copySize); + return; + } + } + + /* Open data space in file */ + const hid_t h_data = H5Dopen2(grp, props.name, H5P_DEFAULT); + if (h_data < 0) error("Error while opening data space '%s'.", props.name); + + /* Check data type */ + const hid_t h_type = H5Dget_type(h_data); + if (h_type < 0) error("Unable to retrieve data type from the file"); + /* if (!H5Tequal(h_type, hdf5_type(type))) */ + /* error("Non-matching types between the code and the file"); */ + + /* Create property list for collective dataset read. */ + const hid_t h_plist_id = H5Pcreate(H5P_DATASET_XFER); + H5Pset_dxpl_mpio(h_plist_id, H5FD_MPIO_COLLECTIVE); + + /* Given the limitations of ROM-IO we will need to read the data in chunk of + HDF5_PARALLEL_IO_MAX_BYTES bytes per node until all the nodes are done. */ + char redo = 1; + while (redo) { + + /* Maximal number of elements */ + const size_t max_chunk_size = + HDF5_PARALLEL_IO_MAX_BYTES / (props.dimension * typeSize); + + /* Write the first chunk */ + const size_t this_chunk = (N > max_chunk_size) ? max_chunk_size : N; + readArray_chunk(h_data, h_plist_id, props, this_chunk, offset, + internal_units, ic_units); + + /* Compute how many items are left */ + if (N > max_chunk_size) { + N -= max_chunk_size; + props.field += max_chunk_size * props.partSize; + offset += max_chunk_size; + redo = 1; + } else { + N = 0; + offset += 0; + redo = 0; + } + + /* Do we need to run again ? */ + MPI_Allreduce(MPI_IN_PLACE, &redo, 1, MPI_SIGNED_CHAR, MPI_MAX, + MPI_COMM_WORLD); + + if (redo && mpi_rank == 0) + message("Need to redo one iteration for array '%s'", props.name); + } + + /* Close everything */ + H5Pclose(h_plist_id); H5Tclose(h_type); H5Dclose(h_data); } @@ -688,8 +741,8 @@ void read_ic_parallel(char* fileName, const struct unit_system* internal_units, /* Read everything */ if (!dry_run) for (int i = 0; i < num_fields; ++i) - readArray(h_grp, list[i], Nparticles, N_total[ptype], offset[ptype], - internal_units, ic_units); + readArray(h_grp, list[i], Nparticles, N_total[ptype], mpi_rank, + offset[ptype], internal_units, ic_units); /* Close particle group */ H5Gclose(h_grp);