diff --git a/logger/hydro/SPHENIX/logger_hydro.h b/logger/hydro/SPHENIX/logger_hydro.h index bb60efd67e58ab4c54594543d00eb6e14a55496e..9035cf0cf4d259f86ffd992ad41617204025161c 100644 --- a/logger/hydro/SPHENIX/logger_hydro.h +++ b/logger/hydro/SPHENIX/logger_hydro.h @@ -142,12 +142,12 @@ hydro_logger_interpolate_field(const double t_before, /* Div v */ x[n_linear] = interpolate_cubic_hermite_spline( t_before, bef[n_linear], bef[n_linear + 1], t_after, aft[n_linear], - aft[n_linear + 1], t, /* periodic= */ 0, params); + aft[n_linear + 1], t); /* d Div v / dt */ x[n_linear + 1] = wa * aft[n_linear + 1] + wb * bef[n_linear + 1]; /* Use the linear interpolation */ - x[1] = wa * div_aft[1] + wb * div_bef[1]; + x[1] = wa * aft[n_linear + 2] + wb * bef[n_linear + 2]; break; } diff --git a/logger/logger_parameters.c b/logger/logger_parameters.c index cb78d7cea44a750a2dc2cfd43faee51b6bd22905..59ad7c77b26f03e2cf228d91adc7858902de91cd 100644 --- a/logger/logger_parameters.c +++ b/logger/logger_parameters.c @@ -31,11 +31,14 @@ * @brief Initialize the parameters from the yaml file. * * @param reader The #logger_reader. - * @param basename The basename of the logger files. - * @param verbose The verbose level. + * @param params The #logger_parameters to initialize. + * @param number_threads The number of threads for the threadpool + * (-1 in order to use the default value). + * */ void logger_parameters_init(struct logger_parameters *params, - const struct logger_reader *reader) { + const struct logger_reader *reader, + int number_threads) { /* Generate filename */ char filename[STRING_SIZE]; @@ -67,4 +70,10 @@ void logger_parameters_init(struct logger_parameters *params, /* Read if we are running with cosmology */ params->with_cosmology = parser_get_param_int(&swift_params, "Policy:cosmological integration"); + + /* Save the number of threads */ + params->number_threads = number_threads; + if (reader->verbose > 0) { + message("Running with %i threads", params->number_threads); + } } diff --git a/logger/logger_parameters.h b/logger/logger_parameters.h index 8bf8bd2f55a7f85e570b48fb68f9e8ef1095e702..58d049c76b5377873362e14c1f7716bb28a2b89c 100644 --- a/logger/logger_parameters.h +++ b/logger/logger_parameters.h @@ -43,9 +43,13 @@ struct logger_parameters { /* Are we doing a cosmological simulation? */ int with_cosmology; + + /* Number of threads to use in the threadpools */ + int number_threads; }; void logger_parameters_init(struct logger_parameters *params, - const struct logger_reader *reader); + const struct logger_reader *reader, + int number_threads); #endif // LOGGER_LOGGER_PARAMETERS_H diff --git a/logger/logger_python_wrapper.c b/logger/logger_python_wrapper.c index b696022712ef5428f8719c52c8f9b8887d80cb26..1b068966b1e92e062d07a8278da4d6e8ef3fa22e 100644 --- a/logger/logger_python_wrapper.c +++ b/logger/logger_python_wrapper.c @@ -67,6 +67,9 @@ typedef struct { /* Verbose level */ int verbose; + /* Number of threads to use */ + int number_threads; + /* Reader for each type of particles */ PyParticleReader *part_reader[swift_type_count]; @@ -130,17 +133,19 @@ static int Reader_init(PyObjectReader *self, PyObject *args, PyObject *kwds) { /* input variables. */ char *basename = NULL; int verbose = 0; + int number_threads = 1; /* List of keyword arguments. */ - static char *kwlist[] = {"basename", "verbose", NULL}; + static char *kwlist[] = {"basename", "verbose", "number_threads", NULL}; /* parse the arguments. */ - if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|i", kwlist, &basename, - &verbose)) + if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|ii", kwlist, &basename, + &verbose, &number_threads)) return -1; /* Copy the arguments */ self->verbose = verbose; + self->number_threads = number_threads; size_t n_plus_null = strlen(basename) + 1; self->basename = (char *)malloc(n_plus_null * sizeof(char)); @@ -381,7 +386,7 @@ static PyObject *pyEnter(__attribute__((unused)) PyObject *self, PyObjectReader *self_reader = (PyObjectReader *)self; logger_reader_init(&self_reader->reader, self_reader->basename, - self_reader->verbose); + self_reader->verbose, self_reader->number_threads); self_reader->ready = 1; /* Allocate the particle readers */ @@ -769,6 +774,9 @@ static PyObject *pyGetData(__attribute__((unused)) PyObject *self, output[i] = malloc(n_tot * h->masks[field_indices[i]].size); } + /* Enable multithreading */ + Py_BEGIN_ALLOW_THREADS; + /* Read the particles. */ if (part_ids == Py_None) { logger_reader_read_all_particles(reader, time, logger_reader_lin, @@ -805,6 +813,9 @@ static PyObject *pyGetData(__attribute__((unused)) PyObject *self, } } + /* Disable multithreading */ + Py_END_ALLOW_THREADS; + /* Create the python output. */ PyObject *array = logger_loader_create_output(output, field_indices, n_fields, n_part, n_tot); diff --git a/logger/logger_reader.c b/logger/logger_reader.c index b7713f7e158799b752a68b4dc39f71f8f231d655..d63143686201af1db658f5e08545b75655becfd1 100644 --- a/logger/logger_reader.c +++ b/logger/logger_reader.c @@ -37,9 +37,10 @@ * @param reader The #logger_reader. * @param basename The basename of the logger files. * @param verbose The verbose level. + * @param number_threads The number of threads to use. */ void logger_reader_init(struct logger_reader *reader, const char *basename, - int verbose) { + int verbose, int number_threads) { if (verbose > 1) message("Initializing the reader."); /* Set the variable to the default values */ @@ -52,7 +53,7 @@ void logger_reader_init(struct logger_reader *reader, const char *basename, /* Initialize the reader variables. */ reader->verbose = verbose; - logger_parameters_init(&reader->params, reader); + logger_parameters_init(&reader->params, reader, number_threads); /* Generate the logfile filename */ char logfile_name[STRING_SIZE]; @@ -599,6 +600,166 @@ void logger_reader_get_fields_wanted(const struct logger_reader *reader, } } +/** + * @brief Structure for the mapper function + * #logger_reader_read_all_particles_single_type_mapper. + */ +struct extra_data_read_all { + + /*! The #logger_reader. */ + struct logger_reader *reader; + + /*! The requested time. */ + double time; + + /*! The interpolation type. */ + enum logger_reader_type interp_type; + + /*! The #field_information for all the fields requested. */ + const struct field_information *fields_wanted; + + /*! The number of fields requested. */ + int n_fields_wanted; + + /*! The array of output arrays. */ + void **output; + + /*! The number of particles at the requested time. */ + const uint64_t *n_part; + + /*! The type of particle to read. */ + enum part_type type; + + /*! The number of fields available for this type of particles. */ + int all_fields_count; + + /*! The #field_information of all the fields available. */ + const struct field_information *all_fields; + + /*! The current position in the index file. */ + size_t current_index; + + /*! The current position in the output arrays. */ + size_t current_output; +}; + +/** + * @brief Mapper function of #logger_reader_read_all_particles_single_type. + * + * @param map_data (size_t) The current position (not used) + * @param num_elements The number of elements to process with the current + * thread. + * @param extra_data Pointer to a #extra_data_read_all. + */ +void logger_reader_read_all_particles_single_type_mapper(void *map_data, + int num_elements, + void *extra_data) { + + /* Extract the data */ + struct extra_data_read_all *data_tp = + (struct extra_data_read_all *)extra_data; + struct logger_reader *reader = data_tp->reader; + double time = data_tp->time; + enum logger_reader_type interp_type = data_tp->interp_type; + const struct field_information *fields_wanted = data_tp->fields_wanted; + int n_fields_wanted = data_tp->n_fields_wanted; + void **output = data_tp->output; + const uint64_t *n_part = data_tp->n_part; + enum part_type type = data_tp->type; + int all_fields_count = data_tp->all_fields_count; + const struct field_information *all_fields = data_tp->all_fields; + + /* Create a few variables for later */ + const struct header *h = &reader->log.header; + + const size_t size_index = reader->index.index_prev.nparts[type]; + const size_t size_history = + logger_reader_count_number_new_particles(reader, type); + + struct index_data *data = + logger_index_get_data(&reader->index.index_prev, type); + struct index_data *data_created = + logger_index_get_created_history(&reader->index.index_next, type); + + /* Count the number of previous parts for the shift in output */ + uint64_t prev_npart = 0; + for (int i = 0; i < type; i++) { + prev_npart += n_part[i]; + } + + /* Allocate the temporary memory. */ + void **output_tmp = malloc(n_fields_wanted * sizeof(void *)); + if (output_tmp == NULL) { + error_python("Failed to allocate the temporary output buffer"); + } + for (int i = 0; i < n_fields_wanted; i++) { + const int global = fields_wanted[i].global_index; + output_tmp[i] = malloc(h->masks[global].size); + if (output_tmp[i] == NULL) { + error_python("Failed to allocate the temporary output buffer"); + } + } + + /* Read the particles */ + for (int i = 0; i < num_elements; i++) { + + /* Get the next index. */ + size_t current_index = atomic_inc(&data_tp->current_index); + + /* Are we reading the history? */ + int reading_history = current_index >= size_index; + + /* Check if we still have some particles available. */ + if (reading_history && current_index == size_history + size_index) { + error_python("The logger was not able to find enough particles."); + } + + /* Get the offset */ + const size_t current = + reading_history ? current_index - size_index : current_index; + size_t offset = + reading_history ? data_created[current].offset : data[current].offset; + + /* Loop over each field. */ + int particle_removed = 0; + for (int field = 0; field < n_fields_wanted; field++) { + + /* Read the field. */ + particle_removed = logger_reader_read_field( + reader, time, reader->time.time_offset, interp_type, offset, + &fields_wanted[field], all_fields, all_fields_count, + output_tmp[field], type); + + /* Should we continue to read the fields of this particle? */ + if (particle_removed) { + break; + } + } + + /* Write the particle into the output if it was not removed */ + if (!particle_removed) { + size_t current_output = atomic_inc(&data_tp->current_output); + for (int field = 0; field < n_fields_wanted; field++) { + + /* Get the array */ + const int global = fields_wanted[field].global_index; + void *output_single = + (char *)output[field] + + (current_output + prev_npart) * h->masks[global].size; + + /* Copy the temporary buffer into the global one */ + memcpy(output_single, output_tmp[field], h->masks[global].size); + } + } + } + + /* Free the allocated memory */ + for (int i = 0; i < n_fields_wanted; i++) { + free(output_tmp[i]); + } + free(output_tmp); +} + /** * @brief Read all the particles of a given type from the index file. * @@ -617,15 +778,8 @@ void logger_reader_read_all_particles_single_type( enum logger_reader_type interp_type, const int *global_fields_wanted, const int n_fields_wanted, void **output, const uint64_t *n_part, enum part_type type) { - const struct header *h = &reader->log.header; - /* Count the number of previous parts for the shift in output */ - uint64_t prev_npart = 0; - for (int i = 0; i < type; i++) { - prev_npart += n_part[i]; - } - /* Allocate temporary memory. */ struct field_information *fields_wanted = (struct field_information *)malloc( sizeof(struct field_information) * n_fields_wanted); @@ -633,11 +787,6 @@ void logger_reader_read_all_particles_single_type( error_python("Failed to allocate the field information."); } - struct index_data *data = - logger_index_get_data(&reader->index.index_prev, type); - struct index_data *data_created = - logger_index_get_created_history(&reader->index.index_next, type); - /* Get the list of fields. */ const int all_fields_count = tools_get_number_fields(type); struct field_information *all_fields = (struct field_information *)malloc( @@ -649,54 +798,30 @@ void logger_reader_read_all_particles_single_type( n_fields_wanted, all_fields, all_fields_count, type); - size_t current_in_index = 0; - int reading_history = 0; - const size_t size_index = reader->index.index_prev.nparts[type]; - const size_t size_history = - logger_reader_count_number_new_particles(reader, type); + /* Compact the data */ + struct extra_data_read_all extra = {reader, + time, + interp_type, + fields_wanted, + n_fields_wanted, + output, + n_part, + type, + all_fields_count, + all_fields, + /* current_index */ 0, + /* current_output */ 0}; + + /* Create the threadpool */ + struct threadpool tp; + threadpool_init(&tp, reader->params.number_threads); /* Read the particles */ - for (size_t i = 0; i < n_part[type]; i++) { - int particle_removed = 1; - /* Do it until finding a particle not removed. */ - while (particle_removed) { - /* Should we start to read the history? */ - if (!reading_history && current_in_index == size_index) { - current_in_index = 0; - reading_history = 1; - } - - /* Check if we still have some particles available. */ - if (reading_history && current_in_index == size_history) { - error_python("The logger was not able to find enough particles."); - } - - /* Get the offset */ - size_t offset = reading_history ? data_created[current_in_index].offset - : data[current_in_index].offset; - - /* Loop over each field. */ - for (int field = 0; field < n_fields_wanted; field++) { - const int global = fields_wanted[field].global_index; - void *output_single = - (char *)output[field] + (i + prev_npart) * h->masks[global].size; - - /* Read the field. */ - particle_removed = logger_reader_read_field( - reader, time, reader->time.time_offset, interp_type, offset, - &fields_wanted[field], all_fields, all_fields_count, output_single, - type); - - /* Should we continue to read the fields of this particle? */ - if (particle_removed) { - break; - } - } - current_in_index++; - } - } + threadpool_map(&tp, logger_reader_read_all_particles_single_type_mapper, NULL, + n_part[type], 1, threadpool_auto_chunk_size, &extra); /* Free the memory */ + threadpool_clean(&tp); free(all_fields); free(fields_wanted); } diff --git a/logger/logger_reader.h b/logger/logger_reader.h index 8189589d7d47480471a8383163e5c498577300f9..f3f1ddeb211857ad5439a7d5b41b3eb56e52880b 100644 --- a/logger/logger_reader.h +++ b/logger/logger_reader.h @@ -117,7 +117,7 @@ enum logger_reader_event { void logger_reader_init_index(struct logger_reader *reader); void logger_reader_init(struct logger_reader *reader, const char *basename, - int verbose); + int verbose, int number_threads); void logger_reader_free(struct logger_reader *reader); void logger_reader_set_time(struct logger_reader *reader, double time); diff --git a/logger/tests/testLogfileReader.c b/logger/tests/testLogfileReader.c index 43c2403cee1661d7b503bf02f93de0449d2a5568..84111f99fa489c654a6124ac63f02dc507cfc361 100644 --- a/logger/tests/testLogfileReader.c +++ b/logger/tests/testLogfileReader.c @@ -227,7 +227,7 @@ int main(int argc, char *argv[]) { char basename[200]; parser_get_param_string(¶ms, "Logger:basename", basename); strcat(basename, "_0000"); - logger_reader_init(&reader, basename, /* verbose */ 1); + logger_reader_init(&reader, basename, /* verbose */ 1, /* number_threads */1); /* Finally check everything. diff --git a/logger/tests/testTimeArray.c b/logger/tests/testTimeArray.c index 8746afb4efe04499bcc699c3e78ed075967be4c7..d33d4f3a275762912f87541d372087048a7345e6 100644 --- a/logger/tests/testTimeArray.c +++ b/logger/tests/testTimeArray.c @@ -38,7 +38,7 @@ int main(int argc, char *argv[]) { /* Initialize the time array */ struct time_array times; - time_array_init(×); + time_array_init(×, /* initial_size */1024); /* Add elements */ for (size_t i = 0; i < NUMBER_OF_ELEMENT; i++) { diff --git a/logger/tests/testVirtualReality.c b/logger/tests/testVirtualReality.c index e43546834a582451105f01a49ae4d0bae5595b82..43d87a530176663c7643703770f7f1ffccbe5250 100644 --- a/logger/tests/testVirtualReality.c +++ b/logger/tests/testVirtualReality.c @@ -66,7 +66,7 @@ int main(int argc, char *argv[]) { parser_get_param_string(¶ms, "Logger:basename", basename); strcat(basename, "_0000"); logger_reader_init(&reader, basename, - /* Verbose */ 0); + /* Verbose */ 0, /* number_threads */1); /* Read the time limits */ double begin = logger_reader_get_time_begin(&reader); diff --git a/logger/tests/test_reader.yml b/logger/tests/test_reader.yml new file mode 100644 index 0000000000000000000000000000000000000000..d2e71b424cb768b8baad711c33d387611dedc340 --- /dev/null +++ b/logger/tests/test_reader.yml @@ -0,0 +1,41 @@ +Header: + BoxSize: [1, 1, 1] + Dimension: 3 + RunName: Untitled SWIFT simulation + Periodic: 0 + +InternalUnitSystem: + UnitMass_in_cgs: 1 + UnitLength_in_cgs: 1 + UnitTime_in_cgs: 1 + UnitCurrent_in_cgs: 1 + UnitTemp_in_cgs: 1 + +Policy: + steal: 1 + keep: 1 + block: 0 + cpu tight: 0 + mpi: 0 + numa affinity: 1 + hydro: 1 + self gravity: 0 + external gravity: 0 + cosmological integration: 0 + drift everything: 0 + reconstruct multi-poles: 0 + temperature: 0 + cooling: 0 + stars: 0 + structure finding: 0 + star formation: 0 + feedback: 0 + black holes: 0 + fof search: 0 + time-step limiter: 1 + time-step sync: 0 + logger: 1 + line of sight: 0 + sink: 0 + rt: 0 + diff --git a/logger/tests/testvr.yml b/logger/tests/testvr.yml new file mode 100644 index 0000000000000000000000000000000000000000..d2e71b424cb768b8baad711c33d387611dedc340 --- /dev/null +++ b/logger/tests/testvr.yml @@ -0,0 +1,41 @@ +Header: + BoxSize: [1, 1, 1] + Dimension: 3 + RunName: Untitled SWIFT simulation + Periodic: 0 + +InternalUnitSystem: + UnitMass_in_cgs: 1 + UnitLength_in_cgs: 1 + UnitTime_in_cgs: 1 + UnitCurrent_in_cgs: 1 + UnitTemp_in_cgs: 1 + +Policy: + steal: 1 + keep: 1 + block: 0 + cpu tight: 0 + mpi: 0 + numa affinity: 1 + hydro: 1 + self gravity: 0 + external gravity: 0 + cosmological integration: 0 + drift everything: 0 + reconstruct multi-poles: 0 + temperature: 0 + cooling: 0 + stars: 0 + structure finding: 0 + star formation: 0 + feedback: 0 + black holes: 0 + fof search: 0 + time-step limiter: 1 + time-step sync: 0 + logger: 1 + line of sight: 0 + sink: 0 + rt: 0 + diff --git a/src/logger.h b/src/logger.h index 45cc89d6043b56d6834702de62ae7ae0d4548dfb..77516f348cbb2acac41cb6f4014437669970aab1 100644 --- a/src/logger.h +++ b/src/logger.h @@ -25,6 +25,7 @@ #ifdef WITH_LOGGER /* Includes. */ +#include "align.h" #include "common_io.h" #include "dump.h" #include "error.h"