From a2f6e4dc417bac1feb8d319aec8525ad384c788f Mon Sep 17 00:00:00 2001
From: Matthieu Schaller <matthieu.schaller@durham.ac.uk>
Date: Mon, 2 Oct 2017 23:48:03 +0100
Subject: [PATCH] When using parallel-hdf5, write data in chunks of at most
 2GB to overcome the limits of the ROMIO layer.

---
 src/parallel_io.c | 121 +++++++++++++++++++++++++++++++++++-----------
 1 file changed, 93 insertions(+), 28 deletions(-)

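A quick worked example of the chunk arithmetic (illustrative numbers only):
the 2GB cap applies to the bytes a rank pushes through one collective
H5Dwrite(), so the per-chunk element budget is

  max_chunk_size = HDF5_PARALLEL_IO_MAX_BYTES / (props.dimension * typeSize)

For a 3-component double array that is 2e9 / (3 * 8) ~= 83 million particles
per chunk, so a rank holding 200 million such particles issues three
collective writes (83M + 83M + ~33M). Ranks that run out of data early keep
calling writeArray_chunk() with N = 0 until the MPI_Allreduce on 'redo'
reports that every rank is done, since the collective write must be entered
by all ranks.
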
diff --git a/src/parallel_io.c b/src/parallel_io.c
index 65f8fc9c20..2f5c0b0c1f 100644
--- a/src/parallel_io.c
+++ b/src/parallel_io.c
@@ -51,6 +51,9 @@
 #include "units.h"
 #include "xmf.h"
 
+/* The current limit of ROMIO (the underlying MPI-IO layer) is 2GB */
+#define HDF5_PARALLEL_IO_MAX_BYTES 2000000000ll
+
 /**
  * @brief Reads a data array from a given HDF5 group.
  *
@@ -199,20 +202,24 @@ void readArray(hid_t grp, const struct io_props prop, size_t N,
  * the part array will be written once the structures have been stabilized.
  *
  */
-void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
-                char* partTypeGroupName, const struct io_props props, size_t N,
-                long long N_total, int mpi_rank, long long offset,
-                const struct unit_system* internal_units,
-                const struct unit_system* snapshot_units) {
-
+void writeArray_chunk(struct engine* e, hid_t h_data, hid_t h_plist_id,
+                      const struct io_props props, size_t N,
+                      long long offset,
+                      const struct unit_system* internal_units,
+                      const struct unit_system* snapshot_units) {
+
   const size_t typeSize = io_sizeof_type(props.type);
   const size_t copySize = typeSize * props.dimension;
   const size_t num_elements = N * props.dimension;
 
+  /* Can't handle writes of more than 2GB */
+  if (N * props.dimension * typeSize > HDF5_PARALLEL_IO_MAX_BYTES)
+    error("Dataset too large to be written in one pass!");
+
   /* message("Writing '%s' array...", props.name); */
 
   /* Allocate temporary buffer */
-  void* temp = malloc(num_elements * io_sizeof_type(props.type));
+  void* temp = malloc(num_elements * typeSize);
   if (temp == NULL) error("Unable to allocate memory for temporary buffer");
 
   /* Copy particle data to temporary buffer */
@@ -259,29 +266,20 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
           props.name);
   }
 
-  hid_t h_filespace = H5Screate(H5S_SIMPLE);
-  if (h_filespace < 0) {
-    error("Error while creating data space (file) for field '%s'.", props.name);
-  }
 
   int rank;
   hsize_t shape[2];
-  hsize_t shape_total[2];
   hsize_t offsets[2];
   if (props.dimension > 1) {
     rank = 2;
     shape[0] = N;
     shape[1] = props.dimension;
-    shape_total[0] = N_total;
-    shape_total[1] = props.dimension;
     offsets[0] = offset;
     offsets[1] = 0;
   } else {
     rank = 1;
     shape[0] = N;
     shape[1] = 0;
-    shape_total[0] = N_total;
-    shape_total[1] = 0;
     offsets[0] = offset;
     offsets[1] = 0;
   }
@@ -293,8 +291,56 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
           props.name);
   }
 
+  /* Select the hyperslab corresponding to this rank */
+  hid_t h_filespace = H5Dget_space(h_data);
+  H5Sselect_hyperslab(h_filespace, H5S_SELECT_SET, offsets, NULL, shape, NULL);
+
+  /* message("Writing %zd elements = %zd bytes (int=%d) at offset %zd", */
+  /* 	  N * props.dimension, N * props.dimension * typeSize, */
+  /* 	  (int)(N * props.dimension * typeSize), offset); */
+
+  /* Write temporary buffer to HDF5 dataspace */
+  h_err = H5Dwrite(h_data, io_hdf5_type(props.type), h_memspace, h_filespace,
+                   h_plist_id, temp);
+  if (h_err < 0) {
+    error("Error while writing data array '%s'.", props.name);
+  }
+
+
+  /* Free and close everything */
+  free(temp);
+  H5Sclose(h_memspace);
+  H5Sclose(h_filespace);
+}
+
+void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
+                char* partTypeGroupName, const struct io_props props, size_t N,
+                long long N_total, int mpi_rank, long long offset,
+                const struct unit_system* internal_units,
+                const struct unit_system* snapshot_units) {
+
+  const size_t typeSize = io_sizeof_type(props.type);
+
+  /* Work out properties of the array in the file */
+  int rank;
+  hsize_t shape_total[2];
+  if (props.dimension > 1) {
+    rank = 2;
+    shape_total[0] = N_total;
+    shape_total[1] = props.dimension;
+  } else {
+    rank = 1;
+    shape_total[0] = N_total;
+    shape_total[1] = 0;
+  }
+
+  hid_t h_filespace = H5Screate(H5S_SIMPLE);
+  if (h_filespace < 0) {
+    error("Error while creating data space (file) for field '%s'.", props.name);
+  }
+
   /* Change shape of file data space */
-  h_err = H5Sset_extent_simple(h_filespace, rank, shape_total, NULL);
+  hid_t h_err = H5Sset_extent_simple(h_filespace, rank, shape_total, NULL);
   if (h_err < 0) {
     error("Error while changing data space (file) shape for field '%s'.",
           props.name);
@@ -309,19 +355,41 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
   }
 
   H5Sclose(h_filespace);
-  h_filespace = H5Dget_space(h_data);
-  H5Sselect_hyperslab(h_filespace, H5S_SELECT_SET, offsets, NULL, shape, NULL);
 
   /* Create property list for collective dataset write.    */
   const hid_t h_plist_id = H5Pcreate(H5P_DATASET_XFER);
   H5Pset_dxpl_mpio(h_plist_id, H5FD_MPIO_COLLECTIVE);
 
-  /* Write temporary buffer to HDF5 dataspace */
-  h_err = H5Dwrite(h_data, io_hdf5_type(props.type), h_memspace, h_filespace,
-                   h_plist_id, temp);
-  if (h_err < 0) {
-    error("Error while writing data array '%s'.", props.name);
+  /* Given the limitations of ROMIO we will need to write the data in chunks
+     of HDF5_PARALLEL_IO_MAX_BYTES bytes per rank until all the ranks are done. */
+  char redo = 1;
+  const size_t max_chunk_size =
+      HDF5_PARALLEL_IO_MAX_BYTES / (props.dimension * typeSize);
+  while (redo) {
+
+    /* Write the next chunk */
+    const size_t this_chunk = (N > max_chunk_size) ? max_chunk_size : N;
+    writeArray_chunk(e, h_data, h_plist_id, props, this_chunk, offset,
+                     internal_units, snapshot_units);
+
+    /* Compute how many items are left */
+    if (N > max_chunk_size) {
+      N -= max_chunk_size;
+      offset += max_chunk_size;
+      redo = 1;
+    } else {
+      /* Everything remaining was written in this chunk */
+      N = 0;
+      redo = 0;
+    }
+
+    if (redo && e->verbose && mpi_rank == 0)
+      message("Need to redo one iteration for array '%s'", props.name);
+
+    /* Do we need to run again ? */
+    MPI_Allreduce(MPI_IN_PLACE, &redo, 1, MPI_SIGNED_CHAR, MPI_MAX, MPI_COMM_WORLD);
   }
+
 
   /* Write XMF description for this data set */
   if (mpi_rank == 0)
@@ -340,12 +408,9 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
                        units_a_factor(snapshot_units, props.units));
   io_write_attribute_s(h_data, "Conversion factor", buffer);
 
-  /* Free and close everything */
-  free(temp);
   H5Dclose(h_data);
   H5Pclose(h_plist_id);
-  H5Sclose(h_memspace);
-  H5Sclose(h_filespace);
+
 }
 
 /**
-- 
GitLab