Commit 6e8bd998 authored by Loic Hausammann's avatar Loic Hausammann
Browse files

Logger: implementing strays communication

parent 9bc31c9a
......@@ -453,6 +453,21 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the part and xpart into the proxy. */
proxy_parts_load(&e->proxies[pid], &s->parts[offset_parts + k],
&s->xparts[offset_parts + k], 1);
#ifdef WITH_LOGGER
/* Log the particle when leaving a rank. */
logger_log_part(e->log, s->parts[offset_parts + k],
logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask |
logger_mask_data[logger_u].mask |
logger_mask_data[logger_h].mask |
logger_mask_data[logger_rho].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask,
s->xparts[offset_parts + k].logger_data.last_offset,
logger_generate_flag(logger_flag_mpi, node_id));
#endif
}
/* Put the sparts into the corresponding proxies. */
......@@ -488,6 +503,17 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the spart into the proxy */
proxy_sparts_load(&e->proxies[pid], &s->sparts[offset_sparts + k], 1);
#ifdef WITH_LOGGER
/* Log the particle when leaving a rank. */
logger_log_spart(e->log, s->sparts[offset_sparts + k],
logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask,
s->sparts[offset_parts + k].logger_data.last_offset,
logger_generate_flag(logger_flag_mpi, node_id));
#endif
}
/* Put the bparts into the corresponding proxies. */
......@@ -523,6 +549,10 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the bpart into the proxy */
proxy_bparts_load(&e->proxies[pid], &s->bparts[offset_bparts + k], 1);
#ifdef WITH_LOGGER
error("TODO");
#endif
}
/* Put the gparts into the corresponding proxies. */
......@@ -552,6 +582,22 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
/* Load the gpart into the proxy */
proxy_gparts_load(&e->proxies[pid], &s->gparts[offset_gparts + k], 1);
#ifdef WITH_LOGGER
/* Write only the dark matter particles */
if (gp->type == swift_type_dark_matter) {
/* Log the particle when leaving a rank. */
logger_log_gpart(e->log, s->gparts[offset_gparts + k],
logger_mask_data[logger_x].mask |
logger_mask_data[logger_v].mask |
logger_mask_data[logger_a].mask |
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask,
s->sparts[offset_parts + k].logger_data.last_offset,
logger_generate_flag(logger_flag_mpi, node_id));
}
#endif
}
/* Launch the proxies. */
......@@ -771,6 +817,14 @@ void engine_exchange_strays(struct engine *e, const size_t offset_parts,
sizeof(struct spart) * prox->nr_sparts_in);
memcpy(&s->bparts[offset_bparts + count_bparts], prox->bparts_in,
sizeof(struct bpart) * prox->nr_bparts_in);
#ifdef WITH_LOGGER
logger_log_recv_strays(e->log, &s->parts[offset_parts + count_parts], prox->nr_parts_in,
&s->gparts[offset_gparts + count_gparts], prox->nr_gparts_in,
&s->sparts[offset_sparts + count_sparts], prox->nr_sparts_in,
&s->bparts[offset_bparts + count_bparts], prox->nr_bparts_in,
prox->nodeID);
#endif
/* for (int k = offset; k < offset + count; k++)
message(
"received particle %lli, x=[%.3e %.3e %.3e], h=%.3e, from node %i.",
......@@ -3297,7 +3351,7 @@ void engine_dump_snapshot(struct engine *e) {
*/
void engine_dump_index(struct engine *e) {
#if defined(WITH_LOGGER) && !defined(WITH_MPI)
#if defined(WITH_LOGGER)
struct clocks_time time1, time2;
clocks_gettime(&time1);
......
......@@ -976,6 +976,14 @@ void engine_redistribute(struct engine *e) {
for (int k = 0; k < nr_nodes; k++)
nr_bparts_new += b_counts[k * nr_nodes + nodeID];
#ifdef WITH_LOGGER
/* Log the particles before sending them out */
logger_log_before_communcations(s->parts, nr_parts_new, counts,
s->gparts, nr_gparts_new, g_counts,
s->sparts, nr_sparts_new, s_counts,
s->bparts, nr_bparts_new, b_counts);
#endif
/* Now exchange the particles, type by type to keep the memory required
* under control. */
......@@ -1028,6 +1036,14 @@ void engine_redistribute(struct engine *e) {
/* All particles have now arrived. Time for some final operations on the
stuff we just received */
#ifdef WITH_LOGGER
/* Log the received particles */
logger_log_after_communcations(s->parts, s->nr_parts, counts,
s->gparts, s->nr_gparts, gcounts,
s->sparts, s->nr_sparts, scounts,
s->bparts, s->nr_bparts, bcounts);
#endif
/* Restore the part<->gpart and spart<->gpart links.
* Generate indices and counts for threadpool tasks. Note we process a node
* at a time. */
......
......@@ -523,6 +523,15 @@ void logger_ensure_size(struct logger_writer *log, size_t total_nr_parts,
dump_ensure(&log->dump, limit, log->buffer_scale * limit);
}
/** @brief Generate the name of the dump files
*
* @param log The #logger_writer.
* @param filename The filename of the dump file.
*/
void logger_get_dump_name(struct logger_writer *log, char *filename) {
sprintf(filename, "%s_%04i.dump", log->base_name, engine_rank);
}
/**
* @brief intialize the logger structure
*
......@@ -548,8 +557,7 @@ void logger_init(struct logger_writer *log, struct swift_params *params) {
/* generate dump filename. */
char logger_name_file[PARSER_MAX_LINE_SIZE];
strcpy(logger_name_file, log->base_name);
strcat(logger_name_file, ".dump");
logger_get_dump_name(log, logger_name_file);
/* Compute max size for a particle chunk. */
int max_size = logger_offset_size + logger_mask_size;
......@@ -836,6 +844,86 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
return mask;
}
#ifdef WITH_MPI
/**
* @brief Log all the particles leaving the current rank.
*
* @param parts The list of #part.
* @param nr_parts The number of parts.
* @param count The number of parts in each ranks.
* @param gparts The list of #gpart.
* @param nr_gparts The number of gparts.
* @param gcount The number of gparts in each ranks.
* @param sparts The list of #spart.
* @param nr_sparts The number of sparts.
* @param s_counts The number of sparts in each ranks.
* @param bparts The list of #bpart.
* @param nr_bparts The number of bparts.
* @param b_counts The number of bparts in each ranks.
*
*/
void logger_log_before_communcations(
struct part *parts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts) {
error("TODO");
}
/**
* @brief Log all the particles arriving in the current rank.
*
* @param parts The list of #part.
* @param nr_parts The number of parts.
* @param count The number of parts in each ranks.
* @param gparts The list of #gpart.
* @param nr_gparts The number of gparts.
* @param gcount The number of gparts in each ranks.
* @param sparts The list of #spart.
* @param nr_sparts The number of sparts.
* @param s_counts The number of sparts in each ranks.
* @param bparts The list of #bpart.
* @param nr_bparts The number of bparts.
* @param b_counts The number of bparts in each ranks.
*
*/
void logger_log_after_communcations(
struct part *parts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts) {
error("TODO");
}
/**
* @brief Log all the particles arriving in the current rank.
*
* @param parts The list of #part.
* @param nr_parts The number of parts.
* @param count The number of parts in each ranks.
* @param gparts The list of #gpart.
* @param nr_gparts The number of gparts.
* @param gcount The number of gparts in each ranks.
* @param sparts The list of #spart.
* @param nr_sparts The number of sparts.
* @param s_counts The number of sparts in each ranks.
* @param bparts The list of #bpart.
* @param nr_bparts The number of bparts.
* @param b_counts The number of bparts in each ranks.
*
*/
void logger_log_recv_strays(
struct logger_writer *log,
struct part *parts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts) {
error("TODO");
}
#endif
/**
* @brief Write a swift_params struct to the given FILE as a stream of bytes.
*
......@@ -861,10 +949,9 @@ void logger_struct_restore(struct logger_writer *log, FILE *stream) {
/* generate dump filename */
char logger_name_file[PARSER_MAX_LINE_SIZE];
strcpy(logger_name_file, log->base_name);
strcat(logger_name_file, ".dump");
logger_get_dump_name(log, logger_name_file);
dump_restart(&log->dump, logger_name_file);
dump_restart(&log->dump, logger_name_file, log->dump.size);
}
#endif /* WITH_LOGGER */
......
......@@ -94,6 +94,11 @@ enum logger_masks_number {
logger_count_mask = 9, /* Need to be the last. */
} __attribute__((packed));
enum logger_special_flags {
logger_flag_change_type = 0,
logger_flag_mpi = 1,
} __attribute__((packed));
struct mask_data {
/* Number of bytes for a mask. */
int size;
......@@ -179,6 +184,30 @@ int logger_read_timestamp(unsigned long long int *t, double *time,
void logger_struct_dump(const struct logger_writer *log, FILE *stream);
void logger_struct_restore(struct logger_writer *log, FILE *stream);
int logger_generate_flag(enum logger_special_flags flag, int data) {
#ifdef SWIFT_DEBUG_CHECKS
if (flag & 0xFFFFFF00) {
error("The special flag in the logger cannot be larger than 1 byte.");
}
#endif
return (flag << (3 * 8)) | (data & 0xFFFFFF);
}
#ifdef WITH_MPI
void logger_log_before_communcations(
struct part *parts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts);
void logger_log_after_communcations(
struct part *parts, size_t nr_parts, int *counts,
struct gpart *gparts, size_t nr_gparts, int *g_counts,
struct spart *sparts, size_t nr_sparts, int *s_counts,
struct bpart *bparts, size_t nr_bparts, int *b_counts);
#endif
/**
* @brief Initialize the logger data for a particle.
*
......
......@@ -190,8 +190,8 @@ void logger_write_index_file(struct logger_writer* log, struct engine* e) {
/* File name */
char fileName[FILENAME_BUFFER_SIZE];
snprintf(fileName, FILENAME_BUFFER_SIZE, "%.100s_%04i.index",
e->logger->base_name, outputCount);
snprintf(fileName, FILENAME_BUFFER_SIZE, "%.100s_%04i_%04i.index",
e->logger->base_name, engine_rank, outputCount);
/* Open file */
FILE* f = NULL;
......@@ -366,6 +366,10 @@ void logger_write_index_file(struct logger_writer* log, struct engine* e) {
* @params e The #engine.
*/
void logger_write_description(struct logger_writer* log, struct engine* e) {
/* Only the master writes the description */
if (engine_rank != 0) {
return;
}
/* const struct unit_system *internal_units = e->internal_units; */
/* const struct unit_system *snapshot_units = e->snapshot_units; */
......
......@@ -325,7 +325,7 @@ void runner_do_star_formation(struct runner *r, struct cell *c, int timer) {
logger_mask_data[logger_consts].mask |
logger_mask_data[logger_special_flags].mask,
&xp->logger_data.last_offset,
/* special flags */ swift_type_stars);
logger_generate_flag(logger_flag_change_type, swift_type_stars));
#endif
/* Convert the gas particle to a star particle */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment