Commit f8efb3a4 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Merge branch 'large_parallel_hdf5' into 'master'

Large parallel hdf5

Fix to #72. 

This is a workaround to the limitation of parallel-HDF5. The low-level MPI-IO implementations limit writes to 2GB per rank (irrespective of the total amount being written across all nodes). 

The solution involves writing chunks of 2GB (or in practice 2'000'000'000Bytes) and then repeat for the remaining chunks, if any, by shifting the position to write of each node in the file and in memory by 2GB. Ranks that did not pass the threshold just write nothing.
In realistic scenarios we won't need more than a handful of iterations.

See merge request !438
parents ea50f018 fb2c10ac
......@@ -51,24 +51,19 @@
#include "units.h"
#include "xmf.h"
/* The current limit of ROMIO (the underlying MPI-IO layer) is 2GB */
#define HDF5_PARALLEL_IO_MAX_BYTES 2000000000LL
/**
* @brief Reads a data array from a given HDF5 group.
*
* @param grp The group from which to read.
* @param name The name of the array to read.
* @param type The #DATA_TYPE of the attribute.
* @param N The number of particles.
* @param dim The dimension of the data (1 for scalar, 3 for vector)
* @param part_c A (char*) pointer on the first occurrence of the field of
*interest in the parts array
* @param importance If COMPULSORY, the data must be present in the IC file. If
*OPTIONAL, the array will be zeroed when the data is not present.
*
* @todo A better version using HDF5 hyper-slabs to read the file directly into
*the part array
* will be written once the structures have been stabilized.
*
* Calls #error() if an error occurs.
* @param prop The #io_props of the field to read.
* @param N The number of particles on that rank.
* @param N_total The total number of particles.
* @param offset The offset in the array on disk for this rank.
* @param internal_units The #unit_system used internally.
* @param ic_units The #unit_system used in the ICs.
*/
void readArray(hid_t grp, const struct io_props prop, size_t N,
long long N_total, long long offset,
......@@ -183,36 +178,34 @@ void readArray(hid_t grp, const struct io_props prop, size_t N,
*-----------------------------------------------------------------------------*/
/**
* @brief Writes a data array in given HDF5 group.
* @brief Writes a chunk of data in an open HDF5 dataset
*
* @param e The #engine we are writing from.
* @param grp The group in which to write.
* @param fileName The name of the file in which the data is written
* @param xmfFile The FILE used to write the XMF description
* @param h_data The HDF5 dataset to write to.
* @param h_plist_id the parallel HDF5 properties.
* @param props The #io_props of the field to read.
* @param N The number of particles to write.
* @param N_total Total number of particles across all cores
* @param offset Offset in the array where this mpi task starts writing
* @param internal_units The #unit_system used internally
* @param snapshot_units The #unit_system used in the snapshots
*
* @todo A better version using HDF5 hyper-slabs to write the file directly from
* the part array will be written once the structures have been stabilized.
*
* @param offset Offset in the array where this mpi task starts writing.
* @param internal_units The #unit_system used internally.
* @param snapshot_units The #unit_system used in the snapshots.
*/
void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
char* partTypeGroupName, const struct io_props props, size_t N,
long long N_total, int mpi_rank, long long offset,
const struct unit_system* internal_units,
const struct unit_system* snapshot_units) {
void writeArray_chunk(struct engine* e, hid_t h_data, hid_t h_plist_id,
const struct io_props props, size_t N, long long offset,
const struct unit_system* internal_units,
const struct unit_system* snapshot_units) {
const size_t typeSize = io_sizeof_type(props.type);
const size_t copySize = typeSize * props.dimension;
const size_t num_elements = N * props.dimension;
/* Can't handle writes of more than 2GB */
if (N * props.dimension * typeSize > HDF5_PARALLEL_IO_MAX_BYTES)
error("Dataset too large to be written in one pass!");
/* message("Writing '%s' array...", props.name); */
/* Allocate temporary buffer */
void* temp = malloc(num_elements * io_sizeof_type(props.type));
void* temp = malloc(num_elements * typeSize);
if (temp == NULL) error("Unable to allocate memory for temporary buffer");
/* Copy particle data to temporary buffer */
......@@ -259,29 +252,19 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
props.name);
}
hid_t h_filespace = H5Screate(H5S_SIMPLE);
if (h_filespace < 0) {
error("Error while creating data space (file) for field '%s'.", props.name);
}
int rank;
hsize_t shape[2];
hsize_t shape_total[2];
hsize_t offsets[2];
if (props.dimension > 1) {
rank = 2;
shape[0] = N;
shape[1] = props.dimension;
shape_total[0] = N_total;
shape_total[1] = props.dimension;
offsets[0] = offset;
offsets[1] = 0;
} else {
rank = 1;
shape[0] = N;
shape[1] = 0;
shape_total[0] = N_total;
shape_total[1] = 0;
offsets[0] = offset;
offsets[1] = 0;
}
......@@ -293,34 +276,147 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
props.name);
}
/* Select the hyper-salb corresponding to this rank */
hid_t h_filespace = H5Dget_space(h_data);
if (N > 0) {
H5Sselect_hyperslab(h_filespace, H5S_SELECT_SET, offsets, NULL, shape,
NULL);
} else {
H5Sselect_none(h_filespace);
}
/* message("Writing %lld '%s', %zd elements = %zd bytes (int=%d) at offset
* %zd", */
/* N, props.name, N * props.dimension, N * props.dimension * typeSize, */
/* (int)(N * props.dimension * typeSize), offset); */
/* Write temporary buffer to HDF5 dataspace */
h_err = H5Dwrite(h_data, io_hdf5_type(props.type), h_memspace, h_filespace,
h_plist_id, temp);
if (h_err < 0) {
error("Error while writing data array '%s'.", props.name);
}
/* Free and close everything */
free(temp);
H5Sclose(h_memspace);
H5Sclose(h_filespace);
}
/**
* @brief Writes a data array in given HDF5 group.
*
* @param e The #engine we are writing from.
* @param grp The group in which to write.
* @param fileName The name of the file in which the data is written.
* @param xmfFile The FILE used to write the XMF description.
* @param partTypeGroupName The name of the group containing the particles in
* the HDF5 file.
* @param props The #io_props of the field to read
* @param N The number of particles to write.
* @param N_total Total number of particles across all cores.
* @param mpi_rank The rank of this node.
* @param offset Offset in the array where this mpi task starts writing.
* @param internal_units The #unit_system used internally.
* @param snapshot_units The #unit_system used in the snapshots.
*/
void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
char* partTypeGroupName, struct io_props props, size_t N,
long long N_total, int mpi_rank, long long offset,
const struct unit_system* internal_units,
const struct unit_system* snapshot_units) {
const size_t typeSize = io_sizeof_type(props.type);
/* Work out properties of the array in the file */
int rank;
hsize_t shape_total[2];
hsize_t chunk_shape[2];
if (props.dimension > 1) {
rank = 2;
shape_total[0] = N_total;
shape_total[1] = props.dimension;
chunk_shape[0] = 1 << 16; /* Just a guess...*/
chunk_shape[1] = props.dimension;
} else {
rank = 1;
shape_total[0] = N_total;
shape_total[1] = 0;
chunk_shape[0] = 1 << 16; /* Just a guess...*/
chunk_shape[1] = 0;
}
/* Make sure the chunks are not larger than the dataset */
if (chunk_shape[0] > N_total) chunk_shape[0] = N_total;
/* Create the space in the file */
hid_t h_filespace = H5Screate(H5S_SIMPLE);
if (h_filespace < 0) {
error("Error while creating data space (file) for field '%s'.", props.name);
}
/* Change shape of file data space */
h_err = H5Sset_extent_simple(h_filespace, rank, shape_total, NULL);
hid_t h_err = H5Sset_extent_simple(h_filespace, rank, shape_total, NULL);
if (h_err < 0) {
error("Error while changing data space (file) shape for field '%s'.",
props.name);
}
/* Dataset properties */
const hid_t h_prop = H5Pcreate(H5P_DATASET_CREATE);
/* Set chunk size */
/* h_err = H5Pset_chunk(h_prop, rank, chunk_shape); */
/* if (h_err < 0) { */
/* error("Error while setting chunk size (%llu, %llu) for field '%s'.", */
/* chunk_shape[0], chunk_shape[1], props.name); */
/* } */
/* Create dataset */
const hid_t h_data =
H5Dcreate(grp, props.name, io_hdf5_type(props.type), h_filespace,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
const hid_t h_data = H5Dcreate(grp, props.name, io_hdf5_type(props.type),
h_filespace, H5P_DEFAULT, h_prop, H5P_DEFAULT);
if (h_data < 0) {
error("Error while creating dataset '%s'.", props.name);
}
H5Sclose(h_filespace);
h_filespace = H5Dget_space(h_data);
H5Sselect_hyperslab(h_filespace, H5S_SELECT_SET, offsets, NULL, shape, NULL);
/* Create property list for collective dataset write. */
const hid_t h_plist_id = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(h_plist_id, H5FD_MPIO_COLLECTIVE);
/* Write temporary buffer to HDF5 dataspace */
h_err = H5Dwrite(h_data, io_hdf5_type(props.type), h_memspace, h_filespace,
h_plist_id, temp);
if (h_err < 0) {
error("Error while writing data array '%s'.", props.name);
/* Given the limitations of ROM-IO we will need to write the data in chunk of
HDF5_PARALLEL_IO_MAX_BYTES bytes per node until all the nodes are done. */
char redo = 1;
while (redo) {
/* Maximal number of elements */
const size_t max_chunk_size =
HDF5_PARALLEL_IO_MAX_BYTES / (props.dimension * typeSize);
/* Write the first chunk */
const size_t this_chunk = (N > max_chunk_size) ? max_chunk_size : N;
writeArray_chunk(e, h_data, h_plist_id, props, this_chunk, offset,
internal_units, snapshot_units);
/* Compute how many items are left */
if (N > max_chunk_size) {
N -= max_chunk_size;
props.field += max_chunk_size * props.partSize;
offset += max_chunk_size;
redo = 1;
} else {
N = 0;
offset += 0;
redo = 0;
}
/* Do we need to run again ? */
MPI_Allreduce(MPI_IN_PLACE, &redo, 1, MPI_SIGNED_CHAR, MPI_MAX,
MPI_COMM_WORLD);
if (redo && e->verbose && mpi_rank == 0)
message("Need to redo one iteration for array '%s'", props.name);
}
/* Write XMF description for this data set */
......@@ -340,12 +436,10 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
units_a_factor(snapshot_units, props.units));
io_write_attribute_s(h_data, "Conversion factor", buffer);
/* Free and close everything */
free(temp);
/* Close everything */
H5Pclose(h_prop);
H5Dclose(h_data);
H5Pclose(h_plist_id);
H5Sclose(h_memspace);
H5Sclose(h_filespace);
}
/**
......@@ -355,25 +449,23 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
* @param internal_units The system units used internally
* @param dim (output) The dimension of the volume read from the file.
* @param parts (output) The array of #part read from the file.
* @param N (output) The number of particles read from the file.
* @param gparts (output) The array of #gpart read from the file.
* @param sparts (output) The array of #spart read from the file.
* @param Ngas (output) The number of particles read from the file.
* @param Ngparts (output) The number of particles read from the file.
* @param Nstars (output) The number of particles read from the file.
* @param periodic (output) 1 if the volume is periodic, 0 if not.
* @param flag_entropy (output) 1 if the ICs contained Entropy in the
* InternalEnergy field
* @param with_hydro Are we running with hydro ?
* @param with_gravity Are we running with gravity ?
* @param with_stars Are we running with stars ?
* @param mpi_rank The MPI rank of this node
* @param mpi_size The number of MPI ranks
* @param comm The MPI communicator
* @param info The MPI information object
* @param dry_run If 1, don't read the particle. Only allocates the arrays.
*
* Opens the HDF5 file fileName and reads the particles contained
* in the parts array. N is the returned number of particles found
* in the file.
*
* @warning Can not read snapshot distributed over more than 1 file !!!
* @todo Read snapshots distributed in more than one file.
*
* Calls #error() if an error occurs.
*
*/
void read_ic_parallel(char* fileName, const struct unit_system* internal_units,
double dim[3], struct part** parts, struct gpart** gparts,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment