Commit c4d7c5e0 authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Merge branch 'threadpool_splitting' into 'master'

Use the threadpool to parallelize operations in the particle-splitting code

Closes #641

See merge request !1001
parents 753d3c84 8eb197d9
......@@ -27,6 +27,8 @@
/* Config parameters. */
#include "../config.h"
#include "inline.h"
/* Import the right black holes definition */
#if defined(BLACK_HOLES_NONE)
#include "./black_holes/Default/black_holes_struct.h"
......
......@@ -3107,17 +3107,10 @@ void engine_split(struct engine *e, struct partition *initial_partition) {
swift_free("gparts", s->gparts);
s->gparts = gparts_new;
/* Re-link the parts. */
if (s->nr_parts > 0 && s->nr_gparts > 0)
part_relink_parts_to_gparts(s->gparts, s->nr_gparts, s->parts);
/* Re-link the sparts. */
if (s->nr_sparts > 0 && s->nr_gparts > 0)
part_relink_sparts_to_gparts(s->gparts, s->nr_gparts, s->sparts);
/* Re-link the bparts. */
if (s->nr_bparts > 0 && s->nr_gparts > 0)
part_relink_bparts_to_gparts(s->gparts, s->nr_gparts, s->bparts);
/* Re-link everything to the gparts. */
if (s->nr_gparts > 0)
part_relink_all_parts_to_gparts(s->gparts, s->nr_gparts, s->parts,
s->sparts, s->bparts, &e->threadpool);
#ifdef SWIFT_DEBUG_CHECKS
......
......@@ -36,6 +36,201 @@
const int particle_split_factor = 2;
const double displacement_factor = 0.2;
/**
* @brief Data structure used by the counter mapper function
*/
struct data_count {
const struct engine *const e;
const float mass_threshold;
size_t counter;
};
/**
* @brief Data structure used by the split mapper function
*/
struct data_split {
const struct engine *const e;
const float mass_threshold;
size_t *const k_parts;
size_t *const k_gparts;
swift_lock_type lock;
};
/**
* @brief Mapper function to count the number of #part above the mass threshold
* for splitting.
*/
void engine_split_gas_particle_count_mapper(void *restrict map_data, int count,
void *restrict extra_data) {
/* Unpack the data */
struct part *parts = (struct part *)map_data;
struct data_count *data = (struct data_count *)extra_data;
const struct engine *e = data->e;
const float mass_threshold = data->mass_threshold;
size_t counter = 0;
for (int i = 0; i < count; ++i) {
const struct part *p = &parts[i];
/* Ignore inhibited particles */
if (part_is_inhibited(p, e)) continue;
/* Is the mass of this particle larger than the threshold? */
const float gas_mass = hydro_get_mass(p);
if (gas_mass > mass_threshold) ++counter;
}
/* Increment the global counter */
atomic_add(&data->counter, counter);
}
/**
* @brief Mapper function to split the #part above the mass threshold.
*/
void engine_split_gas_particle_split_mapper(void *restrict map_data, int count,
void *restrict extra_data) {
/* Unpack the data */
struct part *parts = (struct part *)map_data;
struct data_split *data = (struct data_split *)extra_data;
const float mass_threshold = data->mass_threshold;
const struct engine *e = data->e;
const int with_gravity = (e->policy & engine_policy_self_gravity) ||
(e->policy & engine_policy_external_gravity);
/* Additional thread-global particle arrays */
const struct space *s = e->s;
struct part *global_parts = s->parts;
struct xpart *global_xparts = s->xparts;
struct gpart *global_gparts = s->gparts;
/* xpart array corresponding to the thread-local particle array */
const ptrdiff_t offset = parts - global_parts;
struct xpart *xparts = global_xparts + offset;
/* Loop over the chunk of the part array assigned to this thread */
for (int i = 0; i < count; ++i) {
/* Is the mass of this particle larger than the threshold? */
struct part *p = &parts[i];
/* Ignore inhibited particles */
if (part_is_inhibited(p, e)) continue;
const float gas_mass = hydro_get_mass(p);
const float h = p->h;
/* Found a particle to split */
if (gas_mass > mass_threshold) {
/* Make sure only one thread is here */
lock_lock(&data->lock);
/* Where should we put the new particle in the array? */
const size_t k_parts = *data->k_parts;
const size_t k_gparts = *data->k_gparts;
/* Make the next slot available for the next particle */
(*data->k_parts)++;
if (with_gravity) (*data->k_gparts)++;
/* Release the lock and continue in parallel */
if (lock_unlock(&data->lock) != 0)
error("Impossible to unlock particle splitting");
/* We now know where to put the new particle we are going to create. */
/* Current other fields associated to this particle */
struct xpart *xp = &xparts[i];
struct gpart *gp = p->gpart;
/* Start by copying over the particles */
memcpy(&global_parts[k_parts], p, sizeof(struct part));
memcpy(&global_xparts[k_parts], xp, sizeof(struct xpart));
if (with_gravity) {
memcpy(&global_gparts[k_gparts], gp, sizeof(struct gpart));
}
/* Update the IDs */
global_parts[k_parts].id += 2;
/* Re-link everything */
if (with_gravity) {
global_parts[k_parts].gpart = &global_gparts[k_gparts];
global_gparts[k_gparts].id_or_neg_offset = -k_parts;
}
/* Displacement unit vector */
const double delta_x = random_unit_interval(p->id, e->ti_current,
(enum random_number_type)0);
const double delta_y = random_unit_interval(p->id, e->ti_current,
(enum random_number_type)1);
const double delta_z = random_unit_interval(p->id, e->ti_current,
(enum random_number_type)2);
/* Displace the old particle */
p->x[0] += delta_x * displacement_factor * h;
p->x[1] += delta_y * displacement_factor * h;
p->x[2] += delta_z * displacement_factor * h;
if (with_gravity) {
gp->x[0] += delta_x * displacement_factor * h;
gp->x[1] += delta_y * displacement_factor * h;
gp->x[2] += delta_z * displacement_factor * h;
}
/* Displace the new particle */
global_parts[k_parts].x[0] -= delta_x * displacement_factor * h;
global_parts[k_parts].x[1] -= delta_y * displacement_factor * h;
global_parts[k_parts].x[2] -= delta_z * displacement_factor * h;
if (with_gravity) {
global_gparts[k_gparts].x[0] -= delta_x * displacement_factor * h;
global_gparts[k_gparts].x[1] -= delta_y * displacement_factor * h;
global_gparts[k_gparts].x[2] -= delta_z * displacement_factor * h;
}
/* Divide the mass */
const double new_mass = gas_mass * 0.5;
hydro_set_mass(p, new_mass);
hydro_set_mass(&global_parts[k_parts], new_mass);
if (with_gravity) {
gp->mass = new_mass;
global_gparts[k_gparts].mass = new_mass;
}
/* Split the chemistry fields */
chemistry_split_part(p, particle_split_factor);
chemistry_split_part(&global_parts[k_parts], particle_split_factor);
/* Split the cooling fields */
cooling_split_part(p, xp, particle_split_factor);
cooling_split_part(&global_parts[k_parts], &global_xparts[k_parts],
particle_split_factor);
/* Split the star formation fields */
star_formation_split_part(p, xp, particle_split_factor);
star_formation_split_part(&global_parts[k_parts], &global_xparts[k_parts],
particle_split_factor);
/* Split the tracer fields */
tracers_split_part(p, xp, particle_split_factor);
tracers_split_part(&global_parts[k_parts], &global_xparts[k_parts],
particle_split_factor);
/* Mark the particles as not having been swallowed */
black_holes_mark_part_as_not_swallowed(&p->black_holes_data);
black_holes_mark_part_as_not_swallowed(
&global_parts[k_parts].black_holes_data);
}
}
}
/**
* @brief Identify all the gas particles above a given mass threshold
* and split them into 2.
......@@ -45,12 +240,15 @@ const double displacement_factor = 0.2;
* the local array of particles. In case of reallocations, it may
* also have to loop over the gravity and other arrays.
*
* This is done on a node-by-node basis. No MPI required here.
*
* @param e The #engine.
*/
void engine_split_gas_particles(struct engine *e) {
/* Abort if we are not doing any splitting */
if (!e->hydro_properties->particle_splitting) return;
if (e->s->nr_parts == 0) return;
/* Time this */
const ticks tic = getticks();
......@@ -59,8 +257,9 @@ void engine_split_gas_particles(struct engine *e) {
struct space *s = e->s;
const int with_gravity = (e->policy & engine_policy_self_gravity) ||
(e->policy & engine_policy_external_gravity);
const double mass_threshold =
const float mass_threshold =
e->hydro_properties->particle_splitting_mass_threshold;
const size_t nr_parts_old = s->nr_parts;
/* Quick check to avoid problems */
if (particle_split_factor != 2) {
......@@ -68,23 +267,12 @@ void engine_split_gas_particles(struct engine *e) {
"Invalid splitting factor. Can currently only split particles into 2!");
}
/* Start by counting how many particles are above the threshold for splitting
*/
size_t counter = 0;
const size_t nr_parts_old = s->nr_parts;
const struct part *parts_old = s->parts;
for (size_t i = 0; i < nr_parts_old; ++i) {
const struct part *p = &parts_old[i];
/* Ignore inhibited particles */
if (part_is_inhibited(p, e)) continue;
/* Is the mass of this particle larger than the threshold? */
const float gas_mass = hydro_get_mass(p);
if (gas_mass > mass_threshold) ++counter;
}
/* Start by counting how many particles are above the threshold
* for splitting (this is done in parallel over the threads) */
struct data_count data_count = {e, mass_threshold, 0};
threadpool_map(&e->threadpool, engine_split_gas_particle_count_mapper,
s->parts, nr_parts_old, sizeof(struct part), 0, &data_count);
const size_t counter = data_count.counter;
/* Early abort? */
if (counter == 0) {
......@@ -109,17 +297,29 @@ void engine_split_gas_particles(struct engine *e) {
if (e->verbose) message("Reallocating the part array!");
/* Allocate a larger array and copy over */
struct part *parts_new = NULL;
if (swift_memalign("parts", (void **)&parts_new, part_align,
sizeof(struct part) * s->size_parts) != 0)
error("Failed to allocate new part data.");
/* Tell the compiler that the arrays are aligned */
swift_align_information(struct part, s->parts, part_align);
swift_align_information(struct part, parts_new, part_align);
memcpy(parts_new, s->parts, sizeof(struct part) * s->nr_parts);
swift_free("parts", s->parts);
/* Allocate a larger array and copy over */
struct xpart *xparts_new = NULL;
if (swift_memalign("xparts", (void **)&xparts_new, xpart_align,
sizeof(struct xpart) * s->size_parts) != 0)
error("Failed to allocate new part data.");
/* Tell the compiler that the arrays are aligned */
swift_align_information(struct xpart, s->xparts, xpart_align);
swift_align_information(struct xpart, xparts_new, xpart_align);
memcpy(xparts_new, s->xparts, sizeof(struct xpart) * s->nr_parts);
swift_free("xparts", s->xparts);
......@@ -135,18 +335,23 @@ void engine_split_gas_particles(struct engine *e) {
if (e->verbose) message("Reallocating the gpart array!");
/* Allocate a larger array and copy over */
struct gpart *gparts_new = NULL;
if (swift_memalign("gparts", (void **)&gparts_new, gpart_align,
sizeof(struct gpart) * s->size_gparts) != 0)
error("Failed to allocate new gpart data.");
/* Tell the compiler that the arrays are aligned */
swift_align_information(struct gpart, s->gparts, gpart_align);
swift_align_information(struct gpart, gparts_new, gpart_align);
/* Copy the particles */
memcpy(gparts_new, s->gparts, sizeof(struct gpart) * s->nr_gparts);
swift_free("gparts", s->gparts);
/* We now need to correct all the pointers of the other particle arrays */
part_relink_all_parts_to_gparts(gparts_new, s->nr_gparts, s->parts,
s->sparts, s->bparts);
s->sparts, s->bparts, &e->threadpool);
s->gparts = gparts_new;
}
......@@ -157,115 +362,18 @@ void engine_split_gas_particles(struct engine *e) {
s->nr_gparts, s->nr_sparts, s->nr_bparts, e->verbose);
#endif
/* We now have enough memory in the part array to accomodate the new
* particles. We can start the splitting procedure */
size_t k_parts = s->nr_parts;
size_t k_gparts = s->nr_gparts;
/* Loop over the particles again to split them */
struct part *parts = s->parts;
struct xpart *xparts = s->xparts;
struct gpart *gparts = s->gparts;
for (size_t i = 0; i < nr_parts_old; ++i) {
/* Is the mass of this particle larger than the threshold? */
struct part *p = &parts[i];
/* Ignore inhibited particles */
if (part_is_inhibited(p, e)) continue;
const float gas_mass = hydro_get_mass(p);
const float h = p->h;
/* Found a particle to split */
if (gas_mass > mass_threshold) {
struct xpart *xp = &xparts[i];
struct gpart *gp = p->gpart;
/* Start by copying over the particles */
memcpy(&parts[k_parts], p, sizeof(struct part));
memcpy(&xparts[k_parts], xp, sizeof(struct xpart));
if (with_gravity) {
memcpy(&gparts[k_gparts], gp, sizeof(struct gpart));
}
/* Update the IDs */
parts[k_parts].id += 2;
/* Re-link everything */
if (with_gravity) {
parts[k_parts].gpart = &gparts[k_gparts];
gparts[k_gparts].id_or_neg_offset = -k_parts;
}
/* Displacement unit vector */
const double delta_x = random_unit_interval(p->id, e->ti_current,
(enum random_number_type)0);
const double delta_y = random_unit_interval(p->id, e->ti_current,
(enum random_number_type)1);
const double delta_z = random_unit_interval(p->id, e->ti_current,
(enum random_number_type)2);
/* Displace the old particle */
p->x[0] += delta_x * displacement_factor * h;
p->x[1] += delta_y * displacement_factor * h;
p->x[2] += delta_z * displacement_factor * h;
if (with_gravity) {
gp->x[0] += delta_x * displacement_factor * h;
gp->x[1] += delta_y * displacement_factor * h;
gp->x[2] += delta_z * displacement_factor * h;
}
/* Displace the new particle */
parts[k_parts].x[0] -= delta_x * displacement_factor * h;
parts[k_parts].x[1] -= delta_y * displacement_factor * h;
parts[k_parts].x[2] -= delta_z * displacement_factor * h;
if (with_gravity) {
gparts[k_gparts].x[0] -= delta_x * displacement_factor * h;
gparts[k_gparts].x[1] -= delta_y * displacement_factor * h;
gparts[k_gparts].x[2] -= delta_z * displacement_factor * h;
}
/* Divide the mass */
const double new_mass = gas_mass * 0.5;
hydro_set_mass(p, new_mass);
hydro_set_mass(&parts[k_parts], new_mass);
if (with_gravity) {
gp->mass = new_mass;
gparts[k_gparts].mass = new_mass;
}
/* Split the chemistry fields */
chemistry_split_part(p, particle_split_factor);
chemistry_split_part(&parts[k_parts], particle_split_factor);
/* Split the cooling fields */
cooling_split_part(p, xp, particle_split_factor);
cooling_split_part(&parts[k_parts], &xparts[k_parts],
particle_split_factor);
/* Split the star formation fields */
star_formation_split_part(p, xp, particle_split_factor);
star_formation_split_part(&parts[k_parts], &xparts[k_parts],
particle_split_factor);
/* Split the tracer fields */
tracers_split_part(p, xp, particle_split_factor);
tracers_split_part(&parts[k_parts], &xparts[k_parts],
particle_split_factor);
/* Mark the particles as not having been swallowed */
black_holes_mark_part_as_not_swallowed(&p->black_holes_data);
black_holes_mark_part_as_not_swallowed(&parts[k_parts].black_holes_data);
/* Move to the next free slot */
k_parts++;
if (with_gravity) k_gparts++;
}
}
struct data_split data_split = {e, mass_threshold, &k_parts, &k_gparts, 0};
lock_init(&data_split.lock);
threadpool_map(&e->threadpool, engine_split_gas_particle_split_mapper,
s->parts, nr_parts_old, sizeof(struct part), 0, &data_split);
if (lock_destroy(&data_split.lock) != 0) error("Error destroying lock");
/* Update the local counters */
s->nr_parts = k_parts;
......
......@@ -32,6 +32,7 @@
#include "error.h"
#include "hydro.h"
#include "part.h"
#include "threadpool.h"
/**
* @brief Re-link the #gpart%s associated with the list of #part%s.
......@@ -40,8 +41,8 @@
* @param N The number of particles to re-link;
* @param offset The offset of #part%s relative to the global parts list.
*/
void part_relink_gparts_to_parts(struct part *parts, size_t N,
ptrdiff_t offset) {
void part_relink_gparts_to_parts(struct part *parts, const size_t N,
const ptrdiff_t offset) {
for (size_t k = 0; k < N; k++) {
if (parts[k].gpart) {
parts[k].gpart->id_or_neg_offset = -(k + offset);
......@@ -56,8 +57,8 @@ void part_relink_gparts_to_parts(struct part *parts, size_t N,
* @param N The number of s-particles to re-link;
* @param offset The offset of #spart%s relative to the global sparts list.
*/
void part_relink_gparts_to_sparts(struct spart *sparts, size_t N,
ptrdiff_t offset) {
void part_relink_gparts_to_sparts(struct spart *sparts, const size_t N,
const ptrdiff_t offset) {
for (size_t k = 0; k < N; k++) {
if (sparts[k].gpart) {
sparts[k].gpart->id_or_neg_offset = -(k + offset);
......@@ -72,8 +73,8 @@ void part_relink_gparts_to_sparts(struct spart *sparts, size_t N,
* @param N The number of s-particles to re-link;
* @param offset The offset of #bpart%s relative to the global bparts list.
*/
void part_relink_gparts_to_bparts(struct bpart *bparts, size_t N,
ptrdiff_t offset) {
void part_relink_gparts_to_bparts(struct bpart *bparts, const size_t N,
const ptrdiff_t offset) {
for (size_t k = 0; k < N; k++) {
if (bparts[k].gpart) {
bparts[k].gpart->id_or_neg_offset = -(k + offset);
......@@ -88,7 +89,7 @@ void part_relink_gparts_to_bparts(struct bpart *bparts, size_t N,
* @param N The number of particles to re-link;
* @param parts The global #part array in which to find the #gpart offsets.
*/
void part_relink_parts_to_gparts(struct gpart *gparts, size_t N,
void part_relink_parts_to_gparts(struct gpart *gparts, const size_t N,
struct part *parts) {
for (size_t k = 0; k < N; k++) {
if (gparts[k].type == swift_type_gas) {
......@@ -104,7 +105,7 @@ void part_relink_parts_to_gparts(struct gpart *gparts, size_t N,
* @param N The number of particles to re-link;
* @param sparts The global #spart array in which to find the #gpart offsets.
*/
void part_relink_sparts_to_gparts(struct gpart *gparts, size_t N,
void part_relink_sparts_to_gparts(struct gpart *gparts, const size_t N,
struct spart *sparts) {
for (size_t k = 0; k < N; k++) {
if (gparts[k].type == swift_type_stars) {
......@@ -120,7 +121,7 @@ void part_relink_sparts_to_gparts(struct gpart *gparts, size_t N,
* @param N The number of particles to re-link;
* @param bparts The global #bpart array in which to find the #gpart offsets.
*/
void part_relink_bparts_to_gparts(struct gpart *gparts, size_t N,
void part_relink_bparts_to_gparts(struct gpart *gparts, const size_t N,
struct bpart *bparts) {
for (size_t k = 0; k < N; k++) {
if (gparts[k].type == swift_type_black_hole) {
......@@ -130,19 +131,34 @@ void part_relink_bparts_to_gparts(struct gpart *gparts, size_t N,
}
/**
* @brief Re-link both the #part%s and #spart%s associated with the list of
* #gpart%s.
* @brief Helper structure to pass data to the liking mapper functions.
*/
struct relink_data {
struct part *const parts;
struct gpart *const garts;
struct spart *const sparts;
struct bpart *const bparts;
};
/**
* @brief #threadpool mapper function for the linking of all particle types
* to the #gpart array.
*
* @param gparts The list of #gpart.
* @param N The number of particles to re-link;
* @param parts The global #part array in which to find the #gpart offsets.
* @param sparts The global #spart array in which to find the #gpart offsets.
* @param bparts The global #bpart array in which to find the #gpart offsets.
* @brief map_data The array of #gpart.
* @brief count The number of #gpart.
* @brief extra_data the #relink_data containing pointer to the other arrays.
*/
void part_relink_all_parts_to_gparts(struct gpart *gparts, size_t N,
struct part *parts, struct spart *sparts,
struct bpart *bparts) {
for (size_t k = 0; k < N; k++) {
void part_relink_all_parts_to_gparts_mapper(void *restrict map_data, int count,
void *restrict extra_data) {
/* Un-pack the data */
struct relink_data *data = (struct relink_data *)extra_data;
struct part *const parts = data->parts;
struct spart *const sparts = data->sparts;
struct bpart *const bparts = data->bparts;
struct gpart *const gparts = (struct gpart *)map_data;
for (int k = 0; k < count; k++) {
if (gparts[k].type == swift_type_gas) {
parts[-gparts[k].id_or_neg_offset].gpart = &gparts[k];
} else if (gparts[k].type == swift_type_stars) {
......@@ -153,6 +169,30 @@ void part_relink_all_parts_to_gparts(struct gpart *gparts, size_t N,
}
}
/**
* @brief Re-link both the #part%s, #spart%s and #bpart%s associated
* with the list of #gpart%s.
*
* This function uses thread parallelism and should not be called inside
* an already threaded section (unlike the functions linking individual arrays
* that are designed to be called in thread-parallel code).
*
* @param gparts The list of #gpart.
* @param N The number of particles to re-link;
* @param parts The global #part array in which to find the #gpart offsets.
* @param sparts The global #spart array in which to find the #gpart offsets.
* @param bparts The global #bpart array in which to find the #gpart offsets.
*/
void part_relink_all_parts_to_gparts(struct gpart *gparts, const size_t N,
struct part *parts, struct spart *sparts,
struct bpart *bparts,
struct threadpool *tp) {
struct relink_data data = {parts, /*gparts=*/NULL, sparts, bparts};
threadpool_map(tp, part_relink_all_parts_to_gparts_mapper, gparts, N,
sizeof(struct gpart), 0, &data);
}
/**
* @brief Verifies that the #gpart, #part and #spart are correctly linked
* together
......
......@@ -32,9 +32,10 @@