Skip to content
Snippets Groups Projects
reader_generate_index.cpp 24.21 KiB
/*******************************************************************************
 * This file is part of CSDS.
 * Copyright (c) 2021 Loic Hausammann (loic.hausammann@epfl.ch)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Standard headers */
#include <fstream>
#include <iostream>

/* Include corresponding header */
#include "reader.hpp"

/* Include local headers */
#include "index_file.hpp"
#include "logfile.hpp"
#include "openmp.hpp"
#include "particle_type.hpp"

#define hashmap_overallocation 1.25

using namespace std;

/**
 * @brief Structure that contains all the information
 * required to write an index file for a single particle type.
 */
struct index_writer {
  /* Number of elements currently stored */
  uint64_t size;

  /* Size of the current buffer */
  size_t capacity;

  /* Buffer containing the particles */
  struct index_data *data;

  /* Initial size of the writer */
  size_t init_size;
};

/**
 * @brief Initialize the structure for the first time.
 *
 * @param writer The #index_writer.
 * @param init_size The initial size of the writer.
 */
void index_writer_init(struct index_writer *writer, size_t init_size) {

  /* Set the counters to their initial value */
  writer->size = 0;
  writer->capacity = init_size;
  writer->init_size = init_size;

  if (init_size == 0) {
    writer->data = NULL;
    return;
  }

  /* Allocate the memory */
  writer->data =
      (struct index_data *)malloc(sizeof(struct index_data) * init_size);
  if (writer->data == NULL) {
    csds_error("Failed to allocate memory for the index_writer.");
  }
}

/**
 * @brief Reset the structure.
 *
 * @param writer The #index_writer.
 */
void index_writer_reset(struct index_writer *writer) {
  if (writer->size == 0) return;

  free(writer->data);
  index_writer_init(writer, writer->init_size);
}

/**
 * @brief Free the structure.
 *
 * @param hist The #index_writer.
 */
void index_writer_free(struct index_writer *writer) {
  /* Set the counters to 0 */
  writer->size = 0;
  writer->capacity = 0;

  /* Free the memory */
  if (writer->data != NULL) {
    free(writer->data);
    writer->data = NULL;
  }
}

/**
 * @brief Log a the particle information into the #index_writer.
 *
 * @param writer The #index_writer.
 * @param id The ID of the particle.
 * @param last_offset The last offset of the particle.
 */
void index_writer_log(struct index_writer *writer, const int64_t id,
                      const uint64_t last_offset) {

  /* Ensure that it was correctly allocated */
  if (writer->capacity == 0) {
    csds_error(
        "Trying to add a particle to an empty index_writer."
        " If your are sure that your particle type is implemented, "
        "please increase the value of ReaderOptions:Number*.");
  }

#ifdef CSDS_DEBUG_CHECKS
  if (id < 0) {
    csds_error("Negative ID for a particle.");
  }
#endif

  const struct index_data data = {id, last_offset};

  /* Check if enough space is left */
  if (writer->size == writer->capacity) {
    /* Compute the previous amount of memory */
    const size_t memsize = sizeof(struct index_data) * writer->capacity;
    /* Increase the capacity of the array */
    writer->capacity *= 2;

    /* Allocate the new array and copy the content of the previous one */
    struct index_data *tmp = (struct index_data *)malloc(2 * memsize);

    memcpy(tmp, writer->data, memsize);

    /* Free the previous array and switch the pointers */
    free(writer->data);
    writer->data = tmp;
  }

  /* Save the new particle */
  writer->data[writer->size] = data;

  /* Increase the element counter */
  writer->size += 1;
}

/**
 * @brief Write the number of particles and their data into the index file
 */
void index_writer_write_in_index(const struct index_writer *writers,
                                 ofstream &f) {

  /* Write the number of particles */
  uint64_t N_total[csds_type_count];
  for (int type = 0; type < csds_type_count; type++) {
    N_total[type] = writers[type].size;
  }
  f.write((char *)N_total, sizeof(uint64_t) * csds_type_count);

  /* Write the index data */
  for (int type = 0; type < csds_type_count; type++) {
    if (N_total[type] == 0) continue;

    const size_t size = sizeof(struct index_data) * writers[type].size;
    f.write((const char *)writers[type].data, size);
  }
}

/**
 * @brief Write an index file.
 *
 * @param reader The #csds_reader.
 * @param current_state The arrays containing the current
 * state with the last full offset (size given by csds_type_count).
 * @param parts_created The arrays containing the first reference
 * (since last index file) of created particles
 * (size given by csds_type_count).
 * @param parts_removed The arrays containing the last reference
 * (since last index file) of removed particles (size given by
 * csds_type_count).
 * @param time The time corresponding to the current index file.
 * @param file_number The current file number.
 */
void Reader::WriteIndex(CsdsUnorderedMap &current_state,
                        struct index_writer *parts_created,
                        struct index_writer *parts_removed,
                        const struct time_record *time, int file_number) {

  /* Get the filename */
  string filename = this->GetIndexName(file_number);

  /* Open file */
  std::ofstream f(filename, std::ofstream::out | std::ofstream::binary);
  if (!f.good()) {
    csds_error("Failed to open the file: " << filename);
  }

  /* Write the time */
  f.write((char *)&time->time, sizeof(double));
  f.write((char *)&time->int_time, sizeof(integertime_t));

  /* Write number of particles */
  uint64_t N_total[csds_type_count];
  for (int type = 0; type < csds_type_count; type++) {
    N_total[type] = current_state[type].size();
  }
  f.write((char *)N_total, sizeof(uint64_t) * csds_type_count);

  /* Write if the file is sorted */
  // TODO change it if the array is sorted
  const char sorted = 0;
  f.write((char *)&sorted, sizeof(char));

  /* Ensure the data to be aligned */
  size_t cur_pos = f.tellp();
  size_t d_align = ((cur_pos + 7) & ~7) - cur_pos;
  if (d_align > 0) {
    long int tmp = 0;
    /* Fill the memory with garbage */
    f.write((const char *)&tmp, d_align);
  }

  /* Write the current state */
  for (int type = 0; type < csds_type_count; type++) {
    if (N_total[type] == 0) continue;

    for (auto const &current : current_state[type]) {
      f.write((const char *)&current.first, sizeof(id_type));
      f.write((const char *)&current.second, sizeof(uint64_t));
    }
  }

  /* Now do the same with the particles created / removed */
  index_writer_write_in_index(parts_created, f);
  index_writer_write_in_index(parts_removed, f);

  /* Cleanup the arrays */
  for (int type = 0; type < csds_type_count; type++) {
    index_writer_reset(&parts_created[type]);
    index_writer_reset(&parts_removed[type]);
  }
}

/**
 * @brief Get the initial state of the simulation.
 *
 * @param reader The #csds_reader.
 * @param current_state The arrays that will be updated with (size
 * csds_type_count).
 * @param time_record (output) The first #time_record in the logfile (for
 * writing the index file).
 *
 * @return The offset of the second #time_record
 * (the first record that does not correspond to the IC).
 */
size_t Reader::GetInitialState(CsdsUnorderedMap &current_state,
                               struct time_record *time_record) {
  const high_resolution_clock::time_point init = high_resolution_clock::now();

  /* Get a few variables. */
  LogFile *log = this->mLog;
  const struct header *h = &log->header;
  const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;

  /* Warn the OS that we will read in a sequential way */
  log->log->AdviceSequentialReading();

  /* Get the offset after the dump of all the particles
     and the time information of the first time record. */
  if (log->times.Size() < 2) {
    csds_error("The time array is not large enough");
  }
  const size_t offset_max = log->times[1].offset;
  /* Here we cheat a bit by using the 0.
   * The index files provide the last position known at a given offset.
   * For the first index at t=0, the particles are not written yet.
   * In order to be able to read t=0, we change a bit the behavior
   * for the first index file.
   */
  *time_record = log->times[0];

  /* Get the offset of the first particle record */
  size_t offset_first = h->offset_first_record;

  /* Skip the time record */
  struct record_header header;
  log->log->ReadRecordHeader(offset_first, header);
  const int time_size = header_get_record_size_from_mask(h, header.mask);
  offset_first += time_size + size_record_header;

  /* Get the initial state */
  float next_percent = 0;
  for (size_t offset = offset_first; offset < offset_max;) {
    /* Get the particle type */
    int part_type = 0;
    int data = 0;

    enum csds_special_flags flag = csds_particle_read_special_flag(
        offset, header, &data, &part_type, *log->log);
    if (flag != csds_flag_create) {
      csds_error("Reading a particle from ICs without the created flag.");
    }

    /* Get the mask for the IDs */
    const Field &field_id =
        header_get_field_from_name(h, "ParticleIDs", (enum part_type)part_type);

    /* Get the particle ID */
    if (!(field_id.GetField() & header.mask)) {
      csds_error("The particle ID is missing in the first log");
    }

    /* Read the particle ID */
    int64_t id = 0;
    csds_particle_read_field(offset, &id, field_id,
                             /* derivative */ 0, header, h->fields[part_type],
                             *this->mLog->log);

    /* Log the particle */
    if (current_state[part_type].count(id) != 0) {
      csds_error("Already found a particle with the same ID");
    }
    current_state[part_type][id] = offset;

    /* Increment the offset */
    const int record_size = header_get_record_size_from_mask(h, header.mask);
    offset += record_size + size_record_header;

    /* Print the progress */
    if (this->mVerbose > 0) {
      float percent =
          (float)(offset - offset_first) / (float)(offset_max - offset_first);
      percent *= 100;
      if (percent > next_percent) {
        next_percent += 0.5;
        tools_print_progress(percent, init, "Getting initial state");
      }
    }
  }

  /* Close progressbar */
  if (this->mVerbose > 0) {
    printf("\n");
  }

  /* Print the time */
  if (this->mVerbose > 0 || this->mVerbose == CSDS_VERBOSE_TIMERS)
    message("took " << GetDeltaTime(init) << "ms");

  /* Go back to normal */
  log->log->AdviceNormalReading();

  return offset_max;
}

/**
 * @brief Get the last offset of a record before a given offset.
 *
 * @param reader The #csds_reader.
 * @param data The #index_data of the particle.
 * @param offset_limit The upper limit of the offset to get.
 *
 * @return The last offset before an index file.
 */
size_t Reader::GetLastOffsetBefore(const struct index_data &data,
                                   size_t offset_limit) {

  /* Get a the logfile */
  LogFile *log = this->mLog;

  size_t current_offset = data.offset;

  /* Get the full mask */
  struct record_header last_header;
  log->log->ReadRecordHeader(current_offset, last_header);

  /* Ensures that a special flag is present in the mask */
  last_header.mask |= CSDS_SPECIAL_FLAGS_MASK;

  /* Now remove it */
  last_header.mask = last_header.mask ^ CSDS_SPECIAL_FLAGS_MASK;

  /* Find the last offset before the current time */
  size_t last_full_offset = current_offset;
  current_offset += last_header.offset;
  while (1) {
    /* Get the mask */
    struct record_header cur_header;
    log->log->ReadRecordHeader(current_offset, cur_header);

    /* update the offset */
    current_offset += cur_header.offset;
    if (current_offset > offset_limit) {
      break;
    }

    /* The particle should not have a special flag
       due to the previous loop */
    if (cur_header.mask & CSDS_SPECIAL_FLAGS_MASK) {
      csds_error("Found a special flag when updating the particles");
    }

    /* Update the last full offset */
    if (last_header.mask == cur_header.mask) {
      last_full_offset = current_offset;
    }

    /* Are we at the end of the file? */
    if (cur_header.offset == 0) {
      break;
    }
  }

  return last_full_offset;
}

/**
 * @brief Update the state of the simulation until the next index file along
 * with the history.
 *
 * @param reader The #csds_reader.
 * @param init_offset The initial offset to read.
 * @param time_record The #time_record of the next index file.
 * @param current_state The arrays containing the state of the simulation (size
 * csds_type_count).
 * @param parts_created The arrays containing the particles created since last
 * index (size csds_type_count).
 * @param parts_removed The arrays containing the particles removed since last
 * index (size csds_type_count).
 *
 * @return The starting offset for the update.
 */
size_t Reader::UpdateStateToNextIndex(size_t init_offset,
                                      struct time_record time_record,
                                      CsdsUnorderedMap &current_state,
                                      struct index_writer *parts_created,
                                      struct index_writer *parts_removed) {
  LogFile *log = this->mLog;
  const struct header *h = &log->header;
  const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;

  /* Look for all the created / removed particles */
  size_t offset = init_offset;
  int step = 0;

  /* Record the initial time */
  const high_resolution_clock::time_point init = high_resolution_clock::now();

  /* Warn the OS that we will read in a sequential way */
  log->log->AdviceSequentialReading();

  while (offset < time_record.offset) {

    /* Print status */
    if (this->mVerbose > 0) {
      step += 1;
      if (step % 100 == 0) {
        step = 0;

        /* Get the percentage */
        float percent = offset - init_offset;
        percent /= time_record.offset - init_offset;
        percent *= 100.f;

        tools_print_progress(percent, init,
                             "Looking for new or removed particles");
      }
    }
    int part_type = -1;  // only available if the record is flagged.
    int data = 0;

    /* Get the mask */
    struct record_header header;
    log->log->ReadRecordHeader(offset, header);

    /* Go to the next record */
    const size_t old_offset = offset;
    offset += header_get_record_size_from_mask(h, header.mask);
    offset += size_record_header;

    /* Check if we have a particle with a flag */
    if (header.mask & CSDS_TIMESTAMP_MASK ||
        !(header.mask & CSDS_SPECIAL_FLAGS_MASK)) {
      continue;
    }

    /* Get the special flag */
    enum csds_special_flags flag = csds_particle_read_special_flag(
        old_offset, header, &data, &part_type, *log->log);

#ifdef CSDS_DEBUG_CHECKS
    if (flag == csds_flag_none) {
      csds_error(
          "A record should not have a mask "
          "for a flag and the flag set to 0");
    }
#endif
    /* Check if we have a meaningful particle type */
    if (part_type < 0) {
      csds_error("Found a special flag without a particle type");
    }

    /* Get the mask for the IDs */
    // TODO create an array outside the loop
    const Field &field_id =
        header_get_field_from_name(h, "ParticleIDs", (enum part_type)part_type);

    /* Read the ID */
    int64_t id = 0;
    csds_particle_read_field(old_offset, &id, field_id,
                             /* derivative */ 0, header, h->fields[part_type],
                             *this->mLog->log);

    /* Add the particle to the arrays */
    if (flag == csds_flag_change_type || flag == csds_flag_mpi_exit ||
        flag == csds_flag_delete) {
      index_writer_log(&parts_removed[part_type], id, old_offset);
      if (current_state[part_type].count(id) != 1) {
        csds_error("Failed to remove a particle");
      };
      current_state[part_type].erase(id);
    } else if (flag == csds_flag_create || flag == csds_flag_mpi_enter) {
      index_writer_log(&parts_created[part_type], id, old_offset);
      if (current_state[part_type].count(id) != 0) {
        csds_error("Already found a particle with the same ID");
      }
      current_state[part_type][id] = old_offset;
    }
  }

  /* Go back to normal */
  log->log->AdviceNormalReading();

  /* Cleanup output */
  if (this->mVerbose > 0) {
    printf("\n");
  }

  /* Print the time */
  if (this->mVerbose > 0 || this->mVerbose == CSDS_VERBOSE_TIMERS)
    message("Finding new/removed particles took " << GetDeltaTime(init)
                                                  << "ms");

  /* Initialize the total number of particles for the progress bar */
  size_t total_number_particles = 0;
  for (int type = 0; type < csds_type_count; type++) {
    total_number_particles += current_state[type].size();
  }

  /* Record the time */
  const high_resolution_clock::time_point init2 = high_resolution_clock::now();
  size_t current_number = 0;
  int local_counter = 0;
  const int local_update_limit = 10000;

  /* Update the offsets of current_state
   * No need to update the others as they contain
   * data about when particles are removed/created*/
  for (int type = 0; type < csds_type_count; type++) {
    /* Loop over the buckets ~ particles */
#pragma omp parallel for firstprivate(local_counter)
    for (size_t bucket = 0; bucket < current_state[type].bucket_count();
         bucket++) {
      for (auto cur = current_state[type].begin(bucket);
           cur != current_state[type].end(bucket); cur++) {
        struct index_data index_data = {
            .id = cur->first,
            .offset = cur->second,
        };

        /* Update the offset */
        cur->second = this->GetLastOffsetBefore(index_data, time_record.offset);

        /* Are we done or should we print something? */
        if (!(this->mVerbose > 0)) continue;

        /* Update the counters */
        local_counter++;
        if (local_counter < local_update_limit) continue;

          /* Add the local counter to the global one */
#pragma omp atomic
        current_number += local_counter;
        local_counter = 0;

        /* Only the master is printing */
        int current_thread = csds_get_thread_num();
        if (current_thread != 0) continue;

        float ratio = (float)current_number / (float)total_number_particles;

        /* Print the message */
        tools_print_progress(100 * ratio, init2, "Updating offsets");
      }
    }
  }

  /* Cleanup the output */
  if (this->mVerbose > 0) printf("\n");

  /* Print the time */
  if (this->mVerbose > 0 || this->mVerbose == CSDS_VERBOSE_TIMERS)
    message("Updating particles took " << GetDeltaTime(init2) << "ms");

  return offset;
}

/**
 * @brief Generate the index files from the logfile.
 *
 * @param reader The #csds_reader.
 * @param number_index The number of index to generate.
 * @param current_index Current index to write (> 0 if restart)
 */
void Reader::GenerateIndexFiles(int number_index, int current_index) {
  /* Get a few pointers */
  LogFile *log = this->mLog;
  const struct header *h = &log->header;

  /* Write a quick message */
  if (this->mVerbose > 0) {
    message("Generating " << number_index << " index files");
    if (current_index) {
      message("Restarting from index: " << current_index);
    }
  }

  /* Check that the number of index is meaningful */
  if (number_index < 2) {
    csds_error("The CSDS requires at least 2 index files.");
  }

  /* Ensure that the offset are in the assumed direction */
  if (!header_is_forward(h)) {
    csds_error("The offset are not in the expected direction");
  }

  /* Create the different arrays that will store the information */
  CsdsUnorderedMap current_state;
  struct index_writer parts_created[csds_type_count];
  struct index_writer parts_removed[csds_type_count];
  const size_t default_size = 1024;

  /* Allocate the arrays for new / removed particles */
  for (int i = 0; i < csds_type_count; i++) {
    index_writer_init(&parts_created[i], default_size);
    index_writer_init(&parts_removed[i], default_size);
  }

  /* Current offset to read */
  size_t offset = 0;

  /* Variables for the time of each index files */
  const double t_min = log->times[0].time;
  const double t_max = log->times[log->times.Size() - 1].time;
  const double dt = (t_max - t_min) / (number_index - 1);

  /* Are we restarting? If not allocate and get the initial state */
  if (current_index == 0) {

    /* Allocate the arrays for the current state */
    for (int i = 0; i < csds_type_count; i++) {
      current_state[i].reserve(mParams.GetApproximateNumberParticles()[i] *
                               hashmap_overallocation);
    }

    /* Get the initial state */
    struct time_record time_record;
    offset = this->GetInitialState(current_state, &time_record);
    /* Write the first index file */
    this->WriteIndex(current_state, parts_created, parts_removed, &time_record,
                     /* file_number */ 0);

  }
  /* We are restarting => read state from file */
  else {
    /* Get the index file name */
    string filename = this->GetIndexName(current_index - 1);

    /* Initialize the index file */
    IndexFile index(filename, this->mVerbose);
    index.MapFile(filename, /* sorted */ 1, this->mVerbose);

    /* Loop over all the particle types */
    for (int i = 0; i < csds_type_count; i++) {
      /* Allocate the array for the current state */
      current_state[i].reserve(hashmap_overallocation *
                               index.GetNumberParticles((part_type)i));

      /* Copy the index file into the arrays. */
      for (size_t p = 0; p < index.GetNumberParticles((part_type)i); p++) {
        struct index_data *data = index.GetData((part_type)i);
        if (current_state[i].count(data->id) != 0) {
          csds_error("Already found a particle with the same ID");
        }
        current_state[i][data->id] = data->offset;
      }
    }

    /* Set the last offset read */
    if (current_index == 1) {
      /* In this case, we need to cheat a bit (see
       * GetInitialState) */
      offset = log->times[1].offset;
    } else {
      const double current_approximate_time = t_min + (current_index - 1) * dt;
      const size_t index_time =
          log->times.GetIndexFromTime(current_approximate_time);
      struct time_record index_time_record = log->times[index_time];
      offset = index_time_record.offset;

      /* Check if we are reading the correct file */
      if (index_time_record.int_time != index.GetIntegerTime()) {
        csds_error(
            "The time in the index file and the expected one do not match");
      }
    }
  }

  /* Compute the state of all the other files and write them. */
  for (int file_number = 1; file_number < number_index; file_number++) {

    /* Skip the index files that have already been written. */
    if (file_number < current_index) continue;

    /* Get the corresponding time record.
     * The index files are only here to speedup the code,
     * no need to have the exact time. */
    const double current_approximate_time = t_min + file_number * dt;
    const size_t index_time =
        log->times.GetIndexFromTime(current_approximate_time);
    struct time_record time_record = log->times[index_time];

    /* Ensure that we really have the final time (rounding error). */
    if (file_number == number_index - 1) {
      time_record = log->times[log->times.Size() - 1];
    }

    /* Update the state until the next index file. */
    offset = this->UpdateStateToNextIndex(offset, time_record, current_state,
                                          parts_created, parts_removed);

    /* Write the index file */
    this->WriteIndex(current_state, parts_created, parts_removed, &time_record,
                     file_number);
  }

  /* Free the memory */
  for (int type = 0; type < csds_type_count; type++) {
    index_writer_free(&parts_created[type]);
    index_writer_free(&parts_removed[type]);
  }

  if (this->mVerbose > 0) {
    message("Generation done");
  }
}