Skip to content
Snippets Groups Projects
csds_reader_generate_index.c 24.10 KiB
/*******************************************************************************
 * This file is part of CSDS.
 * Copyright (c) 2021 Loic Hausammann (loic.hausammann@epfl.ch)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Include corresponding header */
#include "csds_reader.h"

/* Include local headers */
#include "csds_hashmap.h"
#include "csds_index.h"
#include "csds_logfile.h"
#include "csds_openmp.h"
#include "csds_part_type.h"

/**
 * @brief Structure that contains all the information
 * required to write an index file for a single particle type.
 */
struct index_writer {
  /* Number of elements currently stored */
  uint64_t size;

  /* Size of the current buffer */
  size_t capacity;

  /* Buffer containing the particles */
  struct index_data *data;

  /* Initial size of the writer */
  size_t init_size;

  /* Number of tagged particles */
  size_t number_tag;

  /* Maximal fraction of tagged particles */
  float max_frac_tag;
};

/**
 * @brief Initialize the structure for the first time.
 *
 * @param writer The #index_writer.
 * @param init_size The initial size of the writer.
 * @param max_frac_tag The maximal fraction of tagged particles.
 */
void index_writer_init(struct index_writer *writer, size_t init_size,
                       float max_frac_tag) {

  /* Set the counters to their initial value */
  writer->size = 0;
  writer->capacity = init_size;
  writer->init_size = init_size;
  writer->number_tag = 0;
  writer->max_frac_tag = max_frac_tag;
  if (init_size == 0) {
    writer->data = NULL;
    return;
  }

  /* Allocate the memory */
  writer->data =
      (struct index_data *)malloc(sizeof(struct index_data) * init_size);
  if (writer->data == NULL) {
    error("Failed to allocate memory for the index_writer.");
  }
}

/**
 * @brief Reset the structure.
 *
 * @param writer The #index_writer.
 */
void index_writer_reset(struct index_writer *writer) {
  if (writer->size == 0) return;

  free(writer->data);
  index_writer_init(writer, writer->init_size, writer->max_frac_tag);
}

/**
 * @brief Free the structure.
 *
 * @param hist The #index_writer.
 */
void index_writer_free(struct index_writer *writer) {
  /* Set the counters to 0 */
  writer->size = 0;
  writer->capacity = 0;

  /* Free the memory */
  if (writer->data != NULL) {
    free(writer->data);
    writer->data = NULL;
  }
}

/**
 * @brief Log a the particle information into the #index_writer.
 *
 * @param writer The #index_writer.
 * @param id The ID of the particle.
 * @param last_offset The last offset of the particle.
 */
void index_writer_log(struct index_writer *writer, const int64_t id,
                      const uint64_t last_offset) {

  /* Ensure that it was correctly allocated */
  if (writer->capacity == 0) {
    error(
        "Trying to add a particle to an empty index_writer."
        " If your are sure that your particle type is implemented, "
        "please increase the value of ReaderOptions:Number*.");
  }

#ifdef CSDS_DEBUG_CHECKS
  if (id < 0) {
    error("Negative ID for a particle.");
  }
#endif

  const struct index_data data = {id, last_offset};

  /* Check if enough space is left */
  if (writer->size == writer->capacity) {
    /* Compute the previous amount of memory */
    const size_t memsize = sizeof(struct index_data) * writer->capacity;

    /* Increase the capacity of the array */
    writer->capacity *= 2;

    /* Allocate the new array and copy the content of the previous one */
    struct index_data *tmp = (struct index_data *)malloc(2 * memsize);

    memcpy(tmp, writer->data, memsize);

    /* Free the previous array and switch the pointers */
    free(writer->data);
    writer->data = tmp;
  }

  /* Save the new particle */
  writer->data[writer->size] = data;

  /* Increase the element counter */
  writer->size += 1;
}

/**
 *@brief Check if we have some unimplemented particles.
 */
void index_writer_check_implemented(const struct index_writer *writers) {
  if (writers[csds_type_sink].size != 0 ||
      writers[csds_type_black_hole].size != 0 ||
      writers[csds_type_neutrino].size != 0) {
    error("TODO implement additional particle types");
  }
}

/**
 * @brief Write the number of particles and their data into the index file
 */
void index_writer_write_in_index(const struct index_writer *writers, FILE *f) {

  /* Write the number of particles */
  uint64_t N_total[csds_type_count];
  for (int type = 0; type < csds_type_count; type++) {
    N_total[type] = writers[type].size;
  }
  fwrite(N_total, sizeof(uint64_t), csds_type_count, f);

  /* Write the index data */
  for (int type = 0; type < csds_type_count; type++) {
    if (N_total[type] == 0) continue;

    fwrite(writers[type].data, sizeof(struct index_data), writers[type].size,
           f);
  }
}

/**
 * @brief Write an index file.
 *
 * @param reader The #csds_reader.
 * @param current_state The arrays containing the current
 * state with the last full offset (size given by csds_type_count).
 * @param parts_created The arrays containing the first reference
 * (since last index file) of created particles
 * (size given by csds_type_count).
 * @param parts_removed The arrays containing the last reference
 * (since last index file) of removed particles (size given by
 * csds_type_count).
 * @param time The time corresponding to the current index file.
 * @param file_number The current file number.
 */
void csds_reader_write_index(const struct csds_reader *reader,
                             struct csds_hashmap **current_state,
                             struct index_writer *parts_created,
                             struct index_writer *parts_removed,
                             const struct time_record *time, int file_number) {

  /* Get the filename */
  char filename[CSDS_STRING_SIZE + 15];
  sprintf(filename, "%s_%04i.index", reader->basename, file_number);

  /* Check that we have only implemented particles */
  if (csds_hashmap_count(current_state[csds_type_sink]) != 0 ||
      csds_hashmap_count(current_state[csds_type_black_hole]) != 0 ||
      csds_hashmap_count(current_state[csds_type_neutrino]) != 0)
    error("Not implemented");
  index_writer_check_implemented(parts_created);
  index_writer_check_implemented(parts_removed);

  /* Open file */
  FILE *f = NULL;
  f = fopen(filename, "wb");

  if (f == NULL) {
    error("Failed to open file %s", filename);
  }
  /* Write double time */
  fwrite(&time->time, sizeof(double), 1, f);

  /* Write integer time */
  fwrite(&time->int_time, sizeof(integertime_t), 1, f);

  /* Write number of particles */
  uint64_t N_total[csds_type_count];
  for (int type = 0; type < csds_type_count; type++) {
    N_total[type] = csds_hashmap_count(current_state[type]);
  }
  fwrite(N_total, sizeof(uint64_t), csds_type_count, f);

  /* Write if the file is sorted */
  // TODO change it if the array is sorted
  const char sorted = 0;
  fwrite(&sorted, sizeof(char), 1, f);

  /* Ensure the data to be aligned */
  size_t cur_pos = ftell(f);
  size_t d_align = ((cur_pos + 7) & ~7) - cur_pos;
  if (d_align > 0) {
    long int tmp = 0;
    /* Fill the memory with garbage */
    fwrite(&tmp, d_align, 1, f);
  }

  /* Write the current state */
  for (int type = 0; type < csds_type_count; type++) {
    if (N_total[type] == 0) continue;

    // TODO memory map the file
    csds_hashmap_write(current_state[type], f);
  }

  /* Now do the same with the particles created / removed */
  index_writer_write_in_index(parts_created, f);
  index_writer_write_in_index(parts_removed, f);

  /* Close the file */
  fclose(f);

  /* Cleanup the arrays */
  for (int type = 0; type < csds_type_count; type++) {
    index_writer_reset(&parts_created[type]);
    index_writer_reset(&parts_removed[type]);
  }
}

/**
 * @brief Get the initial state of the simulation.
 *
 * @param reader The #csds_reader.
 * @param current_state The arrays that will be updated with (size
 * csds_type_count).
 * @param time_record (output) The first #time_record in the logfile (for
 * writing the index file).
 *
 * @return The offset of the second #time_record
 * (the first record that does not correspond to the IC).
 */
size_t csds_reader_get_initial_state(const struct csds_reader *reader,
                                     struct csds_hashmap **current_state,
                                     struct time_record *time_record) {
  const ticks tic = getticks();
  /* Get a few variables. */
  const struct csds_logfile *log = &reader->log;
  const struct header *h = &log->header;
  const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;

  /* Warn the OS that we will read in a sequential way */
  CSDS_ADVICE_SEQUENTIAL(log->log);

  /* Get the offset after the dump of all the particles
     and the time information of the first time record. */
  if (log->times.size < 2) {
    error("The time array is not large enough");
  }
  const size_t offset_max = log->times.records[1].offset;
  *time_record = log->times.records[0];

  /* Get the offset of the first particle record */
  size_t offset_first = h->offset_first_record;

  /* Skip the time record */
  mask_type time_mask = 0;
  csds_loader_io_read_mask(log->log.map + offset_first, &time_mask, NULL);
  const int time_size = header_get_record_size_from_mask(h, time_mask);
  offset_first += time_size + size_record_header;

  /* Get the initial state */
  for (size_t offset = offset_first; offset < offset_max;) {
    /* Get the particle type */
    mask_type mask = 0;
    size_t prev_offset = 0;
    int part_type = 0;
    int data = 0;

    enum csds_special_flags flag = csds_particle_read_special_flag(
        offset, &mask, &prev_offset, &data, &part_type, log->log.map);
    if (flag != csds_flag_create) {
      error("Reading a particle from ICs without the created flag.");
    }

    /* TODO Implement missing particle types */
    if (part_type == csds_type_neutrino || part_type == csds_type_sink ||
        part_type == csds_type_black_hole) {
      error("Particle type not implemented (%s)", part_type_names[part_type]);
    }

    /* Get the mask for the IDs */
    const struct field_information *field_id =
      header_get_field_from_name(h, "ParticleIDs",
                                 (enum part_type) part_type);
    /* Get the particle ID */
    if (!(mask & field_id->field.mask)) {
      error("The particle ID is missing in the first log");
    }

    /* Read the particle ID */
    int64_t id = 0;
    csds_particle_read_field(offset, &id, field_id,
                             /* derivative */ 0, &mask, &prev_offset,
                             h->fields[part_type], h->number_fields[part_type],
                             reader->log.log.map);

    /* Log the particle */
    struct index_data item = {id, offset};
    void *p = (void *)csds_hashmap_set(current_state[part_type], &item);
    if (p != NULL) {
      error("Already found a particle with the same ID");
    }

    /* Increment the offset */
    const int record_size = header_get_record_size_from_mask(h, mask);
    offset += record_size + size_record_header;
  }

  /* Print the time */
  if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());

  /* Go back to normal */
  CSDS_ADVICE_NORMAL(log->log);

  return offset_max;
}

/**
 * @brief Get the last offset of a record before a given offset.
 *
 * @param reader The #csds_reader.
 * @param data The #index_data of the particle.
 * @param offset_limit The upper limit of the offset to get.
 *
 * @return The last offset before an index file.
 */
size_t csds_reader_get_last_offset_before(const struct csds_reader *reader,
                                          const struct index_data *data,
                                          size_t offset_limit) {

  /* Get a the logfile */
  const struct csds_logfile *log = &reader->log;

  size_t current_offset = data->offset;

  /* Get the full mask */
  mask_type full_mask = 0;
  size_t h_offset = 0;
  csds_loader_io_read_mask(log->log.map + current_offset, &full_mask,
                           &h_offset);

  /* Ensures that a special flag is present in the mask */
  full_mask |= CSDS_SPECIAL_FLAGS_MASK;

  /* Now remove it */
  full_mask = full_mask ^ CSDS_SPECIAL_FLAGS_MASK;

  /* Find the last offset before the current time */
  size_t last_full_offset = current_offset;
  current_offset += h_offset;
  while (1) {
    /* Get the mask */
    mask_type mask = 0;
    h_offset = 0;
    csds_loader_io_read_mask(log->log.map + current_offset, &mask, &h_offset);

    /* update the offset */
    current_offset += h_offset;
    if (current_offset > offset_limit) {
      break;
    }

    /* The particle should not have a special flag
       due to the previous loop */
    if (mask & CSDS_SPECIAL_FLAGS_MASK) {
      error("Found a special flag when updating the particles");
    }

    /* Update the last full offset */
    if (full_mask == mask) {
      last_full_offset = current_offset;
    }

    /* Are we at the end of the file? */
    if (h_offset == 0) {
      break;
    }
  }

  return last_full_offset;
}

/**
 * @brief Update the state of the simulation until the next index file along
 * with the history.
 *
 * @param reader The #csds_reader.
 * @param init_offset The initial offset to read.
 * @param time_record The #time_record of the next index file.
 * @param current_state The arrays containing the state of the simulation (size
 * csds_type_count).
 * @param parts_created The arrays containing the particles created since last
 * index (size csds_type_count).
 * @param parts_removed The arrays containing the particles removed since last
 * index (size csds_type_count).
 *
 * @return The starting offset for the update.
 */
size_t csds_reader_update_state_to_next_index(
    const struct csds_reader *reader, size_t init_offset,
    struct time_record time_record, struct csds_hashmap **current_state,
    struct index_writer *parts_created, struct index_writer *parts_removed) {
  const struct csds_logfile *log = &reader->log;
  const struct header *h = &log->header;
  const int size_record_header = CSDS_MASK_SIZE + CSDS_OFFSET_SIZE;

  /* Look for all the created / removed particles */
  size_t offset = init_offset;
  int step = 0;
  if (reader->verbose > 0) printf("\n");

  /* Record the initial time */
  const ticks tic = getticks();

  /* Get the mask for the IDs */
  const struct field_information *field_id =
    header_get_field_from_name(h, "ParticleIDs", csds_type_dark_matter);

  /* Warn the OS that we will read in a sequential way */
  CSDS_ADVICE_SEQUENTIAL(log->log);

  while (offset < time_record.offset) {

    /* Print status */
    if (reader->verbose > 0) {
      step += 1;
      if (step % 1000 == 0) {
        step = 0;

        /* Get the percentage */
        float percent = offset - init_offset;
        percent /= time_record.offset - init_offset;
        percent *= 100.f;

        /* Get the remaining time */
        const int current_time =
            clocks_diff_ticks(getticks(), clocks_start_ticks) / 1000.0;
        const int init_time =
            clocks_diff_ticks(tic, clocks_start_ticks) / 1000.0;

        const int remaining_time =
            (current_time - init_time) * (100. - percent) / percent;

        tools_print_progress(percent, remaining_time,
                             "Looking for new or removed particles");
      }
    }
    mask_type mask = 0;
    size_t h_offset = 0;
    csds_loader_io_read_mask(log->log.map + offset, &mask, &h_offset);

    /* Go to the next record */
    const size_t old_offset = offset;
    offset += header_get_record_size_from_mask(h, mask);
    offset += size_record_header;

    /* Check if we have a particle with a flag */
    if (mask & CSDS_TIMESTAMP_MASK) {
      continue;
    }

    /* Read the ID */
    int64_t id = 0;
    csds_particle_read_field(old_offset, &id, field_id,
                             /* derivative */ 0, &mask, &h_offset,
                             h->fields[csds_type_dark_matter],
                             h->number_fields[csds_type_dark_matter],
                             reader->log.log.map);
    struct index_data item = {id, old_offset};

    if (!(mask & CSDS_SPECIAL_FLAGS_MASK)) {
      void *p = csds_hashmap_set(current_state[csds_type_dark_matter], &item);
      if (p == NULL)
        error("Found a new particle without mask");
      continue;
    }

    /* Get the special flag */
    int data = 0;
    int part_type = -1;
    enum csds_special_flags flag = csds_particle_read_special_flag(
        old_offset, &mask, &h_offset, &data, &part_type, log->log.map);
    if (part_type != csds_type_dark_matter)
      error("Wrong type");

#ifdef CSDS_DEBUG_CHECKS
    if (flag == csds_flag_none) {
      error(
          "A record should not have a mask "
          "for a flag and the flag set to 0");
    }
#endif
    /* Check if we have a meaningful particle type */
    if (part_type < 0) {
      error("Found a special flag without a particle type");
    }

    /* Add the particle to the arrays */
    if (flag == csds_flag_change_type || flag == csds_flag_mpi_exit ||
        flag == csds_flag_delete) {
      index_writer_log(&parts_removed[part_type], id, old_offset);
      if (csds_hashmap_delete(current_state[part_type], id) == NULL) {
        error("Failed to remove a particle");
      };
    } else if (flag == csds_flag_create || flag == csds_flag_mpi_enter) {
      index_writer_log(&parts_created[part_type], id, old_offset);
      void *p = (void *)csds_hashmap_set(current_state[part_type], &item);
      if (p != NULL) {
        error("Already found a particle with the same ID");
      }
    }
  }

  /* Go back to normal */
  CSDS_ADVICE_NORMAL(log->log);

  /* Cleanup output */
  if (reader->verbose > 0) {
    printf("\n");
  }

  /* Print the time */
  if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
    message("Updating particles took %.3f %s",
            clocks_from_ticks(getticks() - tic), clocks_getunit());

  return offset;
}

/**
 * @brief Generate the index files from the logfile.
 *
 * @param reader The #csds_reader.
 * @param number_index The number of index to generate.
 * @param current_index Current index to write (> 0 if restart)
 */
void csds_reader_generate_index_files(const struct csds_reader *reader,
                                      int number_index, int current_index) {
  /* Get a few pointers */
  const struct csds_logfile *log = &reader->log;
  const struct header *h = &log->header;

  /* Write a quick message */
  if (reader->verbose > 0) {
    message("Generating %i index files", number_index);
    if (current_index) {
      message("Restarting from index %i", current_index);
    }
  }

  /* Check that the number of index is meaningful */
  if (number_index < 2) {
    error("The CSDS requires at least 2 index files.");
  }

  /* Ensure that the offset are in the assumed direction */
  if (!header_is_forward(h)) {
    error("The offset are not in the expected direction");
  }

  /* Create the different arrays that will store the information */
  struct csds_hashmap *current_state[csds_type_count];
  struct index_writer parts_created[csds_type_count];
  struct index_writer parts_removed[csds_type_count];
  const size_t default_size = 1024;

  /* Allocate the arrays for new / removed particles */
  for (int i = 0; i < csds_type_count; i++) {
    index_writer_init(&parts_created[i], default_size,
                      reader->params.arrays_maximal_tagged_fraction);
    index_writer_init(&parts_removed[i], default_size,
                      reader->params.arrays_maximal_tagged_fraction);
  }

  /* Current offset to read */
  size_t offset = 0;

  /* Variables for the time of each index files */
  const double t_min = log->times.records[0].time;
  const double t_max = log->times.records[log->times.size - 1].time;
  const double dt = (t_max - t_min) / (number_index - 1);

  /* Are we restarting? If not allocate and get the initial state */
  if (current_index == 0) {

    /* Allocate the arrays for the current state */
    for (int i = 0; i < csds_type_count; i++) {
      current_state[i] =
          csds_hashmap_new(hashmap_overallocation *
                           reader->params.approximate_number_particles[i]);
      if (current_state[i] == NULL) {
        error("Failed to initialize the hashmap");
      }
    }

    /* Get the initial state */
    struct time_record time_record;
    offset = csds_reader_get_initial_state(reader, current_state, &time_record);
    /* Write the first index file */
    csds_reader_write_index(reader, current_state, parts_created, parts_removed,
                            &time_record, /* file_number */ 0);

  }
  /* We are restarting => read state from file */
  else {

    /* Initialize the index file */
    struct csds_index index;
    csds_index_init(&index, reader);

    /* Get its name */
    char filename[CSDS_STRING_SIZE + 50];
    sprintf(filename, "%s_%04u.index", reader->basename, current_index - 1);

    /* Read it */
    csds_index_read_header(&index, filename);
    csds_index_map_file(&index, filename, /* sorted */ 1);

    /* Loop over all the particle types */
    for (int i = 0; i < csds_type_count; i++) {
      /* Allocate the array for the current state */
      current_state[i] =
          csds_hashmap_new(hashmap_overallocation * index.nparts[i]);
      if (current_state[i] == NULL) {
        error("Failed to initialize the hashmap");
      }

      /* Copy the index file into the arrays. */
      for (size_t p = 0; p < index.nparts[i]; p++) {
        struct index_data *data = csds_index_get_data(&index, i);
        void *out = (void *)csds_hashmap_set(current_state[i], data + p);
        if (out != NULL) {
          error("Already found a particle with the same ID");
        }
      }
    }

    /* Set the last offset read */
    const double current_approximate_time = t_min + (current_index - 1) * dt;
    const size_t index_time =
        time_array_get_index_from_time(&log->times, current_approximate_time);
    struct time_record index_time_record = log->times.records[index_time];
    offset = index_time_record.offset;

    /* Check if we are reading the correct file */
    if (index_time_record.int_time != index.integer_time) {
      error("The time in the index file and the expected one do not match");
    }

    /* Cleanup */
    csds_index_free(&index);
  }

  /* Compute the state of all the other files and write them. */
  for (int file_number = 1; file_number < number_index; file_number++) {

    /* Skip the index files that have already been written. */
    if (file_number < current_index) continue;

    /* Get the corresponding time record.
     * The index files are only here to speedup the code,
     * no need to have the exact time. */
    const double current_approximate_time = t_min + file_number * dt;
    const size_t index_time =
        time_array_get_index_from_time(&log->times, current_approximate_time);
    struct time_record time_record = log->times.records[index_time];

    /* Ensure that we really have the final time (rounding error). */
    if (file_number == number_index - 1) {
      time_record = log->times.records[log->times.size - 1];
    }

    /* Update the state until the next index file. */
    offset = csds_reader_update_state_to_next_index(
        reader, offset, time_record, current_state, parts_created,
        parts_removed);

    /* Write the index file */
    csds_reader_write_index(reader, current_state, parts_created, parts_removed,
                            &time_record, file_number);
  }

  /* Free the memory */
  for (int type = 0; type < csds_type_count; type++) {
    csds_hashmap_free(current_state[type]);
    index_writer_free(&parts_created[type]);
    index_writer_free(&parts_removed[type]);
  }

  if (reader->verbose > 0) {
    message("Generation done");
  }
}