/*******************************************************************************
* This file is part of CSDS.
* Copyright (c) 2019 Loic Hausammann (loic.hausammann@epfl.ch)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Include corresponding header */
#include "csds_reader.h"
/* Include CSDS headers */
#include "csds_fields.h"
#include "csds_parameters.h"
/* Include standard library */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/sysinfo.h>
#include <unistd.h>
/* Include local headers */
#include "threadpool.h"
#define nr_threads get_nprocs()
/**
* @brief Initialize the reader.
*
* @param reader The #csds_reader.
* @param basename The basename of the csds files.
* @param verbose The verbose level.
* @param number_threads The number of threads to use.
* @param number_index The number of index files to write.
* @param restart_init Are we restarting the initial reading?
*/
void csds_reader_init(struct csds_reader *reader, const char *basename,
int verbose, int number_threads, int number_index,
int restart_init) {
if (verbose > 1) message("Initializing the reader.");
/* Set the variable to the default values */
reader->time.time = -1.;
reader->time.int_time = 0;
reader->time.time_offset = 0;
/* Copy the base name */
strcpy(reader->basename, basename);
/* Initialize the reader variables. */
reader->verbose = verbose;
csds_parameters_init(&reader->params, reader, number_threads);
/* Generate the logfile filename */
char logfile_name[STRING_SIZE];
sprintf(logfile_name, "%s.dump", basename);
/* Initialize the log file. */
csds_logfile_init_from_file(&reader->log, logfile_name, reader,
/* only_header */ 0);
  /* Initialize the index files and create them if needed */
csds_reader_init_index(reader, number_index, restart_init);
/* Save the time array */
time_array_save(&reader->log.times, &reader->log);
if (verbose > 1) message("Initialization done.");
}
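/* Illustrative usage (a sketch, not taken from the documentation; the
   basename and the numeric values below are placeholders):

     struct csds_reader reader;
     int verbose = 1, number_threads = 4, number_index = 8, restart_init = 0;
     csds_reader_init(&reader, "simulation", verbose, number_threads,
                      number_index, restart_init);
     double t_min = csds_reader_get_time_begin(&reader);
     double t_max = csds_reader_get_time_end(&reader);
     csds_reader_set_time(&reader, 0.5 * (t_min + t_max));
     ...
     csds_reader_free(&reader);
*/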
/**
* @brief Initialize the index part of the reader.
*
* @param reader The #csds_reader.
* @param number_index Number of requested index files.
* @param restart Are we restarting the generation of index files?
*/
void csds_reader_init_index(struct csds_reader *reader, int number_index,
int restart) {
/* Initialize the csds_index */
csds_index_init(&reader->index.index_prev, reader);
csds_index_init(&reader->index.index_next, reader);
/* Count the number of files */
int count = 0;
while (1) {
char filename[STRING_SIZE + 50];
sprintf(filename, "%s_%04i.index", reader->basename, count);
/* Check if file exists */
if (access(filename, F_OK) != -1) {
count++;
} else {
break;
}
}
  /* Ensure that we do not end up with a single index file */
if (number_index == 1 || (count == 1 && !restart)) {
error(
"The CSDS cannot run with only one index file."
" Did you forget to restart their generation?");
}
  /* Check if we need to generate the index files or not. */
if ((count != number_index && restart) || count == 0) {
csds_reader_generate_index_files(reader, number_index, count);
count = number_index;
} else if (reader->verbose > 0 && count != number_index) {
message(
"Found %i index files. If you wish to have the"
" requested %i files, please delete the old ones.",
count, number_index);
}
reader->index.n_files = count;
/* Initialize the arrays */
if ((reader->index.times = (double *)malloc(count * sizeof(double))) ==
NULL) {
error_python("Failed to allocate the list of times");
}
if ((reader->index.int_times =
(integertime_t *)malloc(count * sizeof(integertime_t))) == NULL) {
error_python("Failed to allocate the list of times");
}
/* Get the information contained in the headers */
for (int i = 0; i < reader->index.n_files; i++) {
char filename[STRING_SIZE + 50];
sprintf(filename, "%s_%04i.index", reader->basename, i);
/* Read the header */
csds_index_read_header(&reader->index.index_prev, filename);
/* Save the required information */
reader->index.times[i] = reader->index.index_prev.time;
reader->index.int_times[i] = reader->index.index_prev.integer_time;
}
}
/**
* @brief Free the reader.
*
* @param reader The #csds_reader.
*/
void csds_reader_free(struct csds_reader *reader) {
/* Free the log. */
csds_logfile_free(&reader->log);
if (reader->time.time != -1.) {
csds_index_free(&reader->index.index_prev);
}
free(reader->index.int_times);
free(reader->index.times);
}
/**
* @brief Set the reader to a given time and read the correct index file.
*
* @param reader The #csds_reader.
* @param time The requested time.
*/
void csds_reader_set_time(struct csds_reader *reader, double time) {
/* Set the time */
reader->time.time = time;
unsigned int left = 0;
unsigned int right = reader->index.n_files - 1;
/* Check if the time is meaningful */
if (reader->index.times[left] > time || reader->index.times[right] < time)
error("The requested time (%f) is not within the index limits [%f, %f]",
time, reader->index.times[left], reader->index.times[right]);
/* Find the correct index */
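  /* Binary search for the last index file whose time is at or before the
     requested time; the ceil-division below guarantees progress once
     right == left + 1. */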
while (left != right) {
    /* Do a ceil-division */
unsigned int m = (left + right + 1) / 2;
if (reader->index.times[m] > time) {
right = m - 1;
} else {
left = m;
}
}
/* Deal with the final time */
if (left == (unsigned int)reader->index.n_files - 1) {
left -= 1;
}
/* Generate the filename */
char filename_prev[STRING_SIZE + 50];
sprintf(filename_prev, "%s_%04u.index", reader->basename, left);
char filename_next[STRING_SIZE + 50];
sprintf(filename_next, "%s_%04u.index", reader->basename, left + 1);
/* Check if the file is already mapped */
if (reader->index.index_prev.index.map != NULL) {
csds_index_free(&reader->index.index_prev);
}
if (reader->index.index_next.index.map != NULL) {
csds_index_free(&reader->index.index_next);
}
/* Read the file */
csds_index_read_header(&reader->index.index_prev, filename_prev);
csds_index_map_file(&reader->index.index_prev, filename_prev,
/* sorted */ 1);
csds_index_read_header(&reader->index.index_next, filename_next);
csds_index_map_file(&reader->index.index_next, filename_next,
/* sorted */ 1);
/* Get the offset of the time chunk */
size_t ind = time_array_get_index_from_time(&reader->log.times, time);
/* ind == 0 and ind == 1 are the same time, but when reading we need
data before the initial time. */
if (ind == 0) {
ind = 1;
}
/* Check if we requested exactly a time step */
if (reader->log.times.records[ind].time != time) {
/* In order to interpolate, we need to be above and not below the time */
ind += 1;
}
/* Save the values */
reader->time.index = ind;
reader->time.int_time = reader->log.times.records[ind].int_time;
reader->time.time_offset = reader->log.times.records[ind].offset;
}
/**
 * @brief Count the number of particles created between the last index file
 * and the time currently set in the reader.
 *
 * @param reader The #csds_reader.
 * @param part_type The index corresponding to the particle type.
 *
 * @return The number of new particles.
*/
size_t csds_reader_count_number_new_particles(struct csds_reader *reader,
enum part_type part_type) {
const size_t threshold = reader->time.time_offset;
/* Get the history of created particles. */
struct index_data *data =
csds_index_get_created_history(&reader->index.index_next, part_type);
/* Do we have any new particle? */
if (reader->index.index_next.nparts_created[part_type] == 0 ||
threshold < data[0].offset) {
return 0;
}
/* Perform a binary search */
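  /* The search returns the number of entries whose offset lies before the
     current time offset, i.e. the particles created before the set time. */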
size_t right = reader->index.index_next.nparts_created[part_type] - 1;
size_t left = 0;
while (left <= right) {
size_t m = (left + right) / 2;
if (data[m].offset < threshold) {
left = m + 1;
} else if (data[m].offset > threshold) {
right = m - 1;
} else {
return m;
}
}
/* Compute the return value. */
size_t ret = (left + right) / 2;
/* Check if the binary search is correct. */
if (data[ret].offset > threshold) {
error_python("The binary search has failed.");
}
/* We need the count, not the index. */
return ret + 1;
}
/**
 * @brief Count the number of particles removed between the last index file
 * and the time currently set in the reader.
 *
 * @param reader The #csds_reader.
 * @param part_type The index corresponding to the particle type.
 *
 * @return The number of removed particles.
*/
size_t csds_reader_count_number_removed_particles(struct csds_reader *reader,
enum part_type part_type) {
const size_t threshold = reader->time.time_offset;
  /* Get the history of removed particles. */
  struct index_data *data =
      csds_index_get_removed_history(&reader->index.index_next, part_type);
  /* Do we have any removed particles? */
if (reader->index.index_next.nparts_removed[part_type] == 0 ||
threshold < data[0].offset) {
return 0;
}
/* Perform a binary search */
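  /* Same search as for the created particles: count the entries whose offset
     lies before the current time offset. */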
size_t right = reader->index.index_next.nparts_removed[part_type] - 1;
size_t left = 0;
while (left <= right) {
size_t m = (left + right) / 2;
if (data[m].offset < threshold) {
left = m + 1;
} else if (data[m].offset > threshold) {
right = m - 1;
} else {
return m;
}
}
/* Compute the return value. */
size_t ret = (left + right) / 2;
/* Check if the binary search is correct. */
if (data[ret].offset > threshold) {
error_python("The binary search has failed.");
}
/* We need the count, not the index. */
return ret + 1;
}
/**
 * @brief Provides the number of particles (per type) from the index file.
*
* @param reader The #csds_reader.
* @param n_parts (out) Number of particles at the time set in the reader.
* @param read_types Should we read this type of particles?
*/
void csds_reader_get_number_particles(struct csds_reader *reader,
uint64_t *n_parts,
const int *read_types) {
for (enum part_type i = (enum part_type)0; i < swift_type_count; i++) {
/* Should we skip this type of particles? */
if (read_types[i] == 0) {
n_parts[i] = 0;
continue;
}
/* Count the number of particles present in the last index file. */
const uint64_t count_prev = reader->index.index_prev.nparts[i];
/* Count the number of particles that have been created since last index. */
const uint64_t count_new =
csds_reader_count_number_new_particles(reader, i);
/* Count the number of particles that have been removed since last index. */
const uint64_t count_removed =
csds_reader_count_number_removed_particles(reader, i);
n_parts[i] = (count_prev + count_new) - count_removed;
}
}
/**
 * @brief Read a single field for a single part from the csds and interpolate
 * it if needed.
 *
 * This function will jump from the last known full record until the requested
 * time and then read the last record containing the requested field.
 * If an interpolation is required, the function will continue jumping until
 * finding a record with the requested field. If the first or second
 * derivative is present in the last record before / first record after the
 * requested time, it will use it for the interpolation.
*
* @param reader The #csds_reader.
* @param time The requested time.
* @param offset_time The offset of the corresponding time record.
* @param interp_type The type of interpolation requested.
 * @param offset_last_full_record The offset of this particle's last record
 * containing all the fields.
 * @param field_wanted The field to read.
 * @param output Pointer to the output array.
* @param type The #part_type.
*
* @return Is the particle removed from the logfile?
*/
int csds_reader_read_field(struct csds_reader *reader, double time,
size_t offset_time,
enum csds_reader_type interp_type,
const size_t offset_last_full_record,
const struct field_information *field_wanted,
void *output, enum part_type type) {
const struct header *h = &reader->log.header;
size_t offset = offset_last_full_record;
/* Check if the offsets are correct. */
if (offset > offset_time) {
error_python("Last offset is larger than the requested time.");
}
/* Offset of the field read before the requested time. */
size_t offset_before = offset_last_full_record;
/* Record's header information */
size_t mask, h_offset;
/* Find the data for the previous record.
As we start from a full record,
no need to check if the field is found.
*/
while (offset < offset_time) {
/* Read the particle. */
csds_loader_io_read_mask(h, reader->log.log.map + offset, &mask, &h_offset);
/* Is the particle removed from the logfile? */
if (mask & SPECIAL_FLAGS_MASK) {
int data = 0;
int part_type = 0;
enum csds_special_flags flag = csds_particle_read_special_flag(
reader, offset, &mask, &h_offset, &data, &part_type);
#ifdef SWIFT_DEBUG_CHECKS
if (part_type != type) {
error("Found particles that do not correspond.");
}
#endif
if (flag == csds_flag_change_type || flag == csds_flag_mpi_exit ||
flag == csds_flag_delete) {
return 1;
}
}
/* Check if there is a next record (avoid infinite loop). */
if (h_offset == 0) {
error_python("No next record found.");
}
/* Check if the field is present */
if (mask & field_wanted->field.mask) {
offset_before = offset;
}
/* Go to the next record. */
offset += h_offset;
}
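  /* At this point, offset points to the first record at or after the
     requested time, while offset_before points to the last record before the
     requested time that contains the wanted field. */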
/* Read the field */
csds_particle_read_field(reader, offset_before, output, type, field_wanted,
/* derivative */ 0, &mask, &h_offset);
/* Deal with the first derivative. */
int first_found = field_wanted->first.field != field_enum_none &&
mask & field_wanted->first.mask;
char first_deriv[field_wanted->first.size];
if (first_found) {
/* Read the first derivative */
csds_particle_read_field(reader, offset_before, first_deriv, type,
field_wanted, /* derivative */ 1, &mask,
&h_offset);
}
/* Deal with the second derivative. */
int second_found = field_wanted->second.field != field_enum_none &&
mask & field_wanted->second.mask;
char second_deriv[field_wanted->second.size];
if (second_found) {
    /* Read the second derivative */
csds_particle_read_field(reader, offset_before, second_deriv, type,
field_wanted, /* derivative */ 2, &mask,
&h_offset);
}
/* Get the time. */
// TODO reduce search interval
double time_before = time_array_get_time(&reader->log.times, offset_before);
/* When interpolating, we need to get the next record after
the requested time.
*/
if (interp_type != csds_reader_const) {
/* Loop over the records until having all the fields. */
while (1) {
/* Read the particle. */
csds_loader_io_read_mask(h, reader->log.log.map + offset, &mask,
&h_offset);
/* Do we have the field? */
if (mask & field_wanted->field.mask) {
break;
}
/* Check if we can still move forward */
if (h_offset == 0) {
error_python("There is no record after the current one");
}
/* Go to the next record. */
offset += h_offset;
}
/* Output after the requested time. */
char output_after[field_wanted->field.size];
/* Read the field */
csds_particle_read_field(reader, offset, output_after, type, field_wanted,
/* derivative */ 0, &mask, &h_offset);
/* Deal with the first derivative. */
char first_deriv_after[field_wanted->first.size];
/* Did we find the derivative before and in this record? */
first_found = field_wanted->first.field != field_enum_none && first_found &&
mask & field_wanted->first.mask;
if (first_found) {
/* Read the first derivative */
csds_particle_read_field(reader, offset, first_deriv_after, type,
field_wanted, /*derivative*/ 1, &mask,
&h_offset);
}
/* Deal with the second derivative. */
char second_deriv_after[field_wanted->second.size];
/* Did we find the derivative before and in this record? */
second_found = field_wanted->second.field != field_enum_none &&
second_found && mask & field_wanted->second.mask;
if (second_found) {
/* Read the second derivative */
csds_particle_read_field(reader, offset, second_deriv_after, type,
field_wanted, /* derivative */ 2, &mask,
&h_offset);
}
/* Get the time. */
// TODO reduce search interval
double time_after = time_array_get_time(&reader->log.times, offset);
/* Deal with the derivatives */
struct csds_reader_field before;
struct csds_reader_field after;
before.field = output;
before.first_deriv = first_found ? first_deriv : NULL;
before.second_deriv = second_found ? second_deriv : NULL;
after.field = output_after;
after.first_deriv = first_found ? first_deriv_after : NULL;
after.second_deriv = second_found ? second_deriv_after : NULL;
/* Interpolate the data. */
csds_particle_interpolate_field(time_before, &before, time_after, &after,
output, time, field_wanted, type,
&reader->params);
}
return 0;
}
/**
 * @brief Convert the requested global field enums into the local
 * #field_information of the given particle type.
 *
 * @param reader The #csds_reader.
 * @param fields_wanted The fields to convert.
 * @param fields_found (out) #field_information for the wanted fields
 * (needs to be allocated).
 * @param n_fields_wanted Number of elements in fields_wanted and
 * fields_found.
* @param type The type of the particle.
*/
void csds_reader_get_fields_wanted(
const struct csds_reader *reader, const enum field_enum *fields_wanted,
const struct field_information **fields_found, const int n_fields_wanted,
enum part_type type) {
const struct header *h = &reader->log.header;
/* Mark fields_found in order to check that the fields are found. */
for (int i = 0; i < n_fields_wanted; i++) {
fields_found[i] = NULL;
}
/* Find the corresponding fields */
for (int i = 0; i < n_fields_wanted; i++) {
for (int j = 0; j < h->number_fields[type]; j++) {
/* Copy the structure if the field is found. */
if (h->fields[type][j].field.field == fields_wanted[i]) {
fields_found[i] = &h->fields[type][j];
break;
}
}
}
/* Check that we found the fields */
for (int i = 0; i < n_fields_wanted; i++) {
if (fields_found[i] == NULL) {
const char *name = field_enum_get_name(fields_wanted[i]);
error_python("Field %s not found in particle type %s", name,
part_type_names[type]);
}
}
}
/**
* @brief Structure for the mapper function
* #csds_reader_read_all_particles_single_type_mapper.
*/
struct extra_data_read_all {
/*! The #csds_reader. */
struct csds_reader *reader;
/*! The requested time. */
double time;
/*! The interpolation type. */
enum csds_reader_type interp_type;
/*! The #field_information for all the fields requested. */
const struct field_information **fields_wanted;
/*! The number of fields requested. */
int n_fields_wanted;
/*! The array of output arrays. */
void **output;
/*! The number of particles at the requested time. */
const uint64_t *n_part;
/*! The type of particle to read. */
enum part_type type;
  /*! The current position in the index file (shared, updated atomically). */
  size_t current_index;
  /*! The current position in the output arrays (shared, updated atomically). */
  size_t current_output;
};
/**
* @brief Mapper function of #csds_reader_read_all_particles_single_type.
*
* @param map_data (size_t) The current position (not used)
* @param num_elements The number of elements to process with the current
* thread.
* @param extra_data Pointer to a #extra_data_read_all.
*/
void csds_reader_read_all_particles_single_type_mapper(void *map_data,
int num_elements,
void *extra_data) {
/* Extract the data */
struct extra_data_read_all *data_tp =
(struct extra_data_read_all *)extra_data;
struct csds_reader *reader = data_tp->reader;
double time = data_tp->time;
enum csds_reader_type interp_type = data_tp->interp_type;
const struct field_information **fields_wanted = data_tp->fields_wanted;
int n_fields_wanted = data_tp->n_fields_wanted;
void **output = data_tp->output;
const uint64_t *n_part = data_tp->n_part;
enum part_type type = data_tp->type;
/* Create a few variables for later */
const size_t size_index = reader->index.index_prev.nparts[type];
const size_t size_history =
csds_reader_count_number_new_particles(reader, type);
struct index_data *data =
csds_index_get_data(&reader->index.index_prev, type);
struct index_data *data_created =
csds_index_get_created_history(&reader->index.index_next, type);
/* Count the number of previous parts for the shift in output */
uint64_t prev_npart = 0;
for (int i = 0; i < type; i++) {
prev_npart += n_part[i];
}
/* Allocate the temporary memory. */
void **output_tmp = malloc(n_fields_wanted * sizeof(void *));
if (output_tmp == NULL) {
error_python("Failed to allocate the temporary output buffer");
}
for (int i = 0; i < n_fields_wanted; i++) {
output_tmp[i] = malloc(fields_wanted[i]->field.size);
if (output_tmp[i] == NULL) {
error_python("Failed to allocate the temporary output buffer");
}
}
/* Read the particles */
for (int i = 0; i < num_elements; i++) {
/* Get the next index. */
size_t current_index = atomic_inc(&data_tp->current_index);
/* Are we reading the history? */
int reading_history = current_index >= size_index;
/* Check if we still have some particles available. */
if (reading_history && current_index == size_history + size_index) {
error_python("The CSDS was not able to find enough particles.");
}
/* Get the offset */
const size_t current =
reading_history ? current_index - size_index : current_index;
size_t offset =
reading_history ? data_created[current].offset : data[current].offset;
/* Loop over each field. */
int particle_removed = 0;
for (int field = 0; field < n_fields_wanted; field++) {
/* Read the field. */
particle_removed = csds_reader_read_field(
reader, time, reader->time.time_offset, interp_type, offset,
fields_wanted[field], output_tmp[field], type);
/* Should we continue to read the fields of this particle? */
if (particle_removed) {
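        /* The removed particle does not count towards this thread's quota,
           so decrement i and fetch another index instead. */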
i--;
break;
}
}
/* Write the particle into the output if it was not removed */
if (!particle_removed) {
size_t current_output = atomic_inc(&data_tp->current_output);
for (int field = 0; field < n_fields_wanted; field++) {
/* Get the array */
const size_t size = fields_wanted[field]->field.size;
void *output_single =
(char *)output[field] + (current_output + prev_npart) * size;
/* Copy the temporary buffer into the global one */
memcpy(output_single, output_tmp[field], size);
}
}
}
/* Free the allocated memory */
for (int i = 0; i < n_fields_wanted; i++) {
free(output_tmp[i]);
}
free(output_tmp);
}
/**
* @brief Read all the particles of a given type from the index file.
*
* @param reader The #csds_reader.
* @param time The requested time for the particle.
* @param interp_type The type of interpolation.
* @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of fields requested.
* @param output Pointer to the output array. Size: (n_fields_wanted,
* sum(n_part)).
* @param n_part Number of particles of each type.
* @param type The particle type
*/
void csds_reader_read_all_particles_single_type(
struct csds_reader *reader, double time, enum csds_reader_type interp_type,
const enum field_enum *fields_wanted, const int n_fields_wanted,
void **output, const uint64_t *n_part, enum part_type type) {
/* Allocate temporary memory. */
const struct field_information **fields_info_wanted =
(const struct field_information **)malloc(
sizeof(struct field_information *) * n_fields_wanted);
if (fields_info_wanted == NULL) {
error_python("Failed to allocate the field information.");
}
/* Convert fields into the local array. */
csds_reader_get_fields_wanted(reader, fields_wanted, fields_info_wanted,
n_fields_wanted, type);
/* Compact the data */
struct extra_data_read_all extra = {reader,
time,
interp_type,
fields_info_wanted,
n_fields_wanted,
output,
n_part,
type,
/* current_index */ 0,
/* current_output */ 0};
/* Create the threadpool */
struct threadpool tp;
threadpool_init(&tp, reader->params.number_threads);
/* Read the particles */
threadpool_map(&tp, csds_reader_read_all_particles_single_type_mapper, NULL,
n_part[type], 1, threadpool_auto_chunk_size, &extra);
/* Check if we have all the expected particles. */
if (n_part[type] != extra.current_output) {
error("The reader was not able to find all the expected particles.");
}
/* Free the memory */
threadpool_clean(&tp);
free(fields_info_wanted);
}
/**
* @brief Read all the particles from the index file.
*
* @param reader The #csds_reader.
* @param time The requested time for the particle.
* @param interp_type The type of interpolation.
* @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of fields requested.
* @param output Pointer to the output array. Size: (n_fields_wanted,
* sum(n_part)).
* @param n_part Number of particles of each type.
*/
void csds_reader_read_all_particles(struct csds_reader *reader, double time,
enum csds_reader_type interp_type,
const enum field_enum *fields_wanted,
const int n_fields_wanted, void **output,
const uint64_t *n_part) {
const ticks tic = getticks();
/* Read the gas */
if (n_part[swift_type_gas] != 0) {
csds_reader_read_all_particles_single_type(reader, time, interp_type,
fields_wanted, n_fields_wanted,
output, n_part, swift_type_gas);
}
/* Read the dark matter. */
if (n_part[swift_type_dark_matter] != 0) {
csds_reader_read_all_particles_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_dark_matter);
}
/* Read the dark matter background. */
if (n_part[swift_type_dark_matter_background] != 0) {
csds_reader_read_all_particles_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_dark_matter_background);
}
/* Read the neutrino dark matter. */
if (n_part[swift_type_neutrino] != 0) {
csds_reader_read_all_particles_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_neutrino);
}
/* Read the stars. */
if (n_part[swift_type_stars] != 0) {
csds_reader_read_all_particles_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_stars);
}
/* Print the time */
if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
message("took %.3f %s", clocks_from_ticks(getticks() - tic),
clocks_getunit());
}
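/* Illustrative read sequence (a sketch, not taken from the documentation; the
   field selection and the allocation of the output buffers are left to the
   caller):

     csds_reader_set_time(&reader, time);

     uint64_t n_parts[swift_type_count];
     int read_types[swift_type_count];
     for (int i = 0; i < swift_type_count; i++) read_types[i] = 1;
     csds_reader_get_number_particles(&reader, n_parts, read_types);

     Allocate one buffer per requested field (total number of particles times
     the field size) and then call:

     csds_reader_read_all_particles(&reader, time, csds_reader_const,
                                    fields_wanted, n_fields_wanted, output,
                                    n_parts);
*/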
/**
 * @brief Read the particles of a given type with the given ids from the index
 * file.
*
* @param reader The #csds_reader.
* @param time The requested time for the particle.
* @param interp_type The type of interpolation.
* @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of fields requested.
* @param output Pointer to the output array. Size: (n_fields_wanted,
* sum(n_part)).
* @param n_part Number of particles of each type.
* Updated with the number of particles found.
* @param type The particle type
* @param ids The particles' ids.
* The ids not found are removed from this array.
*/
void csds_reader_read_particles_from_ids_single_type(
struct csds_reader *reader, double time, enum csds_reader_type interp_type,
const enum field_enum *fields_wanted, const int n_fields_wanted,
void **output, uint64_t *n_part, enum part_type type, long long *ids) {
/* Count the number of previous parts for the shift in output */
uint64_t prev_npart = 0;
for (int i = 0; i < type; i++) {
prev_npart += n_part[i];
}
/* Allocate temporary memory. */
const struct field_information **fields_info_wanted =
(const struct field_information **)malloc(
sizeof(struct field_information *) * n_fields_wanted);
if (fields_info_wanted == NULL) {
error_python("Failed to allocate the field information.");
}
/* Convert fields into the local array. */
csds_reader_get_fields_wanted(reader, fields_wanted, fields_info_wanted,
n_fields_wanted, type);
/* Read the particles */
for (size_t i = 0; i < n_part[type]; i++) {
/* Get the offset */
size_t offset = 0;
int found =
csds_index_get_offset(&reader->index.index_prev, ids[i], type, &offset);
    /* Deal with the particles not found: drop the id by shifting the
       remaining ids one slot to the left. */
    if (!found) {
      if (i != n_part[type] - 1)
        memmove(&ids[i], &ids[i + 1],
                (n_part[type] - 1 - i) * sizeof(long long));
      n_part[type] -= 1;
      i -= 1;
      continue;
    }
/* Loop over each field. */
for (int field = 0; field < n_fields_wanted; field++) {
      void *output_single =
          (char *)output[field] +
          (i + prev_npart) * fields_info_wanted[field]->field.size;
      /* Read the field. */
      int particle_removed = csds_reader_read_field(
          reader, time, reader->time.time_offset, interp_type, offset,
          fields_info_wanted[field], output_single, type);
      /* Is the particle still present? If not, drop its id as above. */
      if (particle_removed) {
        if (i != n_part[type] - 1)
          memmove(&ids[i], &ids[i + 1],
                  (n_part[type] - 1 - i) * sizeof(long long));
        n_part[type] -= 1;
        i -= 1;
        break;
      }
}
}
/* Free the memory */
free(fields_info_wanted);
}
/**
 * @brief Read the particles with the given ids from the index file.
*
* @param reader The #csds_reader.
* @param time The requested time for the particle.
* @param interp_type The type of interpolation.
* @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of fields requested.
* @param output Pointer to the output array. Size: (n_fields_wanted,
* sum(n_part)).
* @param n_part Number of particles of each type.
* Updated with the number of particles found.
* @param ids The particles' ids.
* The ids not found are removed from this array.
*/
void csds_reader_read_particles_from_ids(
struct csds_reader *reader, double time, enum csds_reader_type interp_type,
const enum field_enum *fields_wanted, const int n_fields_wanted,
void **output, uint64_t *n_part, long long **ids) {
const ticks tic = getticks();
/* Read the gas */
if (n_part[swift_type_gas] != 0) {
csds_reader_read_particles_from_ids_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_gas, ids[swift_type_gas]);
}
/* Read the dark matter. */
if (n_part[swift_type_dark_matter] != 0) {
csds_reader_read_particles_from_ids_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_dark_matter, ids[swift_type_dark_matter]);
}
/* Read the dark matter background. */
if (n_part[swift_type_dark_matter_background] != 0) {
csds_reader_read_particles_from_ids_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_dark_matter_background,
ids[swift_type_dark_matter_background]);
}
/* Read the stars. */
if (n_part[swift_type_stars] != 0) {
csds_reader_read_particles_from_ids_single_type(
reader, time, interp_type, fields_wanted, n_fields_wanted, output,
n_part, swift_type_stars, ids[swift_type_stars]);
}
/* Print the time */
if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
message("took %.3f %s", clocks_from_ticks(getticks() - tic),
clocks_getunit());
}
/**
* @brief Get the simulation initial time.
*
* @param reader The #csds_reader.
*
* @return The initial time
*/
double csds_reader_get_time_begin(struct csds_reader *reader) {
return reader->log.times.records[0].time;
}
/**
* @brief Get the simulation final time.
*
* @param reader The #csds_reader.
*
* @return The final time
*/
double csds_reader_get_time_end(struct csds_reader *reader) {
const size_t ind = reader->log.times.size;
return reader->log.times.records[ind - 1].time;
}
/**
 * @brief Get the offset of the next timestamp after a given time.
*
* @param reader The #csds_reader.
* @param time The requested time.
*
* @return The offset of the timestamp.
*/
size_t csds_reader_get_next_offset_from_time(struct csds_reader *reader,
double time) {
size_t ind = time_array_get_index_from_time(&reader->log.times, time);
  /* We do not want to have the sentinel */
if (reader->log.times.size - 2 == ind) {
ind -= 1;
}
return reader->log.times.records[ind + 1].offset;
}
/**
* @brief Read a record without knowing if it is a particle or a timestamp.
*
 * WARNING This function assumes that all the particles are hydro particles.
* Thus it should be used only for testing the code.
*
* @param reader The #csds_reader.
 * @param output (out) The already allocated buffers for all the fields
 * possible for a hydro particle; filled with the particle data if the record
 * is a particle.
 * @param time (out) The time if the record is a timestamp.
 * @param is_particle (out) 1 if the record is a particle, 0 otherwise.
* @param offset The offset of the record to read.
*
* @return The offset after the record.
*/
size_t csds_reader_read_record(struct csds_reader *reader, void **output,
double *time, int *is_particle, size_t offset) {
/* Get a few pointers. */
const struct header *h = &reader->log.header;
char *map = reader->log.log.map;
size_t mask = 0;
size_t h_offset = 0;
/* Read the record's mask. */
map = csds_loader_io_read_mask(h, map + offset, &mask, &h_offset);
*is_particle = !(mask & TIMESTAMP_MASK);
/* The record is a particle. */
if (*is_particle) {
size_t offset_tmp = offset;
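    /* Every field is read from the same record start (offset_tmp); offset
       keeps the value returned by the last call, which is what this function
       returns as the offset after the record. */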
for (int i = 0; i < h->number_fields[swift_type_gas]; i++) {
offset = csds_particle_read_field(
reader, offset_tmp, output[i], swift_type_gas,
&h->fields[swift_type_gas][i], /* derivative */ 0, &mask, &h_offset);
}
}
/* The record is a timestamp. */
else {
struct time_record time_record;
offset = time_read(&time_record, reader, offset);
*time = time_record.time;
}
return offset;
}