Skip to content
Snippets Groups Projects
csds_reader.c 36.25 KiB
/*******************************************************************************
 * This file is part of CSDS.
 * Copyright (c) 2019 Loic Hausammann (loic.hausammann@epfl.ch)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Include corresponding header */
#include "csds_reader.h"

/* Include CSDS headers */
#include "csds_fields.h"
#include "csds_parameters.h"

/* Include standard library */
#include <sys/sysinfo.h>
#include <unistd.h>

/* Include local headers */
#include "threadpool.h"

#define nr_threads get_nprocs()

/**
 * @brief Initialize the reader.
 *
 * @param reader The #csds_reader.
 * @param basename The basename of the csds files.
 * @param verbose The verbose level.
 * @param number_threads The number of threads to use.
 * @param number_index The number of index files to write.
 * @param restart_init Are we restarting the initial reading?
 */
void csds_reader_init(struct csds_reader *reader, const char *basename,
                      int verbose, int number_threads, int number_index,
                      int restart_init) {
  if (verbose > 1) message("Initializing the reader.");

  /* Set the variable to the default values */
  reader->time.time = -1.;
  reader->time.int_time = 0;
  reader->time.time_offset = 0;

  /* Copy the base name */
  strcpy(reader->basename, basename);

  /* Initialize the reader variables. */
  reader->verbose = verbose;
  csds_parameters_init(&reader->params, reader, number_threads);

  /* Generate the logfile filename */
  char logfile_name[STRING_SIZE];
  sprintf(logfile_name, "%s.dump", basename);

  /* Initialize the log file. */
  csds_logfile_init_from_file(&reader->log, logfile_name, reader,
                              /* only_header */ 0);
  /* Initialize the index files and creates them if needed */
  csds_reader_init_index(reader, number_index, restart_init);

  /* Save the time array */
  time_array_save(&reader->log.times, &reader->log);

  if (verbose > 1) message("Initialization done.");
}

/**
 * @brief Initialize the index part of the reader.
 *
 * @param reader The #csds_reader.
 * @param number_index Number of requested index files.
 * @param restart Are we restarting the generation of index files?
 */
void csds_reader_init_index(struct csds_reader *reader, int number_index,
                            int restart) {
  /* Initialize the csds_index */
  csds_index_init(&reader->index.index_prev, reader);
  csds_index_init(&reader->index.index_next, reader);

  /* Count the number of files */
  int count = 0;
  while (1) {
    char filename[STRING_SIZE + 50];
    sprintf(filename, "%s_%04i.index", reader->basename, count);

    /* Check if file exists */
    if (access(filename, F_OK) != -1) {
      count++;
    } else {
      break;
    }
  }

  /* Ensures that we do not have an unexpected situation */
  if (number_index == 1 || (count == 1 && !restart)) {
    error(
        "The CSDS cannot run with only one index file."
        " Did you forget to restart their generation?");
  }

  /* Check if need to generate the index files or not. */
  if ((count != number_index && restart) || count == 0) {
    csds_reader_generate_index_files(reader, number_index, count);
    count = number_index;
  } else if (reader->verbose > 0 && count != number_index) {
    message(
        "Found %i index files. If you wish to have the"
        " requested %i files, please delete the old ones.",
        count, number_index);
  }

  reader->index.n_files = count;

  /* Initialize the arrays */
  if ((reader->index.times = (double *)malloc(count * sizeof(double))) ==
      NULL) {
    error_python("Failed to allocate the list of times");
  }
  if ((reader->index.int_times =
           (integertime_t *)malloc(count * sizeof(integertime_t))) == NULL) {
    error_python("Failed to allocate the list of times");
  }

  /* Get the information contained in the headers */
  for (int i = 0; i < reader->index.n_files; i++) {
    char filename[STRING_SIZE + 50];
    sprintf(filename, "%s_%04i.index", reader->basename, i);

    /* Read the header */
    csds_index_read_header(&reader->index.index_prev, filename);

    /* Save the required information */
    reader->index.times[i] = reader->index.index_prev.time;
    reader->index.int_times[i] = reader->index.index_prev.integer_time;
  }
}

/**
 * @brief Free the reader.
 *
 * @param reader The #csds_reader.
 */
void csds_reader_free(struct csds_reader *reader) {
  /* Free the log. */
  csds_logfile_free(&reader->log);

  if (reader->time.time != -1.) {
    csds_index_free(&reader->index.index_prev);
  }

  free(reader->index.int_times);
  free(reader->index.times);
}

/**
 * @brief Set the reader to a given time and read the correct index file.
 *
 * @param reader The #csds_reader.
 * @param time The requested time.
 */
void csds_reader_set_time(struct csds_reader *reader, double time) {
  /* Set the time */
  reader->time.time = time;

  unsigned int left = 0;
  unsigned int right = reader->index.n_files - 1;

  /* Check if the time is meaningful */
  if (reader->index.times[left] > time || reader->index.times[right] < time)
    error("The requested time (%f) is not within the index limits [%f, %f]",
          time, reader->index.times[left], reader->index.times[right]);

  /* Find the correct index */
  while (left != right) {
    /* Do a ceil - division */
    unsigned int m = (left + right + 1) / 2;
    if (reader->index.times[m] > time) {
      right = m - 1;
    } else {
      left = m;
    }
  }

  /* Deal with the final time */
  if (left == (unsigned int)reader->index.n_files - 1) {
    left -= 1;
  }

  /* Generate the filename */
  char filename_prev[STRING_SIZE + 50];
  sprintf(filename_prev, "%s_%04u.index", reader->basename, left);
  char filename_next[STRING_SIZE + 50];
  sprintf(filename_next, "%s_%04u.index", reader->basename, left + 1);

  /* Check if the file is already mapped */
  if (reader->index.index_prev.index.map != NULL) {
    csds_index_free(&reader->index.index_prev);
  }
  if (reader->index.index_next.index.map != NULL) {
    csds_index_free(&reader->index.index_next);
  }

  /* Read the file */
  csds_index_read_header(&reader->index.index_prev, filename_prev);
  csds_index_map_file(&reader->index.index_prev, filename_prev,
                      /* sorted */ 1);

  csds_index_read_header(&reader->index.index_next, filename_next);
  csds_index_map_file(&reader->index.index_next, filename_next,
                      /* sorted */ 1);

  /* Get the offset of the time chunk */
  size_t ind = time_array_get_index_from_time(&reader->log.times, time);
  /* ind == 0 and ind == 1 are the same time, but when reading we need
     data before the initial time. */
  if (ind == 0) {
    ind = 1;
  }

  /* Check if we requested exactly a time step  */
  if (reader->log.times.records[ind].time != time) {
    /* In order to interpolate, we need to be above and not below the time */
    ind += 1;
  }

  /* Save the values */
  reader->time.index = ind;
  reader->time.int_time = reader->log.times.records[ind].int_time;
  reader->time.time_offset = reader->log.times.records[ind].offset;
}

/**
 * @brief Count the number of new particles at the time set since last index
 * file.
 *
 * @param reader The #csds_reader.
 * @param part_type The index corresponding to the particle type.
 *
 * @param The number of new particles.
 */
size_t csds_reader_count_number_new_particles(struct csds_reader *reader,
                                              enum part_type part_type) {

  const size_t threshold = reader->time.time_offset;

  /* Get the history of created particles. */
  struct index_data *data =
      csds_index_get_created_history(&reader->index.index_next, part_type);

  /* Do we have any new particle? */
  if (reader->index.index_next.nparts_created[part_type] == 0 ||
      threshold < data[0].offset) {
    return 0;
  }

  /* Perform a binary search */
  size_t right = reader->index.index_next.nparts_created[part_type] - 1;
  size_t left = 0;
  while (left <= right) {
    size_t m = (left + right) / 2;
    if (data[m].offset < threshold) {
      left = m + 1;
    } else if (data[m].offset > threshold) {
      right = m - 1;
    } else {
      return m;
    }
  }

  /* Compute the return value. */
  size_t ret = (left + right) / 2;

  /* Check if the binary search is correct. */
  if (data[ret].offset > threshold) {
    error_python("The binary search has failed.");
  }

  /* We need the count, not the index. */
  return ret + 1;
}

/**
 * @brief Count the number of removed particles at the time set since last index
 * file.
 *
 * @param reader The #csds_reader.
 * @param n_type (output) The number of particle type possible.
 * @param part_type The index corresponding to the particle type.
 *
 * @param The number of removed particles.
 */
size_t csds_reader_count_number_removed_particles(struct csds_reader *reader,
                                                  enum part_type part_type) {

  const size_t threshold = reader->time.time_offset;

  /* Get the history of created particles. */
  struct index_data *data =
      csds_index_get_removed_history(&reader->index.index_next, part_type);

  /* Do we have any new particle? */
  if (reader->index.index_next.nparts_removed[part_type] == 0 ||
      threshold < data[0].offset) {
    return 0;
  }

  /* Perform a binary search */
  size_t right = reader->index.index_next.nparts_removed[part_type] - 1;
  size_t left = 0;
  while (left <= right) {
    size_t m = (left + right) / 2;
    if (data[m].offset < threshold) {
      left = m + 1;
    } else if (data[m].offset > threshold) {
      right = m - 1;
    } else {
      return m;
    }
  }

  /* Compute the return value. */
  size_t ret = (left + right) / 2;

  /* Check if the binary search is correct. */
  if (data[ret].offset > threshold) {
    error_python("The binary search has failed.");
  }

  /* We need the count, not the index. */
  return ret + 1;
}

/**
 * @brief Provides the number of particle (per type) from the index file.
 *
 * @param reader The #csds_reader.
 * @param n_parts (out) Number of particles at the time set in the reader.
 * @param read_types Should we read this type of particles?
 */
void csds_reader_get_number_particles(struct csds_reader *reader,
                                      uint64_t *n_parts,
                                      const int *read_types) {
  for (enum part_type i = (enum part_type)0; i < swift_type_count; i++) {
    /* Should we skip this type of particles? */
    if (read_types[i] == 0) {
      n_parts[i] = 0;
      continue;
    }

    /* Count the number of particles present in the last index file. */
    const uint64_t count_prev = reader->index.index_prev.nparts[i];
    /* Count the number of particles that have been created since last index. */
    const uint64_t count_new =
        csds_reader_count_number_new_particles(reader, i);
    /* Count the number of particles that have been removed since last index. */
    const uint64_t count_removed =
        csds_reader_count_number_removed_particles(reader, i);
    n_parts[i] = (count_prev + count_new) - count_removed;
  }
}

/**
 * @brief Read a single field for a single part from the csds and interpolate
 it if needed.
 * This function will jump from the last known full record until the requested
 time and then
 * read the last record containing the requested field.
 * If an interpolation is required, the function will continue jumping until
 finding a record
 * with the requested field. If the first or second derivative is present in the
 last record before /
 * first record after the requested time, it will use it for the interpolation.
 *
 * @param reader The #csds_reader.
 * @param time The requested time.
 * @param offset_time The offset of the corresponding time record.
 * @param interp_type The type of interpolation requested.
 * @param offset_last_full_record The offset of this particle last record
 containing all the fields.
 * @param field_wanted The field to read.
 * @param output Pointers to the output array
 * @param type The #part_type.
 *
 * @return Is the particle removed from the logfile?
 */
int csds_reader_read_field(struct csds_reader *reader, double time,
                           size_t offset_time,
                           enum csds_reader_type interp_type,
                           const size_t offset_last_full_record,
                           const struct field_information *field_wanted,
                           void *output, enum part_type type) {

  const struct header *h = &reader->log.header;
  size_t offset = offset_last_full_record;

  /* Check if the offsets are correct. */
  if (offset > offset_time) {
    error_python("Last offset is larger than the requested time.");
  }

  /* Offset of the field read before the requested time. */
  size_t offset_before = offset_last_full_record;

  /* Record's header information */
  size_t mask, h_offset;

  /* Find the data for the previous record.
     As we start from a full record,
     no need to check if the field is found.
  */
  while (offset < offset_time) {
    /* Read the particle. */
    csds_loader_io_read_mask(h, reader->log.log.map + offset, &mask, &h_offset);

    /* Is the particle removed from the logfile? */
    if (mask & SPECIAL_FLAGS_MASK) {
      int data = 0;
      int part_type = 0;
      enum csds_special_flags flag = csds_particle_read_special_flag(
          reader, offset, &mask, &h_offset, &data, &part_type);

#ifdef SWIFT_DEBUG_CHECKS
      if (part_type != type) {
        error("Found particles that do not correspond.");
      }
#endif

      if (flag == csds_flag_change_type || flag == csds_flag_mpi_exit ||
          flag == csds_flag_delete) {
        return 1;
      }
    }

    /* Check if there is a next record (avoid infinite loop). */
    if (h_offset == 0) {
      error_python("No next record found.");
    }

    /* Check if the field is present */
    if (mask & field_wanted->field.mask) {
      offset_before = offset;
    }

    /* Go to the next record. */
    offset += h_offset;
  }

  /* Read the field */
  csds_particle_read_field(reader, offset_before, output, type, field_wanted,
                           /* derivative */ 0, &mask, &h_offset);

  /* Deal with the first derivative. */
  int first_found = field_wanted->first.field != field_enum_none &&
                    mask & field_wanted->first.mask;
  char first_deriv[field_wanted->first.size];

  if (first_found) {
    /* Read the first derivative */
    csds_particle_read_field(reader, offset_before, first_deriv, type,
                             field_wanted, /* derivative */ 1, &mask,
                             &h_offset);
  }

  /* Deal with the second derivative. */

  int second_found = field_wanted->second.field != field_enum_none &&
                     mask & field_wanted->second.mask;
  char second_deriv[field_wanted->second.size];

  if (second_found) {
    /* Read the first derivative */
    csds_particle_read_field(reader, offset_before, second_deriv, type,
                             field_wanted, /* derivative */ 2, &mask,
                             &h_offset);
  }

  /* Get the time. */
  // TODO reduce search interval
  double time_before = time_array_get_time(&reader->log.times, offset_before);

  /* When interpolating, we need to get the next record after
     the requested time.
  */
  if (interp_type != csds_reader_const) {

    /* Loop over the records until having all the fields. */
    while (1) {

      /* Read the particle. */
      csds_loader_io_read_mask(h, reader->log.log.map + offset, &mask,
                               &h_offset);

      /* Do we have the field? */
      if (mask & field_wanted->field.mask) {
        break;
      }

      /* Check if we can still move forward */
      if (h_offset == 0) {
        error_python("There is no record after the current one");
      }

      /* Go to the next record. */
      offset += h_offset;
    }

    /* Output after the requested time. */
    char output_after[field_wanted->field.size];

    /* Read the field */
    csds_particle_read_field(reader, offset, output_after, type, field_wanted,
                             /* derivative */ 0, &mask, &h_offset);

    /* Deal with the first derivative. */
    char first_deriv_after[field_wanted->first.size];

    /* Did we find the derivative before and in this record? */
    first_found = field_wanted->first.field != field_enum_none && first_found &&
                  mask & field_wanted->first.mask;
    if (first_found) {
      /* Read the first derivative */
      csds_particle_read_field(reader, offset, first_deriv_after, type,
                               field_wanted, /*derivative*/ 1, &mask,
                               &h_offset);
    }

    /* Deal with the second derivative. */
    char second_deriv_after[field_wanted->second.size];

    /* Did we find the derivative before and in this record? */
    second_found = field_wanted->second.field != field_enum_none &&
                   second_found && mask & field_wanted->second.mask;
    if (second_found) {
      /* Read the second derivative */
      csds_particle_read_field(reader, offset, second_deriv_after, type,
                               field_wanted, /* derivative */ 2, &mask,
                               &h_offset);
    }

    /* Get the time. */
    // TODO reduce search interval
    double time_after = time_array_get_time(&reader->log.times, offset);

    /* Deal with the derivatives */
    struct csds_reader_field before;
    struct csds_reader_field after;
    before.field = output;
    before.first_deriv = first_found ? first_deriv : NULL;
    before.second_deriv = second_found ? second_deriv : NULL;
    after.field = output_after;
    after.first_deriv = first_found ? first_deriv_after : NULL;
    after.second_deriv = second_found ? second_deriv_after : NULL;

    /* Interpolate the data. */
    csds_particle_interpolate_field(time_before, &before, time_after, &after,
                                    output, time, field_wanted, type,
                                    &reader->params);
  }
  return 0;
}

/**
 * @brief Convert the fields from global indexes to local.
 *
 * @param reader The #csds_reader.
 * @param fields_wanted_wanted The fields to convert.
 * @param fields_info_wanted (out) #field_information for the wanted fields
 * (need to be allocated).
 * @param n_fields_wanted Number of elements in fields_wanted,
 * local_fields_wanted and its derivatives.
 * @param type The type of the particle.
 */
void csds_reader_get_fields_wanted(
    const struct csds_reader *reader, const enum field_enum *fields_wanted,
    const struct field_information **fields_found, const int n_fields_wanted,
    enum part_type type) {

  const struct header *h = &reader->log.header;

  /* Mark fields_found in order to check that the fields are found. */
  for (int i = 0; i < n_fields_wanted; i++) {
    fields_found[i] = NULL;
  }

  /* Find the corresponding fields */
  for (int i = 0; i < n_fields_wanted; i++) {

    for (int j = 0; j < h->number_fields[type]; j++) {
      /* Copy the structure if the field is found.  */
      if (h->fields[type][j].field.field == fields_wanted[i]) {
        fields_found[i] = &h->fields[type][j];
        break;
      }
    }
  }

  /* Check that we found the fields */
  for (int i = 0; i < n_fields_wanted; i++) {
    if (fields_found[i] == NULL) {
      const char *name = field_enum_get_name(fields_wanted[i]);
      error_python("Field %s not found in particle type %s", name,
                   part_type_names[type]);
    }
  }
}

/**
 * @brief Structure for the mapper function
 * #csds_reader_read_all_particles_single_type_mapper.
 */
struct extra_data_read_all {

  /*! The #csds_reader. */
  struct csds_reader *reader;

  /*! The requested time. */
  double time;

  /*! The interpolation type. */
  enum csds_reader_type interp_type;

  /*! The #field_information for all the fields requested. */
  const struct field_information **fields_wanted;

  /*! The number of fields requested. */
  int n_fields_wanted;

  /*! The array of output arrays. */
  void **output;

  /*! The number of particles at the requested time.  */
  const uint64_t *n_part;

  /*! The type of particle to read. */
  enum part_type type;

  /*! The current position in the index file. */
  size_t current_index;

  /*! The current position in the output arrays. */
  size_t current_output;
};

/**
 * @brief Mapper function of #csds_reader_read_all_particles_single_type.
 *
 * @param map_data (size_t) The current position (not used)
 * @param num_elements The number of elements to process with the current
 * thread.
 * @param extra_data Pointer to a #extra_data_read_all.
 */
void csds_reader_read_all_particles_single_type_mapper(void *map_data,
                                                       int num_elements,
                                                       void *extra_data) {

  /* Extract the data */
  struct extra_data_read_all *data_tp =
      (struct extra_data_read_all *)extra_data;
  struct csds_reader *reader = data_tp->reader;
  double time = data_tp->time;
  enum csds_reader_type interp_type = data_tp->interp_type;
  const struct field_information **fields_wanted = data_tp->fields_wanted;
  int n_fields_wanted = data_tp->n_fields_wanted;
  void **output = data_tp->output;
  const uint64_t *n_part = data_tp->n_part;
  enum part_type type = data_tp->type;

  /* Create a few variables for later */
  const size_t size_index = reader->index.index_prev.nparts[type];
  const size_t size_history =
      csds_reader_count_number_new_particles(reader, type);

  struct index_data *data =
      csds_index_get_data(&reader->index.index_prev, type);
  struct index_data *data_created =
      csds_index_get_created_history(&reader->index.index_next, type);

  /* Count the number of previous parts for the shift in output */
  uint64_t prev_npart = 0;
  for (int i = 0; i < type; i++) {
    prev_npart += n_part[i];
  }

  /* Allocate the temporary memory. */
  void **output_tmp = malloc(n_fields_wanted * sizeof(void *));
  if (output_tmp == NULL) {
    error_python("Failed to allocate the temporary output buffer");
  }
  for (int i = 0; i < n_fields_wanted; i++) {
    output_tmp[i] = malloc(fields_wanted[i]->field.size);
    if (output_tmp[i] == NULL) {
      error_python("Failed to allocate the temporary output buffer");
    }
  }

  /* Read the particles */
  for (int i = 0; i < num_elements; i++) {

    /* Get the next index. */
    size_t current_index = atomic_inc(&data_tp->current_index);

    /* Are we reading the history? */
    int reading_history = current_index >= size_index;

    /* Check if we still have some particles available. */
    if (reading_history && current_index == size_history + size_index) {
      error_python("The CSDS was not able to find enough particles.");
    }

    /* Get the offset */
    const size_t current =
        reading_history ? current_index - size_index : current_index;
    size_t offset =
        reading_history ? data_created[current].offset : data[current].offset;

    /* Loop over each field. */
    int particle_removed = 0;
    for (int field = 0; field < n_fields_wanted; field++) {

      /* Read the field. */
      particle_removed = csds_reader_read_field(
          reader, time, reader->time.time_offset, interp_type, offset,
          fields_wanted[field], output_tmp[field], type);

      /* Should we continue to read the fields of this particle? */
      if (particle_removed) {
        i--;
        break;
      }
    }

    /* Write the particle into the output if it was not removed */
    if (!particle_removed) {
      size_t current_output = atomic_inc(&data_tp->current_output);
      for (int field = 0; field < n_fields_wanted; field++) {

        /* Get the array */
        const size_t size = fields_wanted[field]->field.size;
        void *output_single =
            (char *)output[field] + (current_output + prev_npart) * size;

        /* Copy the temporary buffer into the global one */
        memcpy(output_single, output_tmp[field], size);
      }
    }
  }

  /* Free the allocated memory */
  for (int i = 0; i < n_fields_wanted; i++) {
    free(output_tmp[i]);
  }
  free(output_tmp);
}

/**
 * @brief Read all the particles of a given type from the index file.
 *
 * @param reader The #csds_reader.
 * @param time The requested time for the particle.
 * @param interp_type The type of interpolation.
 * @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of field requested.
 * @param output Pointer to the output array. Size: (n_fields_wanted,
 * sum(n_part)).
 * @param n_part Number of particles of each type.
 * @param type The particle type
 */
void csds_reader_read_all_particles_single_type(
    struct csds_reader *reader, double time, enum csds_reader_type interp_type,
    const enum field_enum *fields_wanted, const int n_fields_wanted,
    void **output, const uint64_t *n_part, enum part_type type) {

  /* Allocate temporary memory. */
  const struct field_information **fields_info_wanted =
      (const struct field_information **)malloc(
          sizeof(struct field_information *) * n_fields_wanted);
  if (fields_info_wanted == NULL) {
    error_python("Failed to allocate the field information.");
  }

  /* Convert fields into the local array. */
  csds_reader_get_fields_wanted(reader, fields_wanted, fields_info_wanted,
                                n_fields_wanted, type);

  /* Compact the data */
  struct extra_data_read_all extra = {reader,
                                      time,
                                      interp_type,
                                      fields_info_wanted,
                                      n_fields_wanted,
                                      output,
                                      n_part,
                                      type,
                                      /* current_index */ 0,
                                      /* current_output */ 0};

  /* Create the threadpool */
  struct threadpool tp;
  threadpool_init(&tp, reader->params.number_threads);

  /* Read the particles */
  threadpool_map(&tp, csds_reader_read_all_particles_single_type_mapper, NULL,
                 n_part[type], 1, threadpool_auto_chunk_size, &extra);

  /* Check if we have all the expected particles. */
  if (n_part[type] != extra.current_output) {
    error("The reader was not able to find all the expected particles.");
  }

  /* Free the memory */
  threadpool_clean(&tp);
  free(fields_info_wanted);
}

/**
 * @brief Read all the particles from the index file.
 *
 * @param reader The #csds_reader.
 * @param time The requested time for the particle.
 * @param interp_type The type of interpolation.
 * @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of field requested.
 * @param output Pointer to the output array. Size: (n_fields_wanted,
 * sum(n_part)).
 * @param n_part Number of particles of each type.
 */
void csds_reader_read_all_particles(struct csds_reader *reader, double time,
                                    enum csds_reader_type interp_type,
                                    const enum field_enum *fields_wanted,
                                    const int n_fields_wanted, void **output,
                                    const uint64_t *n_part) {
  const ticks tic = getticks();

  /* Read the gas */
  if (n_part[swift_type_gas] != 0) {
    csds_reader_read_all_particles_single_type(reader, time, interp_type,
                                               fields_wanted, n_fields_wanted,
                                               output, n_part, swift_type_gas);
  }

  /* Read the dark matter. */
  if (n_part[swift_type_dark_matter] != 0) {
    csds_reader_read_all_particles_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_dark_matter);
  }
  /* Read the dark matter background. */
  if (n_part[swift_type_dark_matter_background] != 0) {
    csds_reader_read_all_particles_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_dark_matter_background);
  }
  /* Read the neutrino dark matter. */
  if (n_part[swift_type_neutrino] != 0) {
    csds_reader_read_all_particles_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_neutrino);
  }
  /* Read the stars. */
  if (n_part[swift_type_stars] != 0) {
    csds_reader_read_all_particles_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_stars);
  }

  /* Print the time */
  if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
}

/**
 * @brief Read the particles of a given type from the index file and their ids.
 *
 * @param reader The #csds_reader.
 * @param time The requested time for the particle.
 * @param interp_type The type of interpolation.
 * @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of field requested.
 * @param output Pointer to the output array. Size: (n_fields_wanted,
 * sum(n_part)).
 * @param n_part Number of particles of each type.
 * Updated with the number of particles found.
 * @param type The particle type
 * @param ids The particles' ids.
 * The ids not found are removed from this array.
 */
void csds_reader_read_particles_from_ids_single_type(
    struct csds_reader *reader, double time, enum csds_reader_type interp_type,
    const enum field_enum *fields_wanted, const int n_fields_wanted,
    void **output, uint64_t *n_part, enum part_type type, long long *ids) {

  /* Count the number of previous parts for the shift in output */
  uint64_t prev_npart = 0;
  for (int i = 0; i < type; i++) {
    prev_npart += n_part[i];
  }

  /* Allocate temporary memory. */
  const struct field_information **fields_info_wanted =
      (const struct field_information **)malloc(
          sizeof(struct field_information *) * n_fields_wanted);
  if (fields_info_wanted == NULL) {
    error_python("Failed to allocate the field information.");
  }

  /* Convert fields into the local array. */
  csds_reader_get_fields_wanted(reader, fields_wanted, fields_info_wanted,
                                n_fields_wanted, type);

  /* Read the particles */
  for (size_t i = 0; i < n_part[type]; i++) {

    /* Get the offset */
    size_t offset = 0;
    int found =
        csds_index_get_offset(&reader->index.index_prev, ids[i], type, &offset);

    /* Deal with the particles not found */
    if (!found) {
      if (i != n_part[type] - 1) ids[i] = ids[i + 1];
      n_part[type] -= 1;
      i -= 1;
      continue;
    }

    /* Loop over each field. */
    for (int field = 0; field < n_fields_wanted; field++) {
      void *output_single =
          output[field] +
          (i + prev_npart) * fields_info_wanted[field]->field.size;

      /* Read the field. */
      int particle_removed = csds_reader_read_field(
          reader, time, reader->time.time_offset, interp_type, offset,
          fields_info_wanted[field], output_single, type);

      /* Is the particle still present? */
      if (particle_removed) {
        if (i != n_part[type] - 1) ids[i] = ids[i + 1];
        n_part[type] -= 1;
        i -= 1;
        break;
      }
    }
  }

  /* Free the memory */
  free(fields_info_wanted);
}

/**
 * @brief Read the particles from the index file and their ids.
 *
 * @param reader The #csds_reader.
 * @param time The requested time for the particle.
 * @param interp_type The type of interpolation.
 * @param fields_wanted The fields requested.
 * @param n_fields_wanted Number of field requested.
 * @param output Pointer to the output array. Size: (n_fields_wanted,
 * sum(n_part)).
 * @param n_part Number of particles of each type.
 * Updated with the number of particles found.
 * @param ids The particles' ids.
 * The ids not found are removed from this array.
 */
void csds_reader_read_particles_from_ids(
    struct csds_reader *reader, double time, enum csds_reader_type interp_type,
    const enum field_enum *fields_wanted, const int n_fields_wanted,
    void **output, uint64_t *n_part, long long **ids) {

  const ticks tic = getticks();

  /* Read the gas */
  if (n_part[swift_type_gas] != 0) {
    csds_reader_read_particles_from_ids_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_gas, ids[swift_type_gas]);
  }

  /* Read the dark matter. */
  if (n_part[swift_type_dark_matter] != 0) {
    csds_reader_read_particles_from_ids_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_dark_matter, ids[swift_type_dark_matter]);
  }
  /* Read the dark matter background. */
  if (n_part[swift_type_dark_matter_background] != 0) {
    csds_reader_read_particles_from_ids_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_dark_matter_background,
        ids[swift_type_dark_matter_background]);
  }
  /* Read the stars. */
  if (n_part[swift_type_stars] != 0) {
    csds_reader_read_particles_from_ids_single_type(
        reader, time, interp_type, fields_wanted, n_fields_wanted, output,
        n_part, swift_type_stars, ids[swift_type_stars]);
  }

  /* Print the time */
  if (reader->verbose > 0 || reader->verbose == CSDS_VERBOSE_TIMERS)
    message("took %.3f %s", clocks_from_ticks(getticks() - tic),
            clocks_getunit());
}

/**
 * @brief Get the simulation initial time.
 *
 * @param reader The #csds_reader.
 *
 * @return The initial time
 */
double csds_reader_get_time_begin(struct csds_reader *reader) {
  return reader->log.times.records[0].time;
}

/**
 * @brief Get the simulation final time.
 *
 * @param reader The #csds_reader.
 *
 * @return The final time
 */
double csds_reader_get_time_end(struct csds_reader *reader) {
  const size_t ind = reader->log.times.size;
  return reader->log.times.records[ind - 1].time;
}

/**
 * @brief Get the offset of the last timestamp before a given time.
 *
 * @param reader The #csds_reader.
 * @param time The requested time.
 *
 * @return The offset of the timestamp.
 */
size_t csds_reader_get_next_offset_from_time(struct csds_reader *reader,
                                             double time) {
  size_t ind = time_array_get_index_from_time(&reader->log.times, time);
  /* We do not want to have the sentiel */
  if (reader->log.times.size - 2 == ind) {
    ind -= 1;
  }
  return reader->log.times.records[ind + 1].offset;
}

/**
 * @brief Read a record without knowing if it is a particle or a timestamp.
 *
 * WARNING This function asssumes that all the particles are hydro particles.
 * Thus it should be used only for testing the code.
 *
 * @param reader The #csds_reader.
 * @param output The already allocated buffer containing all the fields possible
 * for an hydro particle. (out) The particle if the record is a particle
 * @param time (out) The time if the record is a timestamp.
 * @param is_particle (out) 1 if the record is a particle 0 otherwise.
 * @param offset The offset of the record to read.
 *
 * @return The offset after the record.
 */
size_t csds_reader_read_record(struct csds_reader *reader, void **output,
                               double *time, int *is_particle, size_t offset) {

  /* Get a few pointers. */
  const struct header *h = &reader->log.header;
  char *map = reader->log.log.map;

  size_t mask = 0;
  size_t h_offset = 0;

  /* Read the record's mask. */
  map = csds_loader_io_read_mask(h, map + offset, &mask, &h_offset);

  *is_particle = !(mask & TIMESTAMP_MASK);
  /* The record is a particle. */
  if (*is_particle) {
    size_t offset_tmp = offset;
    for (int i = 0; i < h->number_fields[swift_type_gas]; i++) {
      offset = csds_particle_read_field(
          reader, offset_tmp, output[i], swift_type_gas,
          &h->fields[swift_type_gas][i], /* derivative */ 0, &mask, &h_offset);
    }
  }
  /* The record is a timestamp. */
  else {
    struct time_record time_record;
    offset = time_read(&time_record, reader, offset);
    *time = time_record.time;
  }

  return offset;
}