Commit ea1c32a7 authored by Loic Hausammann's avatar Loic Hausammann

Make required changes to the logger

parent dbeeeb42
......@@ -57,7 +57,7 @@ AC_C_RESTRICT
# logger
AC_ARG_ENABLE([logger],
[AS_HELP_STRING([--enable-logger],
[enable the logger output format]
[enable the particle logger]
)],
[with_logger="${enableval}"],
[with_logger="no"]
......@@ -1622,7 +1622,7 @@ AC_MSG_RESULT([
CPU profiler : $have_profiler
Pthread barriers : $have_pthread_barrier
VELOCIraptor enabled : $have_velociraptor
Logger : $with_logger
Particle Logger : $with_logger
Hydro scheme : $with_hydro
Dimensionality : $with_dimension
......
......@@ -34,9 +34,3 @@ InitialConditions:
file_name: ./sedov.hdf5
periodic: 1
smoothing_length_scaling: 3.33
# Parameters governing the logger snapshot system
Logger:
delta_step: 10 # (Optional) Update the particle log every this many updates
mmaped_buffer_size: .05 # buffer size in GB
basename: indice # Common part of the filenames
......@@ -1008,7 +1008,6 @@ int main(int argc, char *argv[]) {
/* Write the state of the system before starting time integration. */
#ifdef WITH_LOGGER
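/* Make sure the log can hold one chunk per particle, write a complete
 * log entry for all of them and dump the corresponding index file. */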
logger_ensure_size(e.log, e.total_nr_parts, e.total_nr_gparts, 0);
logger_log_all(e.log, &e);
engine_dump_index(&e);
#endif
......
......@@ -93,10 +93,10 @@ Snapshots:
# Parameters governing the logger snapshot system
Logger:
delta_step: 10 # (Optional) Update the particle log every this many updates
mmaped_buffer_size: .01 # buffer size in GB
buffer_scale: 1.2 # (Optional) When buffer size is too small, update it with required memory times buffer_scale
basename: indice # Common part of the filenames
delta_step: 10 # Update the particle log every this many updates
initial_buffer_size: 1 # buffer size in GB
buffer_scale: 10 # (Optional) When buffer size is too small, update it with required memory times buffer_scale
basename: index # Common part of the filenames
# Parameters governing the conserved quantities statistics
Statistics:
......
......@@ -57,11 +57,15 @@ void *dump_get(struct dump *d, size_t count, size_t *offset) {
/**
* @brief Ensure that at least size bytes are available in the #dump.
*
* @param d The #dump.
* @param required_size The required size for the #dump.
* @param increase_size If the #dump is too small, increase it by this amount.
*/
void dump_ensure(struct dump *d, size_t size) {
void dump_ensure(struct dump *d, size_t required_size, size_t increase_size) {
/* If we have enough space already, just bail. */
if (d->size - d->count > size) return;
if (d->size - d->count > required_size) return;
/* Unmap the current data. */
if (munmap(d->data, d->size) != 0) {
......@@ -73,7 +77,7 @@ void dump_ensure(struct dump *d, size_t size) {
const size_t trunc_count = d->count & d->page_mask;
d->file_offset += trunc_count;
d->count -= trunc_count;
d->size = (size * dump_grow_ensure_factor + ~d->page_mask) & d->page_mask;
d->size = (increase_size + ~d->page_mask) & d->page_mask;
/* Re-allocate the file size. */
if (posix_fallocate(d->fd, d->file_offset, d->size) != 0) {
......
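The new dump_ensure() interface lets the caller choose the growth size (logger_ensure_size() passes buffer_scale * limit) instead of relying on the removed dump_grow_ensure_factor, and the remapped region is rounded up to whole pages by (increase_size + ~page_mask) & page_mask. A minimal stand-alone sketch of that rounding, assuming page_mask = ~(page_size - 1) as set up by dump_init(); the helper name is hypothetical and not part of the patch:

#include <stdio.h>
#include <unistd.h>

/* Round n up to a whole number of pages using the same bit trick as
 * dump_ensure(): (n + page_size - 1) & ~(page_size - 1). */
static size_t round_up_to_page(size_t n) {
  const size_t page_size = (size_t)sysconf(_SC_PAGESIZE);
  const size_t page_mask = ~(page_size - 1); /* assumed, as in dump_init() */
  return (n + ~page_mask) & page_mask;
}

int main(void) {
  /* With 4096-byte pages, 5000 bytes maps to 8192 bytes. */
  printf("%zu -> %zu\n", (size_t)5000, round_up_to_page(5000));
  return 0;
}

Splitting required_size from increase_size lets logger_ensure_size() bail out cheaply when enough space remains and scale the growth with Logger:buffer_scale otherwise, rather than always growing by a fixed factor of the requested size.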
......@@ -27,9 +27,6 @@
/* Standard headers */
#include <stdlib.h>
/* Some constants. */
#define dump_grow_ensure_factor 10
/** The dump struct. */
struct dump {
......@@ -54,7 +51,7 @@ struct dump {
/* Function prototypes. */
void dump_init(struct dump *d, const char *filename, size_t size);
void dump_ensure(struct dump *d, size_t size);
void dump_ensure(struct dump *d, size_t required_size, size_t increase_size);
void dump_sync(struct dump *d);
void dump_close(struct dump *d);
void *dump_get(struct dump *d, size_t count, size_t *offset);
......
......@@ -114,8 +114,7 @@ const char *engine_policy_names[] = {"none",
"stars",
"structure finding",
"star formation",
"feedback",
"logger"};
"feedback"};
/** The rank of the engine as a global variable (for messages). */
int engine_rank;
......@@ -5213,7 +5212,10 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs,
space_init_sparts(s, e->verbose);
#ifdef WITH_LOGGER
/* Mark the first time step in the particle logger file. */
logger_log_timestamp(e->log, e->ti_current, &e->log->timestamp_offset);
/* Make sure that we have enough space in the particle logger file
* to store the particles in the current time step. */
logger_ensure_size(e->log, e->total_nr_parts, e->total_nr_gparts, 0);
#endif
......@@ -5467,7 +5469,10 @@ void engine_step(struct engine *e) {
e->forcerebuild = 1;
#ifdef WITH_LOGGER
/* Mark the current time step in the particle logger file. */
logger_log_timestamp(e->log, e->ti_current, &e->log->timestamp_offset);
/* Make sure that we have enough space in the particle logger file
* to store the particles in the current time step. */
logger_ensure_size(e->log, e->total_nr_parts, e->total_nr_gparts, 0);
#endif
......@@ -5622,6 +5627,7 @@ void engine_check_for_dumps(struct engine *e) {
/* Dump everything */
engine_print_stats(e);
#ifdef WITH_LOGGER
/* Write a file containing the offsets in the particle logger. */
engine_dump_index(e);
#else
engine_dump_snapshot(e);
......@@ -5656,6 +5662,7 @@ void engine_check_for_dumps(struct engine *e) {
/* Dump snapshot */
#ifdef WITH_LOGGER
/* Write a file containing the offsets in the particle logger. */
engine_dump_index(e);
#else
engine_dump_snapshot(e);
......@@ -5678,6 +5685,7 @@ void engine_check_for_dumps(struct engine *e) {
/* Dump snapshot */
#ifdef WITH_LOGGER
/* Write a file containing the offsets in the particle logger. */
engine_dump_index(e);
#else
engine_dump_snapshot(e);
......@@ -5717,6 +5725,7 @@ void engine_check_for_dumps(struct engine *e) {
/* Dump... */
#ifdef WITH_LOGGER
/* Write a file containing the offsets in the particle logger. */
engine_dump_index(e);
#else
engine_dump_snapshot(e);
......@@ -6506,23 +6515,6 @@ void engine_dump_index(struct engine *e) {
struct clocks_time time1, time2;
clocks_gettime(&time1);
#ifdef SWIFT_DEBUG_CHECKS
/* Check that all cells have been drifted to the current time.
* That can include cells that have not
* previously been active on this rank. */
space_check_drift_point(e->s, e->ti_current,
e->policy & engine_policy_self_gravity);
/* Be verbose about this */
if (e->nodeID == 0) {
if (e->policy & engine_policy_cosmology)
message("Writing index at a=%e",
exp(e->ti_current * e->time_base) * e->cosmology->a_begin);
else
message("Writing index at t=%e",
e->ti_current * e->time_base + e->time_begin);
}
#else
if (e->verbose) {
if (e->policy & engine_policy_cosmology)
message("Writing index at a=%e",
......@@ -6531,14 +6523,13 @@ void engine_dump_index(struct engine *e) {
message("Writing index at t=%e",
e->ti_current * e->time_base + e->time_begin);
}
#endif
/* Dump... */
write_index_single(e, e->log->base_name, e->internal_units,
e->snapshot_units);
/* Flag that we dumped a snapshot */
e->step_props |= engine_step_prop_snapshot;
e->step_props |= engine_step_prop_logger_index;
clocks_gettime(&time2);
if (e->verbose)
......
......@@ -78,7 +78,7 @@ enum engine_policy {
engine_policy_feedback = (1 << 18)
};
#define engine_maxpolicy 18
extern const char *engine_policy_names[];
extern const char *engine_policy_names[engine_maxpolicy + 2];
/**
* @brief The different unusual events that can take place in a time-step.
......@@ -90,7 +90,8 @@ enum engine_step_properties {
engine_step_prop_repartition = (1 << 2),
engine_step_prop_statistics = (1 << 3),
engine_step_prop_snapshot = (1 << 4),
engine_step_prop_restarts = (1 << 5)
engine_step_prop_restarts = (1 << 5),
engine_step_prop_logger_index = (1 << 6)
};
/* Some constants */
......
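engine_dump_index() now records the new engine_step_prop_logger_index property instead of engine_step_prop_snapshot. A small, self-contained illustration of how such a bit in e->step_props is set and tested; only the two flag values are taken from the patch, the rest is a hypothetical stand-in for the engine:

#include <stdio.h>

/* Only the two relevant flags are reproduced from engine.h. */
enum engine_step_properties {
  engine_step_prop_snapshot = (1 << 4),
  engine_step_prop_logger_index = (1 << 6)
};

int main(void) {
  int step_props = 0; /* stand-in for e->step_props */

  /* What engine_dump_index() now does after a dump. */
  step_props |= engine_step_prop_logger_index;

  /* The bit can later be tested without disturbing the others. */
  if (step_props & engine_step_prop_logger_index)
    printf("a logger index file was written this step\n");
  if (!(step_props & engine_step_prop_snapshot))
    printf("no regular snapshot was written\n");

  return 0;
}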
......@@ -751,10 +751,6 @@ __attribute__((always_inline)) INLINE static void hydro_first_init_part(
xp->a_grav[2] = 0.f;
xp->entropy_full = p->entropy;
#ifdef WITH_LOGGER
logger_part_data_init(&xp->logger_data);
#endif
hydro_reset_acceleration(p);
hydro_init_part(p, NULL);
}
......@@ -776,19 +772,4 @@ hydro_set_init_internal_energy(struct part *p, float u_init) {
p->entropy = u_init;
}
#ifdef WITH_LOGGER
/**
* @brief Should this particle write its data now ?
*
* @param xp The #xpart.
* @param e The #engine containing information about the current time.
* @return 1 if the #part should write, 0 otherwise.
*/
__attribute__((always_inline)) INLINE static int xpart_should_write(
const struct xpart *xp, const struct engine *e) {
return (xp->logger_data.last_output > e->log->delta_step);
}
#endif
#endif /* SWIFT_GADGET2_HYDRO_H */
......@@ -188,33 +188,6 @@ INLINE static void hydro_write_particles(const struct part* parts,
#endif
}
/**
* @brief Specifies which particle fields to write to a dataset
*
* @param parts The particle array.
* @param list The list of i/o properties to write.
* @param num_fields The number of i/o fields to write.
*
* In this version, we only want the ids and the offset.
*/
__attribute__((always_inline)) INLINE static void hydro_write_index(
const struct part* parts, const struct xpart* xparts, struct io_props* list,
int* num_fields) {
#ifdef WITH_LOGGER
*num_fields = 2;
/* List what we want to write */
list[0] = io_make_output_field("ParticleIDs", ULONGLONG, 1,
UNIT_CONV_NO_UNITS, parts, id);
list[1] = io_make_output_field("Offset", ULONGLONG, 1, UNIT_CONV_NO_UNITS,
xparts, logger_data.last_offset);
#else
error("Cannot write index without logger");
#endif
}
/**
* @brief Writes the current model of SPH to the file
* @param h_grpsph The HDF5 group in which to write
......
......@@ -57,6 +57,7 @@ struct xpart {
struct cooling_xpart_data cooling_data;
#ifdef WITH_LOGGER
/* Additional data for the particle logger */
struct logger_part_data logger_data;
#endif
......
......@@ -48,7 +48,9 @@ const unsigned int logger_datatype_size[logger_data_count] = {
};
/**
* @brief Write the header of a chunk (offset + mask)
* @brief Write the header of a chunk (offset + mask).
*
* This may be broken on big-endian systems.
*
* @param buff The writing buffer
* @param mask The mask to write
......@@ -91,6 +93,8 @@ void logger_write_data(struct dump *d, size_t *offset, size_t size,
/**
* @brief Write a parameter to the file
*
* TODO Make it thread safe or remove it.
*
* write data in the following order: name, data type, data.
* It should be used only for the file header.
*
......@@ -179,6 +183,13 @@ int logger_compute_chunk_size(unsigned int mask) {
* @param e The #engine
*/
void logger_log_all(struct logger *log, const struct engine *e) {
/* Ensure that enough space is available */
logger_ensure_size(log, e->total_nr_parts, e->total_nr_gparts, 0);
#ifdef SWIFT_DEBUG_CHECKS
message("Need to implement stars");
#endif
/* some constants */
const struct space *s = e->s;
const unsigned int mask = logger_mask_x | logger_mask_v | logger_mask_a |
......@@ -189,7 +200,7 @@ void logger_log_all(struct logger *log, const struct engine *e) {
for (long long i = 0; i < e->total_nr_parts; i++) {
logger_log_part(log, &s->parts[i], mask,
&s->xparts[i].logger_data.last_offset);
s->xparts[i].logger_data.last_output = 0;
s->xparts[i].logger_data.steps_since_last_output = 0;
}
/* loop over all gparts */
......@@ -384,21 +395,10 @@ void logger_ensure_size(struct logger *log, size_t total_nr_parts,
struct logger_parameters *log_params = log->params;
/* count part memory */
size_t limit = log_params->offset_size + log_params->mask_size;
for (size_t i = 0; i < log_params->nber_mask; i++) {
if (log_params->masks[i] != logger_mask_timestamp)
limit += log_params->masks_data_size[i];
}
size_t limit = log_params->total_size;
limit *= total_nr_parts;
/* check if enough place for all particles */
if (log->buffer_size < limit) {
log->buffer_size = log->buffer_scale * limit;
message("Increasing buffer size to %.3f GB", log->buffer_size * 1e-9);
}
/* count gpart memory */
if (total_nr_gparts > 0) error("Not implemented");
......@@ -406,7 +406,7 @@ void logger_ensure_size(struct logger *log, size_t total_nr_parts,
if (total_nr_sparts > 0) error("Not implemented");
/* ensure enough space in dump */
dump_ensure(log->dump, log->buffer_size);
dump_ensure(log->dump, limit, log->buffer_scale * limit);
}
/**
......@@ -418,10 +418,9 @@ void logger_ensure_size(struct logger *log, size_t total_nr_parts,
void logger_init(struct logger *log, struct swift_params *params) {
/* read parameters */
log->delta_step = parser_get_param_int(params, "Logger:delta_step");
log->buffer_size =
parser_get_param_float(params, "Logger:mmaped_buffer_size") * 1e9;
size_t buffer_size = parser_get_param_float(params, "Logger:initial_buffer_size") * 1e9;
log->buffer_scale =
parser_get_opt_param_float(params, "Logger:buffer_scale", 1.2);
parser_get_opt_param_float(params, "Logger:buffer_scale", 10);
parser_get_param_string(params, "Logger:basename", log->base_name);
/* set initial value of parameters */
......@@ -441,10 +440,8 @@ void logger_init(struct logger *log, struct swift_params *params) {
log->dump = malloc(sizeof(struct dump));
struct dump *dump_file = log->dump;
dump_init(dump_file, logger_name_file, log->buffer_size);
dump_init(dump_file, logger_name_file, buffer_size);
/* ensure enough place in dump */
dump_ensure(dump_file, log->buffer_size);
}
/**
......@@ -490,7 +487,7 @@ void logger_write_file_header(struct logger *log, const struct engine *e) {
logger_write_data(dump, &file_offset, logger_datatype_size[logger_data_bool],
&reversed);
/* will write the offset of the first particle here */
/* Placeholder for the offset of the first log entry */
char *skip_header = dump_get(dump, log_params.offset_size, &file_offset);
/* write number of bytes used for names */
......@@ -507,11 +504,11 @@ void logger_write_file_header(struct logger *log, const struct engine *e) {
/* write number of masks */
logger_write_data(dump, &file_offset, log_params.number_size,
&log_params.nber_mask);
&log_params.number_mask);
/* write masks */
// loop over all mask types
for (size_t i = 0; i < log_params.nber_mask; i++) {
for (size_t i = 0; i < log_params.number_mask; i++) {
// mask name
size_t j = i * log_params.label_size;
logger_write_data(dump, &file_offset, log_params.label_size,
......@@ -544,8 +541,6 @@ void logger_write_file_header(struct logger *log, const struct engine *e) {
/* free memory */
free(name);
dump_ensure(log->dump, log->buffer_size);
}
/**
......@@ -561,10 +556,10 @@ void logger_parameters_init(struct logger_parameters *log_params) {
log_params->number_size = 1;
log_params->data_type_size = 1;
log_params->nber_mask = 8;
log_params->number_mask = 8;
/* set masks array */
log_params->masks = malloc(sizeof(size_t) * log_params->nber_mask);
log_params->masks = malloc(sizeof(size_t) * log_params->number_mask);
log_params->masks[0] = logger_mask_x;
log_params->masks[1] = logger_mask_v;
log_params->masks[2] = logger_mask_a;
......@@ -575,7 +570,7 @@ void logger_parameters_init(struct logger_parameters *log_params) {
log_params->masks[7] = logger_mask_timestamp;
/* set the mask names */
size_t block_size = log_params->label_size * log_params->nber_mask;
size_t block_size = log_params->label_size * log_params->number_mask;
log_params->masks_name = malloc(block_size);
char *cur_name = log_params->masks_name;
......@@ -613,7 +608,7 @@ void logger_parameters_init(struct logger_parameters *log_params) {
cur_name += log_params->label_size;
/* set the data size */
log_params->masks_data_size = malloc(sizeof(size_t) * log_params->nber_mask);
log_params->masks_data_size = malloc(sizeof(size_t) * log_params->number_mask);
log_params->masks_data_size[0] = 3 * sizeof(double);
log_params->masks_data_size[1] = 3 * sizeof(float);
log_params->masks_data_size[2] = 3 * sizeof(float);
......@@ -623,6 +618,14 @@ void logger_parameters_init(struct logger_parameters *log_params) {
log_params->masks_data_size[6] = sizeof(float) + sizeof(long long);
log_params->masks_data_size[7] = sizeof(integertime_t);
/* Compute the size of a chunk if all the masks are activated */
log_params->total_size = logger_offset_size + logger_mask_size;
for (size_t i = 0; i < log_params->number_mask; i++) {
if (log_params->masks[i] != logger_mask_timestamp)
log_params->total_size += log_params->masks_data_size[i];
}
// todo masks_type
}
......
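logger_parameters_init() now precomputes total_size, the size of one chunk when every mask except the timestamp is active, so logger_ensure_size() no longer has to loop over the masks. With the masks_data_size values above the payload is 24 + 12 + 12 + 4 + 4 + 4 + 12 = 72 bytes; the chunk header adds logger_offset_size + logger_mask_size, whose values are not visible in this diff. A rough, stand-alone estimate of the buffer requested from dump_ensure(), assuming an 8-byte header:

#include <stdio.h>

int main(void) {
  /* Per-mask payload sizes from logger_parameters_init(), without the
   * timestamp mask. */
  const size_t payload = 3 * sizeof(double)                   /* x      */
                         + 3 * sizeof(float)                  /* v      */
                         + 3 * sizeof(float)                  /* a      */
                         + sizeof(float)                      /* u      */
                         + sizeof(float)                      /* h      */
                         + sizeof(float)                      /* rho    */
                         + sizeof(float) + sizeof(long long); /* consts */

  /* Assumed header size (logger_offset_size + logger_mask_size). */
  const size_t total_size = 8 + payload;

  /* As in logger_ensure_size(): one chunk per particle, grown by
   * Logger:buffer_scale when the dump must be enlarged. */
  const size_t total_nr_parts = 1000000;
  const double buffer_scale = 10.0;
  const size_t limit = total_size * total_nr_parts;

  printf("chunk %zu B, required %.3f GB, grown to %.3f GB\n", total_size,
         limit * 1e-9, buffer_scale * limit * 1e-9);
  return 0;
}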
......@@ -29,8 +29,9 @@
/* Forward declaration */
struct dump;
struct part;
struct gpart;
struct part;
/* TODO remove dependency */
struct engine;
/**
......@@ -120,7 +121,7 @@ struct logger_parameters {
size_t data_type_size;
/* number of different mask */
size_t nber_mask;
size_t number_mask;
/* value of each mask */
size_t *masks;
......@@ -130,6 +131,9 @@ struct logger_parameters {
/* label of each mask */
char *masks_name;
/* Size of a chunk if all the masks are activated */
size_t total_size;
};
/* structure containing global data */
......@@ -146,9 +150,6 @@ struct logger {
/* timestamp offset for the logger */
size_t timestamp_offset;
/* size of the buffer */
size_t buffer_size;
/* scaling factor when buffer is too small */
float buffer_scale;
......@@ -160,17 +161,12 @@ struct logger {
/* required structure for each particle type */
struct logger_part_data {
/* Number of particle updates since last output */
short int last_output;
int steps_since_last_output;
/* offset of last particle log entry */
size_t last_offset;
};
INLINE static void logger_part_data_init(struct logger_part_data *logger) {
logger->last_offset = 0;
logger->last_output = SHRT_MAX;
}
enum logger_datatype {
logger_data_int,
logger_data_float,
......@@ -205,6 +201,29 @@ int logger_read_timestamp(unsigned long long int *t, size_t *offset,
void logger_parameters_init(struct logger_parameters *log_params);
void logger_parameters_clean(struct logger_parameters *log_params);
/**
* @brief Initialize the logger data for a particle.
*
* @param logger The #logger_part_data.
*/
INLINE static void logger_part_data_init(struct logger_part_data *logger) {
logger->last_offset = 0;
logger->steps_since_last_output = SHRT_MAX;
}
/**
* @brief Should this particle write its data now?
*
* @param logger_data The #logger_part_data of a particle.
* @param log The #logger.
* @return 1 if the particle should be written, 0 otherwise.
*/
__attribute__((always_inline)) INLINE static int logger_should_write(
const struct logger_part_data *logger_data, const struct logger *log) {
return (logger_data->steps_since_last_output > log->delta_step);
}
#endif /* WITH_LOGGER */
#endif /* SWIFT_LOGGER_H */
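Because logger_part_data_init() starts steps_since_last_output at SHRT_MAX, the first call to logger_should_write() returns true for any sensible Logger:delta_step, so every particle gets logged once at the start; after that the counter is cleared on each write and incremented otherwise, as runner_do_logger() does below. A toy sketch of that cadence, with simplified stand-in types:

#include <limits.h>
#include <stdio.h>

/* Simplified stand-in for logger_part_data, keeping only the counter. */
struct toy_logger_data { int steps_since_last_output; };

static int toy_should_write(const struct toy_logger_data *d, int delta_step) {
  return d->steps_since_last_output > delta_step;
}

int main(void) {
  const int delta_step = 10;
  struct toy_logger_data d = {SHRT_MAX}; /* as in logger_part_data_init() */

  for (int update = 0; update < 25; update++) {
    if (toy_should_write(&d, delta_step)) {
      /* Writes happen at updates 0, 12 and 24 with delta_step = 10. */
      printf("update %2d: log this particle\n", update);
      d.steps_since_last_output = 0;
    } else {
      d.steps_since_last_output += 1;
    }
  }
  return 0;
}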
......@@ -26,12 +26,37 @@
/* Includes. */
#include "engine.h"
#include "io_properties.h"
#include "part.h"
#include "units.h"
void write_index_single(struct engine* e, const char* baseName,
const struct unit_system* internal_units,
const struct unit_system* snapshot_units);
/**
* @brief Specifies which particle fields to write to a dataset
*
* @param parts The particle array.
* @param xparts The extended particle array.
* @param list The list of i/o properties to write.
* @param num_fields The number of i/o fields to write.
*
* In this version, we only want the IDs and the offsets.
*/
__attribute__((always_inline)) INLINE static void hydro_write_index(
const struct part* parts, const struct xpart* xparts, struct io_props* list,
int* num_fields) {
*num_fields = 2;
/* List what we want to write */
list[0] = io_make_output_field("ParticleIDs", ULONGLONG, 1,
UNIT_CONV_NO_UNITS, parts, id);
list[1] = io_make_output_field("Offset", ULONGLONG, 1, UNIT_CONV_NO_UNITS,
xparts, logger_data.last_offset);
}
#endif
#endif /* SWIFT_LOGGER_IO_H */
......@@ -431,8 +431,9 @@ void prepareArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile,
io_write_attribute_s(h_data, "Conversion factor", buffer);
/* Add a line to the XMF */
xmf_write_line(xmfFile, fileName, partTypeGroupName, props.name, N_total,
props.dimension, props.type);
if (xmfFile != NULL)
xmf_write_line(xmfFile, fileName, partTypeGroupName, props.name, N_total,
props.dimension, props.type);
/* Close everything */
H5Pclose(h_plist_id);
......
......@@ -2713,7 +2713,7 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
/* Anything to do here? */
if (!cell_is_starting_hydro(c, e) && !cell_is_starting_gravity(c, e)) return;
/* Recurse? */
/* Recurse? Avoid spending too much time in useless cells. */
if (c->split) {
for (int k = 0; k < 8; k++)
if (c->progeny[k] != NULL) runner_do_logger(r, c->progeny[k], 0);
......@@ -2726,22 +2726,25 @@ void runner_do_logger(struct runner *r, struct cell *c, int timer) {
struct part *restrict p = &parts[k];
struct xpart *restrict xp = &xparts[k];
/* If particle needs to be kicked */
/* Does the particle need to be logged? */
/* This is the same check as part_is_active, except for the
 * debugging checks. */
if (part_is_starting(p, e)) {
if (xpart_should_write(xp, e)) {
if (logger_should_write(&xp->logger_data, e->log)) {
/* Write particle */
/* Currently writing everything; this should be adapted over time */
logger_log_part(e->log, p,
logger_mask_x | logger_mask_v | logger_mask_a |
logger_mask_u | logger_mask_h | logger_mask_rho |
logger_mask_consts,
&xp->logger_data.last_offset);
// message("Offset: %lu", p->last_offset);
/* Set counter back to zero */
xp->logger_data.last_output = 0;
xp->logger_data.steps_since_last_output = 0;
} else
/* Update counter */
xp->logger_data.last_output += 1;
xp->logger_data.steps_since_last_output += 1;
}
}
}
......
......@@ -300,8 +300,9 @@ void prepareArray(const struct engine* e, hid_t grp, char* fileName,
if (h_data < 0) error("Error while creating dataspace '%s'.", props.name);
/* Write XMF description for this data set */
xmf_write_line(xmfFile, fileName, partTypeGroupName, props.name, N_total,
props.dimension, props.type);
if (xmfFile != NULL)
xmf_write_line(xmfFile, fileName, partTypeGroupName, props.name, N_total,
props.dimension, props.type);
/* Write unit conversion factors for this data set */
char buffer[FIELD_BUFFER_SIZE];
......
......@@ -2696,6 +2696,10 @@ void space_first_init_parts_mapper(void *restrict map_data, int count,
for (int k = 0; k < count; k++) {
hydro_first_init_part(&p[k], &xp[k]);
#ifdef WITH_LOGGER
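/* Initialise the logger data (offset and update counter) of this particle. */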
logger_part_data_init(&xp[k].logger_data);
#endif
/* Overwrite the internal energy? */
if (u_init > 0.f) hydro_set_init_internal_energy(&p[k], u_init);
......
# Parameters governing the logger snapshot system
Logger:
delta_step: 10 # (Optional) Update the particle log every this many updates
mmaped_buffer_size: .01 # buffer size in GB
initial_buffer_size: .1 # buffer size in GB
basename: indice # Common part of the filenames