csds_reader_generate_index.c 24.10 KiB
/*******************************************************************************
* This file is part of CSDS.
* Copyright (c) 2021 Loic Hausammann (loic.hausammann@epfl.ch)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Include corresponding header */
#include "csds_reader.h"
/* Include local headers */
#include "csds_hashmap.h"
#include "csds_index.h"
#include "csds_logfile.h"
#include "csds_openmp.h"
#include "csds_part_type.h"
/**
* @brief Structure that contains all the information
* required to write an index file for a single particle type.
*/
struct index_writer {
/* Number of elements currently stored */
uint64_t size;
/* Size of the current buffer */
size_t capacity;
/* Buffer containing the particles */
struct index_data *data;
/* Initial size of the writer */
size_t init_size;
/* Number of tagged particles */
size_t number_tag;
/* Maximal fraction of tagged particles */
float max_frac_tag;
};
/**
* @brief Initialize the structure for the first time.
*
* @param writer The #index_writer.
* @param init_size The initial size of the writer.
* @param max_frac_tag The maximal fraction of tagged particles.
*/
void index_writer_init(struct index_writer *writer, size_t init_size,
float max_frac_tag) {
/* Set the counters to their initial value */
writer->size = 0;
writer->capacity = init_size;
writer->init_size = init_size;
writer->number_tag = 0;
writer->max_frac_tag = max_frac_tag;
if (init_size == 0) {
writer->data = NULL;
return;
}
/* Allocate the memory */
writer->data =
(struct index_data *)malloc(sizeof(struct index_data) * init_size);
if (writer->data == NULL) {
error("Failed to allocate memory for the index_writer.");
}
}
/**
* @brief Reset the structure.
*
* @param writer The #index_writer.
*/
void index_writer_reset(struct index_writer *writer) {
if (writer->size == 0) return;
free(writer->data);
index_writer_init(writer, writer->init_size, writer->max_frac_tag);
}
/**
* @brief Free the structure.
*
* @param hist The #index_writer.
*/
void index_writer_free(struct index_writer *writer) {
/* Set the counters to 0 */
writer->size = 0;
writer->capacity = 0;
/* Free the memory */
if (writer->data != NULL) {
free(writer->data);
writer->data = NULL;
}
}
/**
* @brief Log a the particle information into the #index_writer.
*
* @param writer The #index_writer.
* @param id The ID of the particle.
* @param last_offset The last offset of the particle.
*/
void index_writer_log(struct index_writer *writer, const int64_t id,
const uint64_t last_offset) {
/* Ensure that it was correctly allocated */
if (writer->capacity == 0) {
error(
"Trying to add a particle to an empty index_writer."
" If your are sure that your particle type is implemented, "
"please increase the value of ReaderOptions:Number*.");
}
#ifdef CSDS_DEBUG_CHECKS
if (id < 0) {
error("Negative ID for a particle.");
}
#endif
const struct index_data data = {id, last_offset};
/* Check if enough space is left */
if (writer->size == writer->capacity) {
/* Compute the previous amount of memory */
const size_t memsize = sizeof(struct index_data) * writer->capacity;
/* Increase the capacity of the array */
writer->capacity *= 2;
/* Allocate the new array and copy the content of the previous one */
struct index_data *tmp = (struct index_data *)malloc(2 * memsize);
memcpy(tmp, writer->data, memsize);
/* Free the previous array and switch the pointers */
free(writer->data);
writer->data = tmp;
}
/* Save the new particle */
writer->data[writer->size] = data;
/* Increase the element counter */
writer->size += 1;
}
/**
*@brief Check if we have some unimplemented particles.
*/
void index_writer_check_implemented(const struct index_writer *writers) {
if (writers[csds_type_sink].size != 0 ||
writers[csds_type_black_hole].size != 0 ||
writers[csds_type_neutrino].size != 0) {
error("TODO implement additional particle types");
}
}
/**
* @brief Write the number of particles and their data into the index file
*/
void index_writer_write_in_index(const struct index_writer *writers, FILE *f) {
/* Write the number of particles */
uint64_t N_total[csds_type_count];
for (int type = 0; type < csds_type_count; type++) {
N_total[type] = writers[type].size;
}
fwrite(N_total, sizeof(uint64_t), csds_type_count, f);
/* Write the index data */
for (int type = 0; type < csds_type_count; type++) {
if (N_total[type] == 0) continue;
fwrite(writers[type].data, sizeof(struct index_data), writers[type].size,
f);
}
}
/**
* @brief Write an index file.
*
* @param reader The #csds_reader.
* @param current_state The arrays containing the current
* state with the last full offset (size given by csds_type_count).
* @param parts_created The arrays containing the first reference
* (since last index file) of created particles
* (size given by csds_type_count).
* @param parts_removed The arrays containing the last reference
* (since last index file) of removed particles (size given by
* csds_type_count).
* @param time The time corresponding to the current index file.
* @param file_number The current file number.
*/
void csds_reader_write_index(const struct csds_reader *reader,
struct csds_hashmap **current_state,
struct index_writer *parts_created,
struct index_writer *parts_removed,
const struct time_record *time, int file_number) {
/* Get the filename */
char filename[CSDS_STRING_SIZE + 15];
sprintf(filename, "%s_%04i.index", reader->basename, file_number);
/* Check that we have only implemented particles */
if (csds_hashmap_count(current_state[csds_type_sink]) != 0 ||
csds_hashmap_count(current_state[csds_type_black_hole]) != 0 ||
csds_hashmap_count(current_state[csds_type_neutrino]) != 0)
error("Not implemented");
index_writer_check_implemented(parts_created);
index_writer_check_implemented(parts_removed);
/* Open file */
FILE *f = NULL;
f = fopen(filename, "wb");
if (f == NULL) {
error("Failed to open file %s", filename);
}
/* Write double time */
fwrite(&time->time, sizeof(double), 1, f);
/* Write integer time */
fwrite(&time->int_time, sizeof(integertime_t), 1, f);
/* Write number of particles */
uint64_t N_total[csds_type_count];
for (int type = 0; type < csds_type_count; type++) {
N_total[type] = csds_hashmap_count(current_state[type]);
}
fwrite(N_total, sizeof(uint64_t), csds_type_count, f);
/* Write if the file is sorted */
// TODO change it if the array is sorted
const char sorted = 0;
fwrite(&sorted, sizeof(char), 1, f);
/* Ensure the data to be aligned */
size_t cur_pos = ftell(f);
size_t d_align = ((cur_pos + 7) & ~7) - cur_pos;
if (d_align > 0) {
long int tmp = 0;
/* Fill the memory with garbage */
fwrite(&tmp, d_align, 1, f);
}
/* Write the current state */
for (int type = 0; type < csds_type_count; type++) {
if (N_total[type] == 0) continue;
// TODO memory map the file
csds_hashmap_write(current_state[type], f);
}
/* Now do the same with the particles created / removed */
index_writer_write_in_index(parts_created, f);
index_writer_write_in_index(parts_removed, f);
/* Close the file */
fclose(f);
/* Cleanup the arrays */
for (int type = 0; type < csds_type_count; type++) {
index_writer_reset(&parts_created[type]);
index_writer_reset(&parts_removed[type]);
}
}
/**
* @brief Get the initial state of the simulation.
*
* @param reader The #csds_reader.
* @param current_state The arrays that will be updated with (size
* csds_type_count).
* @param time_record (output) The first #time_record in the logfile (for
* writing the index file).
*
* @return The offset of the second #time_record
* (the first record that does not correspond to the IC).
*/
size_t csds_reader_get_initial_state(const struct csds_reader *reader,
struct csds_hashmap **current_state,
struct time_record *time_record) {
const ticks tic = getticks();
/* Get a few variables. */
const struct csds_logfile *log = &reader->log;
const struct header *h = &log->header;
const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;
/* Warn the OS that we will read in a sequential way */
CSDS_ADVICE_SEQUENTIAL(log->log);
/* Get the offset after the dump of all the particles
and the time information of the first time record. */
if (log->times.size < 2) {
error("The time array is not large enough");
}
const size_t offset_max = log->times.records[1].offset;
*time_record = log->times.records[0];
/* Get the offset of the first particle record */
size_t offset_first = h->offset_first_record;
/* Skip the time record */
mask_type time_mask = 0;
csds_loader_io_read_mask(log->log.map + offset_first, &time_mask, NULL);
const int time_size = header_get_record_size_from_mask(h, time_mask);
offset_first += time_size + size_record_header;
/* Get the initial state */
for (size_t offset = offset_first; offset < offset_max;) {
/* Get the particle type */
mask_type mask = 0;
size_t prev_offset = 0;
int part_type = 0;
int data = 0;
enum csds_special_flags flag = csds_particle_read_special_flag(
offset, &mask, &prev_offset, &data, &part_type, log->log.map);
if (flag != csds_flag_create) {
error("Reading a particle from ICs without the created flag.");
}
/* TODO Implement missing particle types */
if (part_type == csds_type_neutrino || part_type == csds_type_sink ||
part_type == csds_type_black_hole) {
error("Particle type not implemented (%s)", part_type_names[part_type]);
}
/* Get the mask for the IDs */
const struct field_information *field_id =
header_get_field_from_name(h, "ParticleIDs",
(enum part_type) part_type);
/* Get the particle ID */
if (!(mask & field_id->field.mask)) {
error("The particle ID is missing in the first log");
}
/* Read the particle ID */
int64_t id = 0;
csds_particle_read_field(offset, &id, field_id,
/* derivative */ 0, &mask, &prev_offset,
h->fields[part_type], h->number_fields[part_type],
reader->log.log.map);
/* Log the particle */
struct index_data item = {id, offset};
void *p = (void *)csds_hashmap_set(current_state[part_type], &item);
if (p != NULL) {
error("Already found a particle with the same ID");
}
/* Increment the offset */
const int record_size = header_get_record_size_from_mask(h, mask);
offset += record_size + size_record_header;
}
/* Print the time */
if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
message("took %.3f %s", clocks_from_ticks(getticks() - tic),
clocks_getunit());
/* Go back to normal */
CSDS_ADVICE_NORMAL(log->log);
return offset_max;
}
/**
* @brief Get the last offset of a record before a given offset.
*
* @param reader The #csds_reader.
* @param data The #index_data of the particle.
* @param offset_limit The upper limit of the offset to get.
*
* @return The last offset before an index file.
*/
size_t csds_reader_get_last_offset_before(const struct csds_reader *reader,
const struct index_data *data,
size_t offset_limit) {
/* Get a the logfile */
const struct csds_logfile *log = &reader->log;
size_t current_offset = data->offset;
/* Get the full mask */
mask_type full_mask = 0;
size_t h_offset = 0;
csds_loader_io_read_mask(log->log.map + current_offset, &full_mask,
&h_offset);
/* Ensures that a special flag is present in the mask */
full_mask |= CSDS_SPECIAL_FLAGS_MASK;
/* Now remove it */
full_mask = full_mask ^ CSDS_SPECIAL_FLAGS_MASK;
/* Find the last offset before the current time */
size_t last_full_offset = current_offset;
current_offset += h_offset;
while (1) {
/* Get the mask */
mask_type mask = 0;
h_offset = 0;
csds_loader_io_read_mask(log->log.map + current_offset, &mask, &h_offset);
/* update the offset */
current_offset += h_offset;
if (current_offset > offset_limit) {
break;
}
/* The particle should not have a special flag
due to the previous loop */
if (mask & CSDS_SPECIAL_FLAGS_MASK) {
error("Found a special flag when updating the particles");
}
/* Update the last full offset */
if (full_mask == mask) {
last_full_offset = current_offset;
}
/* Are we at the end of the file? */
if (h_offset == 0) {
break;
}
}
return last_full_offset;
}
/**
* @brief Update the state of the simulation until the next index file along
* with the history.
*
* @param reader The #csds_reader.
* @param init_offset The initial offset to read.
* @param time_record The #time_record of the next index file.
* @param current_state The arrays containing the state of the simulation (size
* csds_type_count).
* @param parts_created The arrays containing the particles created since last
* index (size csds_type_count).
* @param parts_removed The arrays containing the particles removed since last
* index (size csds_type_count).
*
* @return The starting offset for the update.
*/
size_t csds_reader_update_state_to_next_index(
const struct csds_reader *reader, size_t init_offset,
struct time_record time_record, struct csds_hashmap **current_state,
struct index_writer *parts_created, struct index_writer *parts_removed) {
const struct csds_logfile *log = &reader->log;
const struct header *h = &log->header;
const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;
/* Look for all the created / removed particles */
size_t offset = init_offset;
int step = 0;
if (reader->verbose > 0) printf("\n");
/* Record the initial time */
const ticks tic = getticks();
/* Get the mask for the IDs */
const struct field_information *field_id =
header_get_field_from_name(h, "ParticleIDs", csds_type_dark_matter);
/* Warn the OS that we will read in a sequential way */
CSDS_ADVICE_SEQUENTIAL(log->log);
while (offset < time_record.offset) {
/* Print status */
if (reader->verbose > 0) {
step += 1;
if (step % 1000 == 0) {
step = 0;
/* Get the percentage */
float percent = offset - init_offset;
percent /= time_record.offset - init_offset;
percent *= 100.f;
/* Get the remaining time */
const int current_time =
clocks_diff_ticks(getticks(), clocks_start_ticks) / 1000.0;
const int init_time =
clocks_diff_ticks(tic, clocks_start_ticks) / 1000.0;
const int remaining_time =
(current_time - init_time) * (100. - percent) / percent;
tools_print_progress(percent, remaining_time,
"Looking for new or removed particles");
}
}
mask_type mask = 0;
size_t h_offset = 0;
csds_loader_io_read_mask(log->log.map + offset, &mask, &h_offset);
/* Go to the next record */
const size_t old_offset = offset;
offset += header_get_record_size_from_mask(h, mask);
offset += size_record_header;
/* Check if we have a particle with a flag */
if (mask & CSDS_TIMESTAMP_MASK) {
continue;
}
/* Read the ID */
int64_t id = 0;
csds_particle_read_field(old_offset, &id, field_id,
/* derivative */ 0, &mask, &h_offset,
h->fields[csds_type_dark_matter],
h->number_fields[csds_type_dark_matter],
reader->log.log.map);
struct index_data item = {id, old_offset};
if (!(mask & CSDS_SPECIAL_FLAGS_MASK)) {
void *p = csds_hashmap_set(current_state[csds_type_dark_matter], &item);
if (p == NULL)
error("Found a new particle without mask");
continue;
}
/* Get the special flag */
int data = 0;
int part_type = -1;
enum csds_special_flags flag = csds_particle_read_special_flag(
old_offset, &mask, &h_offset, &data, &part_type, log->log.map);
if (part_type != csds_type_dark_matter)
error("Wrong type");
#ifdef CSDS_DEBUG_CHECKS
if (flag == csds_flag_none) {
error(
"A record should not have a mask "
"for a flag and the flag set to 0");
}
#endif
/* Check if we have a meaningful particle type */
if (part_type < 0) {
error("Found a special flag without a particle type");
}
/* Add the particle to the arrays */
if (flag == csds_flag_change_type || flag == csds_flag_mpi_exit ||
flag == csds_flag_delete) {
index_writer_log(&parts_removed[part_type], id, old_offset);
if (csds_hashmap_delete(current_state[part_type], id) == NULL) {
error("Failed to remove a particle");
};
} else if (flag == csds_flag_create || flag == csds_flag_mpi_enter) {
index_writer_log(&parts_created[part_type], id, old_offset);
void *p = (void *)csds_hashmap_set(current_state[part_type], &item);
if (p != NULL) {
error("Already found a particle with the same ID");
}
}
}
/* Go back to normal */
CSDS_ADVICE_NORMAL(log->log);
/* Cleanup output */
if (reader->verbose > 0) {
printf("\n");
}
/* Print the time */
if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
message("Updating particles took %.3f %s",
clocks_from_ticks(getticks() - tic), clocks_getunit());
return offset;
}
/**
* @brief Generate the index files from the logfile.
*
* @param reader The #csds_reader.
* @param number_index The number of index to generate.
* @param current_index Current index to write (> 0 if restart)
*/
void csds_reader_generate_index_files(const struct csds_reader *reader,
int number_index, int current_index) {
/* Get a few pointers */
const struct csds_logfile *log = &reader->log;
const struct header *h = &log->header;
/* Write a quick message */
if (reader->verbose > 0) {
message("Generating %i index files", number_index);
if (current_index) {
message("Restarting from index %i", current_index);
}
}
/* Check that the number of index is meaningful */
if (number_index < 2) {
error("The CSDS requires at least 2 index files.");
}
/* Ensure that the offset are in the assumed direction */
if (!header_is_forward(h)) {
error("The offset are not in the expected direction");
}
/* Create the different arrays that will store the information */
struct csds_hashmap *current_state[csds_type_count];
struct index_writer parts_created[csds_type_count];
struct index_writer parts_removed[csds_type_count];
const size_t default_size = 1024;
/* Allocate the arrays for new / removed particles */
for (int i = 0; i < csds_type_count; i++) {
index_writer_init(&parts_created[i], default_size,
reader->params.arrays_maximal_tagged_fraction);
index_writer_init(&parts_removed[i], default_size,
reader->params.arrays_maximal_tagged_fraction);
}
/* Current offset to read */
size_t offset = 0;
/* Variables for the time of each index files */
const double t_min = log->times.records[0].time;
const double t_max = log->times.records[log->times.size - 1].time;
const double dt = (t_max - t_min) / (number_index - 1);
/* Are we restarting? If not allocate and get the initial state */
if (current_index == 0) {
/* Allocate the arrays for the current state */
for (int i = 0; i < csds_type_count; i++) {
current_state[i] =
csds_hashmap_new(hashmap_overallocation *
reader->params.approximate_number_particles[i]);
if (current_state[i] == NULL) {
error("Failed to initialize the hashmap");
}
}
/* Get the initial state */
struct time_record time_record;
offset = csds_reader_get_initial_state(reader, current_state, &time_record);
/* Write the first index file */
csds_reader_write_index(reader, current_state, parts_created, parts_removed,
&time_record, /* file_number */ 0);
}
/* We are restarting => read state from file */
else {
/* Initialize the index file */
struct csds_index index;
csds_index_init(&index, reader);
/* Get its name */
char filename[CSDS_STRING_SIZE + 50];
sprintf(filename, "%s_%04u.index", reader->basename, current_index - 1);
/* Read it */
csds_index_read_header(&index, filename);
csds_index_map_file(&index, filename, /* sorted */ 1);
/* Loop over all the particle types */
for (int i = 0; i < csds_type_count; i++) {
/* Allocate the array for the current state */
current_state[i] =
csds_hashmap_new(hashmap_overallocation * index.nparts[i]);
if (current_state[i] == NULL) {
error("Failed to initialize the hashmap");
}
/* Copy the index file into the arrays. */
for (size_t p = 0; p < index.nparts[i]; p++) {
struct index_data *data = csds_index_get_data(&index, i);
void *out = (void *)csds_hashmap_set(current_state[i], data + p);
if (out != NULL) {
error("Already found a particle with the same ID");
}
}
}
/* Set the last offset read */
const double current_approximate_time = t_min + (current_index - 1) * dt;
const size_t index_time =
time_array_get_index_from_time(&log->times, current_approximate_time);
struct time_record index_time_record = log->times.records[index_time];
offset = index_time_record.offset;
/* Check if we are reading the correct file */
if (index_time_record.int_time != index.integer_time) {
error("The time in the index file and the expected one do not match");
}
/* Cleanup */
csds_index_free(&index);
}
/* Compute the state of all the other files and write them. */
for (int file_number = 1; file_number < number_index; file_number++) {
/* Skip the index files that have already been written. */
if (file_number < current_index) continue;
/* Get the corresponding time record.
* The index files are only here to speedup the code,
* no need to have the exact time. */
const double current_approximate_time = t_min + file_number * dt;
const size_t index_time =
time_array_get_index_from_time(&log->times, current_approximate_time);
struct time_record time_record = log->times.records[index_time];
/* Ensure that we really have the final time (rounding error). */
if (file_number == number_index - 1) {
time_record = log->times.records[log->times.size - 1];
}
/* Update the state until the next index file. */
offset = csds_reader_update_state_to_next_index(
reader, offset, time_record, current_state, parts_created,
parts_removed);
/* Write the index file */
csds_reader_write_index(reader, current_state, parts_created, parts_removed,
&time_record, file_number);
}
/* Free the memory */
for (int type = 0; type < csds_type_count; type++) {
csds_hashmap_free(current_state[type]);
index_writer_free(&parts_created[type]);
index_writer_free(&parts_removed[type]);
}
if (reader->verbose > 0) {
message("Generation done");
}
}