/* reader_generate_index.cpp */
/*******************************************************************************
* This file is part of CSDS.
* Copyright (c) 2021 Loic Hausammann (loic.hausammann@epfl.ch)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Standard headers */
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
/* Include corresponding header */
#include "reader.hpp"
/* Include local headers */
#include "index_file.hpp"
#include "logfile.hpp"
#include "openmp.hpp"
#include "particle_type.hpp"
#define hashmap_overallocation 1.25
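/* The per-type hash maps are reserved with 25% more space than the expected
* number of particles, which keeps the load factor low and leaves some room
* for particles created during the run. */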
using namespace std;
/**
* @brief Structure that contains all the information
* required to write an index file for a single particle type.
*/
struct index_writer {
/* Number of elements currently stored */
uint64_t size;
/* Size of the current buffer */
size_t capacity;
/* Buffer containing the particles */
struct index_data *data;
/* Initial size of the writer */
size_t init_size;
};
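/* Illustrative sketch of the intended lifecycle of an #index_writer (the
* real calls are spread over Reader::GenerateIndexFiles and
* Reader::WriteIndex below):
*
*   struct index_writer writer;
*   index_writer_init(&writer, 1024);        // allocate the buffer
*   index_writer_log(&writer, id, offset);   // append one particle reference
*   // ... write the buffers into an index file, then:
*   index_writer_reset(&writer);             // start over for the next index
*   index_writer_free(&writer);              // release the memory at the end
*/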
/**
* @brief Initialize the structure for the first time.
*
* @param writer The #index_writer.
* @param init_size The initial size of the writer.
*/
void index_writer_init(struct index_writer *writer, size_t init_size) {
/* Set the counters to their initial value */
writer->size = 0;
writer->capacity = init_size;
writer->init_size = init_size;
if (init_size == 0) {
writer->data = NULL;
return;
}
/* Allocate the memory */
writer->data =
(struct index_data *)malloc(sizeof(struct index_data) * init_size);
if (writer->data == NULL) {
csds_error("Failed to allocate memory for the index_writer.");
}
}
/**
* @brief Reset the structure.
*
* @param writer The #index_writer.
*/
void index_writer_reset(struct index_writer *writer) {
if (writer->size == 0) return;
free(writer->data);
index_writer_init(writer, writer->init_size);
}
/**
* @brief Free the structure.
*
* @param writer The #index_writer.
*/
void index_writer_free(struct index_writer *writer) {
/* Set the counters to 0 */
writer->size = 0;
writer->capacity = 0;
/* Free the memory */
if (writer->data != NULL) {
free(writer->data);
writer->data = NULL;
}
}
/**
* @brief Log the particle information into the #index_writer.
*
* @param writer The #index_writer.
* @param id The ID of the particle.
* @param last_offset The last offset of the particle.
*/
void index_writer_log(struct index_writer *writer, const int64_t id,
const uint64_t last_offset) {
/* Ensure that it was correctly allocated */
if (writer->capacity == 0) {
csds_error(
"Trying to add a particle to an empty index_writer."
" If your are sure that your particle type is implemented, "
"please increase the value of ReaderOptions:Number*.");
}
#ifdef CSDS_DEBUG_CHECKS
if (id < 0) {
csds_error("Negative ID for a particle.");
}
#endif
const struct index_data data = {id, last_offset};
/* Check if enough space is left */
if (writer->size == writer->capacity) {
/* Compute the previous amount of memory */
const size_t memsize = sizeof(struct index_data) * writer->capacity;
/* Increase the capacity of the array */
writer->capacity *= 2;
/* Allocate the new array and copy the content of the previous one */
struct index_data *tmp = (struct index_data *)malloc(2 * memsize);
if (tmp == NULL) {
csds_error("Failed to allocate memory for the index_writer.");
}
memcpy(tmp, writer->data, memsize);
/* Free the previous array and switch the pointers */
free(writer->data);
writer->data = tmp;
}
/* Save the new particle */
writer->data[writer->size] = data;
/* Increase the element counter */
writer->size += 1;
}
/**
* @brief Write the number of particles and their data into the index file.
*
* @param writers The array of #index_writer (one per particle type).
* @param f The stream of the index file.
*/
void index_writer_write_in_index(const struct index_writer *writers,
ofstream &f) {
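/* Layout written by this helper: first the per-type particle counts, then,
* for each non-empty type, the raw array of struct index_data (id, offset)
* pairs. */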
/* Write the number of particles */
uint64_t N_total[csds_type_count];
for (int type = 0; type < csds_type_count; type++) {
N_total[type] = writers[type].size;
}
f.write((char *)N_total, sizeof(uint64_t) * csds_type_count);
/* Write the index data */
for (int type = 0; type < csds_type_count; type++) {
if (N_total[type] == 0) continue;
const size_t size = sizeof(struct index_data) * writers[type].size;
f.write((const char *)writers[type].data, size);
}
}
/**
* @brief Write an index file.
*
* @param current_state The arrays containing the current
* state with the last full offset (size given by csds_type_count).
* @param parts_created The arrays containing the first reference
* (since last index file) of created particles
* (size given by csds_type_count).
* @param parts_removed The arrays containing the last reference
* (since last index file) of removed particles (size given by
* csds_type_count).
* @param time The time corresponding to the current index file.
* @param file_number The current file number.
*/
void Reader::WriteIndex(CsdsUnorderedMap &current_state,
struct index_writer *parts_created,
struct index_writer *parts_removed,
const struct time_record *time, int file_number) {
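/* Sketch of the index file layout produced by the writes below (raw bytes
* in host byte order; the authoritative reader is presumably the IndexFile
* class):
*   double time, integertime_t int_time
*   uint64_t N_total[csds_type_count]
*   char sorted
*   zero padding up to the next 8-byte boundary
*   per non-empty type: N_total[type] pairs of (id_type id, uint64_t offset)
*   particles created since the last index (see index_writer_write_in_index)
*   particles removed since the last index (same layout)
*/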
/* Get the filename */
string filename = this->GetIndexName(file_number);
/* Open file */
std::ofstream f(filename, std::ofstream::out | std::ofstream::binary);
if (!f.good()) {
csds_error("Failed to open the file: " << filename);
}
/* Write the time */
f.write((char *)&time->time, sizeof(double));
f.write((char *)&time->int_time, sizeof(integertime_t));
/* Write number of particles */
uint64_t N_total[csds_type_count];
for (int type = 0; type < csds_type_count; type++) {
N_total[type] = current_state[type].size();
}
f.write((char *)N_total, sizeof(uint64_t) * csds_type_count);
/* Write if the file is sorted */
// TODO change it if the array is sorted
const char sorted = 0;
f.write((char *)&sorted, sizeof(char));
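/* Worked example of the round-up below: cur_pos = 41 gives
* ((41 + 7) & ~7) = 48, hence d_align = 7 padding bytes; cur_pos = 48 is
* already aligned and d_align = 0. */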
/* Ensure that the data is aligned on an 8-byte boundary */
size_t cur_pos = f.tellp();
size_t d_align = ((cur_pos + 7) & ~7) - cur_pos;
if (d_align > 0) {
/* Pad with zeros up to the boundary (d_align is at most 7 bytes) */
const uint64_t zero = 0;
f.write((const char *)&zero, d_align);
}
/* Write the current state */
for (int type = 0; type < csds_type_count; type++) {
if (N_total[type] == 0) continue;
for (auto const &current : current_state[type]) {
f.write((const char *)&current.first, sizeof(id_type));
f.write((const char *)&current.second, sizeof(uint64_t));
}
}
/* Now do the same with the particles created / removed */
index_writer_write_in_index(parts_created, f);
index_writer_write_in_index(parts_removed, f);
/* Cleanup the arrays */
for (int type = 0; type < csds_type_count; type++) {
index_writer_reset(&parts_created[type]);
index_writer_reset(&parts_removed[type]);
}
}
/**
* @brief Get the initial state of the simulation.
*
* @param current_state The arrays that will be updated with the initial
* state (size csds_type_count).
* @param time_record (output) The first #time_record in the logfile (for
* writing the index file).
*
* @return The offset of the second #time_record
* (the first record that does not correspond to the IC).
*/
size_t Reader::GetInitialState(CsdsUnorderedMap &current_state,
struct time_record *time_record) {
const high_resolution_clock::time_point init = high_resolution_clock::now();
/* Get a few variables. */
LogFile *log = this->mLog;
const struct header *h = &log->header;
const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;
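/* Every record in the logfile starts with a header: a field mask
* (CSDS_MASK_SIZE bytes) and an offset (CSDS_OFFSET_SIZE bytes) linking it
* to the next record of the same particle (this reader requires forward
* offsets, see header_is_forward), followed by the fields selected by the
* mask. Hence record sizes below are computed as
* header_get_record_size_from_mask() + size_record_header. */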
/* Warn the OS that we will read in a sequential way */
log->log->AdviceSequentialReading();
/* Get the offset after the dump of all the particles
and the time information of the first time record. */
if (log->times.Size() < 2) {
csds_error("The time array is not large enough");
}
const size_t offset_max = log->times[1].offset;
/* Here we cheat a bit by using time record 0.
* The index files provide the last position known at a given offset,
* but at the first time record (t=0) the particles are not written yet.
* In order to still be able to read t=0, the behavior is slightly
* different for the first index file.
*/
*time_record = log->times[0];
/* Get the offset of the first particle record */
size_t offset_first = h->offset_first_record;
/* Skip the time record */
struct record_header header;
log->log->ReadRecordHeader(offset_first, header);
const int time_size = header_get_record_size_from_mask(h, header.mask);
offset_first += time_size + size_record_header;
/* Get the initial state */
float next_percent = 0;
for (size_t offset = offset_first; offset < offset_max;) {
/* Get the particle type */
int part_type = 0;
int data = 0;
enum csds_special_flags flag = csds_particle_read_special_flag(
offset, header, &data, &part_type, *log->log);
if (flag != csds_flag_create) {
csds_error("Reading a particle from ICs without the created flag.");
}
/* Get the mask for the IDs */
const Field &field_id =
header_get_field_from_name(h, "ParticleIDs", (enum part_type)part_type);
/* Get the particle ID */
if (!(field_id.GetField() & header.mask)) {
csds_error("The particle ID is missing in the first log");
}
/* Read the particle ID */
int64_t id = 0;
csds_particle_read_field(offset, &id, field_id,
/* derivative */ 0, header, h->fields[part_type],
*this->mLog->log);
/* Log the particle */
if (current_state[part_type].count(id) != 0) {
csds_error("Already found a particle with the same ID");
}
current_state[part_type][id] = offset;
/* Increment the offset */
const int record_size = header_get_record_size_from_mask(h, header.mask);
offset += record_size + size_record_header;
/* Print the progress */
if (this->mVerbose > 0) {
float percent =
(float)(offset - offset_first) / (float)(offset_max - offset_first);
percent *= 100;
if (percent > next_percent) {
next_percent += 0.5;
tools_print_progress(percent, init, "Getting initial state");
}
}
}
/* Close progressbar */
if (this->mVerbose > 0) {
printf("\n");
}
/* Print the time */
if (this->mVerbose > 0 || this->mVerbose == CSDS_VERBOSE_TIMERS)
message("took " << GetDeltaTime(init) << "ms");
/* Go back to normal */
log->log->AdviceNormalReading();
return offset_max;
}
/**
* @brief Get the last offset of a record before a given offset.
*
* @param data The #index_data of the particle.
* @param offset_limit The upper limit of the offset to get.
*
* @return The last full offset of the particle before offset_limit.
*/
size_t Reader::GetLastOffsetBefore(const struct index_data &data,
size_t offset_limit) {
/* Get the logfile */
LogFile *log = this->mLog;
size_t current_offset = data.offset;
/* Get the full mask */
struct record_header last_header;
log->log->ReadRecordHeader(current_offset, last_header);
/* Remove the special flag (if any) from the mask */
last_header.mask &= ~CSDS_SPECIAL_FLAGS_MASK;
/* Find the last offset before the current time */
size_t last_full_offset = current_offset;
current_offset += last_header.offset;
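/* Each header's offset points to the next record of the same particle, so
* repeatedly adding it walks along this particle's chain of updates. We
* follow the chain until we pass offset_limit, remembering the last record
* whose mask matches the full (initial) one. */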
while (1) {
/* Get the mask */
struct record_header cur_header;
log->log->ReadRecordHeader(current_offset, cur_header);
/* update the offset */
current_offset += cur_header.offset;
if (current_offset > offset_limit) {
break;
}
/* The particle should not have a special flag here: records with a
special flag before offset_limit were already handled when scanning
for created / removed particles */
if (cur_header.mask & CSDS_SPECIAL_FLAGS_MASK) {
csds_error("Found a special flag when updating the particles");
}
/* Update the last full offset */
if (last_header.mask == cur_header.mask) {
last_full_offset = current_offset;
}
/* Are we at the end of the file? */
if (cur_header.offset == 0) {
break;
}
}
return last_full_offset;
}
/**
* @brief Update the state of the simulation until the next index file along
* with the history.
*
* @param init_offset The initial offset to read.
* @param time_record The #time_record of the next index file.
* @param current_state The arrays containing the state of the simulation (size
* csds_type_count).
* @param parts_created The arrays containing the particles created since last
* index (size csds_type_count).
* @param parts_removed The arrays containing the particles removed since last
* index (size csds_type_count).
*
* @return The offset reached at the end of the update (used as the starting
* offset for the next one).
*/
size_t Reader::UpdateStateToNextIndex(size_t init_offset,
struct time_record time_record,
CsdsUnorderedMap &current_state,
struct index_writer *parts_created,
struct index_writer *parts_removed) {
LogFile *log = this->mLog;
const struct header *h = &log->header;
const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;
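/* This is done in two passes: first scan the logfile between init_offset
* and the next index file to collect the created / removed particles, then
* update the offset of every remaining particle to its last record before
* that index file. */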
/* Look for all the created / removed particles */
size_t offset = init_offset;
int step = 0;
/* Record the initial time */
const high_resolution_clock::time_point init = high_resolution_clock::now();
/* Warn the OS that we will read in a sequential way */
log->log->AdviceSequentialReading();
while (offset < time_record.offset) {
/* Print status */
if (this->mVerbose > 0) {
step += 1;
if (step % 100 == 0) {
step = 0;
/* Get the percentage */
float percent = offset - init_offset;
percent /= time_record.offset - init_offset;
percent *= 100.f;
tools_print_progress(percent, init,
"Looking for new or removed particles");
}
}
int part_type = -1; // only available if the record is flagged.
int data = 0;
/* Get the mask */
struct record_header header;
log->log->ReadRecordHeader(offset, header);
/* Go to the next record */
const size_t old_offset = offset;
offset += header_get_record_size_from_mask(h, header.mask);
offset += size_record_header;
/* Check if we have a particle with a flag */
if (header.mask & CSDS_TIMESTAMP_MASK ||
!(header.mask & CSDS_SPECIAL_FLAGS_MASK)) {
continue;
}
/* Get the special flag */
enum csds_special_flags flag = csds_particle_read_special_flag(
old_offset, header, &data, &part_type, *log->log);
#ifdef CSDS_DEBUG_CHECKS
if (flag == csds_flag_none) {
csds_error(
"A record should not have a mask "
"for a flag and the flag set to 0");
}
#endif
/* Check if we have a meaningful particle type */
if (part_type < 0) {
csds_error("Found a special flag without a particle type");
}
/* Get the mask for the IDs */
// TODO create an array outside the loop
const Field &field_id =
header_get_field_from_name(h, "ParticleIDs", (enum part_type)part_type);
/* Read the ID */
int64_t id = 0;
csds_particle_read_field(old_offset, &id, field_id,
/* derivative */ 0, header, h->fields[part_type],
*this->mLog->log);
/* Add the particle to the arrays */
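/* A particle changing type, leaving through MPI or being deleted is
treated as removed from the current state; a particle being created or
entering through MPI is treated as new. */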
if (flag == csds_flag_change_type || flag == csds_flag_mpi_exit ||
flag == csds_flag_delete) {
index_writer_log(&parts_removed[part_type], id, old_offset);
if (current_state[part_type].count(id) != 1) {
csds_error("Failed to remove a particle");
}
current_state[part_type].erase(id);
} else if (flag == csds_flag_create || flag == csds_flag_mpi_enter) {
index_writer_log(&parts_created[part_type], id, old_offset);
if (current_state[part_type].count(id) != 0) {
csds_error("Already found a particle with the same ID");
}
current_state[part_type][id] = old_offset;
}
}
/* Go back to normal */
log->log->AdviceNormalReading();
/* Cleanup output */
if (this->mVerbose > 0) {
printf("\n");
}
/* Print the time */
if (this->mVerbose > 0 || this->mVerbose == CSDS_VERBOSE_TIMERS)
message("Finding new/removed particles took " << GetDeltaTime(init)
<< "ms");
/* Initialize the total number of particles for the progress bar */
size_t total_number_particles = 0;
for (int type = 0; type < csds_type_count; type++) {
total_number_particles += current_state[type].size();
}
/* Record the time */
const high_resolution_clock::time_point init2 = high_resolution_clock::now();
size_t current_number = 0;
int local_counter = 0;
const int local_update_limit = 10000;
/* Update the offsets of current_state
* No need to update the others as they contain
* data about when particles are removed/created. */
for (int type = 0; type < csds_type_count; type++) {
/* Loop over the buckets ~ particles */
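/* Distributing the buckets over OpenMP threads is safe here: only the
mapped values (the offsets) are modified, the structure of the map
(keys, bucket count) is never touched inside the parallel loop. */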
#pragma omp parallel for firstprivate(local_counter)
for (size_t bucket = 0; bucket < current_state[type].bucket_count();
bucket++) {
for (auto cur = current_state[type].begin(bucket);
cur != current_state[type].end(bucket); cur++) {
struct index_data index_data = {
.id = cur->first,
.offset = cur->second,
};
/* Update the offset */
cur->second = this->GetLastOffsetBefore(index_data, time_record.offset);
/* Are we done or should we print something? */
if (this->mVerbose <= 0) continue;
/* Update the counters */
local_counter++;
if (local_counter < local_update_limit) continue;
/* Add the local counter to the global one */
#pragma omp atomic
current_number += local_counter;
local_counter = 0;
/* Only the master is printing */
int current_thread = csds_get_thread_num();
if (current_thread != 0) continue;
float ratio = (float)current_number / (float)total_number_particles;
/* Print the message */
tools_print_progress(100 * ratio, init2, "Updating offsets");
}
}
}
/* Cleanup the output */
if (this->mVerbose > 0) printf("\n");
/* Print the time */
if (this->mVerbose > 0 || this->mVerbose == CSDS_VERBOSE_TIMERS)
message("Updating particles took " << GetDeltaTime(init2) << "ms");
return offset;
}
/**
* @brief Generate the index files from the logfile.
*
* @param number_index The number of index files to generate.
* @param current_index The current index to write (> 0 if restarting).
*/
void Reader::GenerateIndexFiles(int number_index, int current_index) {
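/* Overall strategy: recover the state at the first index file (either from
* the initial conditions or, when restarting, from the last index file
* already written), then repeatedly advance that state to the time of the
* next index file and write it out. */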
/* Get a few pointers */
LogFile *log = this->mLog;
const struct header *h = &log->header;
/* Write a quick message */
if (this->mVerbose > 0) {
message("Generating " << number_index << " index files");
if (current_index) {
message("Restarting from index: " << current_index);
}
}
/* Check that the number of index files is meaningful */
if (number_index < 2) {
csds_error("The CSDS requires at least 2 index files.");
}
/* Ensure that the offsets are in the assumed direction */
if (!header_is_forward(h)) {
csds_error("The offsets are not in the expected direction");
}
/* Create the different arrays that will store the information */
CsdsUnorderedMap current_state;
struct index_writer parts_created[csds_type_count];
struct index_writer parts_removed[csds_type_count];
const size_t default_size = 1024;
/* Allocate the arrays for new / removed particles */
for (int i = 0; i < csds_type_count; i++) {
index_writer_init(&parts_created[i], default_size);
index_writer_init(&parts_removed[i], default_size);
}
/* Current offset to read */
size_t offset = 0;
/* Variables for the time of each index file */
const double t_min = log->times[0].time;
const double t_max = log->times[log->times.Size() - 1].time;
const double dt = (t_max - t_min) / (number_index - 1);
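/* The index files are thus (approximately) evenly spaced in time: file k
* corresponds roughly to t_min + k * dt, with file 0 holding the initial
* state and the last file forced to exactly t_max further down. */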
/* Are we restarting? If not allocate and get the initial state */
if (current_index == 0) {
/* Allocate the arrays for the current state */
for (int i = 0; i < csds_type_count; i++) {
current_state[i].reserve(mParams.GetApproximateNumberParticles()[i] *
hashmap_overallocation);
}
/* Get the initial state */
struct time_record time_record;
offset = this->GetInitialState(current_state, &time_record);
/* Write the first index file */
this->WriteIndex(current_state, parts_created, parts_removed, &time_record,
/* file_number */ 0);
}
/* We are restarting => read state from file */
else {
/* Get the index file name */
string filename = this->GetIndexName(current_index - 1);
/* Initialize the index file */
IndexFile index(filename, this->mVerbose);
index.MapFile(filename, /* sorted */ 1, this->mVerbose);
/* Loop over all the particle types */
for (int i = 0; i < csds_type_count; i++) {
/* Allocate the array for the current state */
current_state[i].reserve(hashmap_overallocation *
index.GetNumberParticles((part_type)i));
/* Copy the index file into the arrays. */
struct index_data *data = index.GetData((part_type)i);
for (size_t p = 0; p < index.GetNumberParticles((part_type)i); p++) {
if (current_state[i].count(data[p].id) != 0) {
csds_error("Already found a particle with the same ID");
}
current_state[i][data[p].id] = data[p].offset;
}
}
/* Set the last offset read */
if (current_index == 1) {
/* In this case, we need to cheat a bit (see
* GetInitialState) */
offset = log->times[1].offset;
} else {
const double current_approximate_time = t_min + (current_index - 1) * dt;
const size_t index_time =
log->times.GetIndexFromTime(current_approximate_time);
struct time_record index_time_record = log->times[index_time];
offset = index_time_record.offset;
/* Check if we are reading the correct file */
if (index_time_record.int_time != index.GetIntegerTime()) {
csds_error(
"The time in the index file and the expected one do not match");
}
}
}
/* Compute the state of all the other files and write them. */
for (int file_number = 1; file_number < number_index; file_number++) {
/* Skip the index files that have already been written. */
if (file_number < current_index) continue;
/* Get the corresponding time record.
* The index files are only here to speedup the code,
* no need to have the exact time. */
const double current_approximate_time = t_min + file_number * dt;
const size_t index_time =
log->times.GetIndexFromTime(current_approximate_time);
struct time_record time_record = log->times[index_time];
/* Ensure that we really have the final time (rounding error). */
if (file_number == number_index - 1) {
time_record = log->times[log->times.Size() - 1];
}
/* Update the state until the next index file. */
offset = this->UpdateStateToNextIndex(offset, time_record, current_state,
parts_created, parts_removed);
/* Write the index file */
this->WriteIndex(current_state, parts_created, parts_removed, &time_record,
file_number);
}
/* Free the memory */
for (int type = 0; type < csds_type_count; type++) {
index_writer_free(&parts_created[type]);
index_writer_free(&parts_removed[type]);
}
if (this->mVerbose > 0) {
message("Generation done");
}
}