poolstep${s}r${i}.html
+
+
+
+
+
+EOF
+done
+
+cat <<EOF >> index.html
+
+
+EOF
+
+echo "Finished"
+
+exit
diff --git a/src/cell.h b/src/cell.h
index 93a59545bbed36fdd639bd7305443e4f6770e465..ef887d10e6867ec3db58b694e0c62949d19e9e7e 100644
--- a/src/cell.h
+++ b/src/cell.h
@@ -239,9 +239,6 @@ struct cell {
/*! Maximum beginning of (integer) time step in this cell. */
integertime_t ti_beg_max;
- /*! Last (integer) time the cell's sort arrays were updated. */
- integertime_t ti_sort;
-
/*! Last (integer) time the cell's part were drifted forward in time. */
integertime_t ti_old_part;
@@ -347,6 +344,9 @@ struct cell {
char do_sub_sort;
#ifdef SWIFT_DEBUG_CHECKS
+ /*! Last (integer) time the cell's sort arrays were updated. */
+ integertime_t ti_sort;
+
/*! The list of tasks that have been executed on this cell */
char tasks_executed[64];
diff --git a/src/engine.c b/src/engine.c
index ee48d6565dc62f725b0159dc44e8d9d92d7a4adf..6bd2fb5ade526e4ff7fd363fa655fd096afcd827 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -2402,21 +2402,6 @@ void engine_make_gravityrecursive_tasks(struct engine *e) {
/* } */
}
-void engine_check_sort_tasks(struct engine *e, struct cell *c) {
-
- /* Find the parent sort task, if any, and copy its flags. */
- if (c->sorts != NULL) {
- struct cell *parent = c->parent;
- while (parent != NULL && parent->sorts == NULL) parent = parent->parent;
- if (parent != NULL) c->sorts->flags |= parent->sorts->flags;
- }
-
- /* Recurse? */
- if (c->split)
- for (int k = 0; k < 8; k++)
- if (c->progeny[k] != NULL) engine_check_sort_tasks(e, c->progeny[k]);
-}
-
/**
* @brief Fill the #space's task list.
*
@@ -2495,9 +2480,6 @@ void engine_maketasks(struct engine *e) {
for (int k = 0; k < nr_cells; k++)
engine_make_hierarchical_tasks(e, &cells[k]);
- /* Append hierarchical tasks to each cell. */
- for (int k = 0; k < nr_cells; k++) engine_check_sort_tasks(e, &cells[k]);
-
/* Run through the tasks and make force tasks for each density task.
Each force task depends on the cell ghosts and unlocks the kick task
of its super-cell. */
@@ -2792,7 +2774,7 @@ int engine_marktasks(struct engine *e) {
/* Run through the tasks and mark as skip or not. */
size_t extra_data[3] = {(size_t)e, rebuild_space, (size_t)&e->sched};
threadpool_map(&e->threadpool, engine_marktasks_mapper, s->tasks, s->nr_tasks,
- sizeof(struct task), 10000, extra_data);
+ sizeof(struct task), 0, extra_data);
rebuild_space = extra_data[1];
if (e->verbose)
@@ -3696,7 +3678,7 @@ void engine_drift_all(struct engine *e) {
#endif
threadpool_map(&e->threadpool, engine_do_drift_all_mapper, e->s->cells_top,
- e->s->nr_cells, sizeof(struct cell), 1, e);
+ e->s->nr_cells, sizeof(struct cell), 0, e);
/* Synchronize particle positions */
space_synchronize_particle_positions(e->s);
@@ -3748,7 +3730,7 @@ void engine_drift_top_multipoles(struct engine *e) {
const ticks tic = getticks();
threadpool_map(&e->threadpool, engine_do_drift_top_multipoles_mapper,
- e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 10, e);
+ e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 0, e);
#ifdef SWIFT_DEBUG_CHECKS
/* Check that all cells have been drifted to the current time. */
@@ -3786,7 +3768,7 @@ void engine_reconstruct_multipoles(struct engine *e) {
const ticks tic = getticks();
threadpool_map(&e->threadpool, engine_do_reconstruct_multipoles_mapper,
- e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 10, e);
+ e->s->cells_top, e->s->nr_cells, sizeof(struct cell), 0, e);
if (e->verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
diff --git a/src/engine.h b/src/engine.h
index c7aaa08b57eb2e61b311deae7a7ccb102f7e3cf8..79ff45a69a39ee7d4d45589df51ce2d53f810fda 100644
--- a/src/engine.h
+++ b/src/engine.h
@@ -77,7 +77,7 @@ extern const char *engine_policy_names[];
#define engine_queue_scale 1.2
#define engine_maxtaskspercell 96
#define engine_maxproxies 64
-#define engine_tasksreweight 10
+#define engine_tasksreweight 1
#define engine_parts_size_grow 1.05
#define engine_redistribute_alloc_margin 1.2
#define engine_default_energy_file_name "energy"
diff --git a/src/gravity.c b/src/gravity.c
index 97b2955b32e1513c3d86d1d1f4da2169130feb77..49bbaca39b5278009543204a0d9f5e72d69806c4 100644
--- a/src/gravity.c
+++ b/src/gravity.c
@@ -207,7 +207,7 @@ void gravity_exact_force_compute(struct space *s, const struct engine *e) {
data.const_G = e->physical_constants->const_newton_G;
threadpool_map(&s->e->threadpool, gravity_exact_force_compute_mapper,
- s->gparts, s->nr_gparts, sizeof(struct gpart), 1000, &data);
+ s->gparts, s->nr_gparts, sizeof(struct gpart), 0, &data);
message("Computed exact gravity for %d gparts (took %.3f %s). ",
data.counter_global, clocks_from_ticks(getticks() - tic),
diff --git a/src/queue.h b/src/queue.h
index 951a3e5a056d7ad0c3935f98341a0d93c805e3ad..c85cf0cabe30a03d163e2564fdc216c19495761a 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -29,7 +29,7 @@
#define queue_sizeinit 100
#define queue_sizegrow 2
#define queue_search_window 8
-#define queue_incoming_size 1024
+#define queue_incoming_size 10240
#define queue_struct_align 64
/* Counters. */
diff --git a/src/runner.c b/src/runner.c
index 5af652d8a70053110f2d7b65426995b21ad93279..04d230209a27d9c16d5ffbc0fffc9d5519e025d4 100644
--- a/src/runner.c
+++ b/src/runner.c
@@ -355,11 +355,11 @@ void runner_do_sort(struct runner *r, struct cell *c, int flags, int cleanup,
finger = finger->parent) {
if (finger->sorted & ~c->sorted) error("Inconsistent sort flags (upward).");
}
-#endif
/* Update the sort timer which represents the last time the sorts
were re-set. */
if (c->sorted == 0) c->ti_sort = r->e->ti_current;
+#endif
/* start by allocating the entry arrays. */
if (c->sort == NULL) {
diff --git a/src/scheduler.c b/src/scheduler.c
index e14fc017d3fef8b85e71b398634ce06e31830151..2e645951256d415b072cc964003d4042a842923d 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -759,7 +759,7 @@ void scheduler_splittasks(struct scheduler *s) {
/* Call the mapper on each current task. */
threadpool_map(s->threadpool, scheduler_splittasks_mapper, s->tasks,
- s->nr_tasks, sizeof(struct task), 1000, s);
+ s->nr_tasks, sizeof(struct task), 0, s);
}
/**
@@ -1174,7 +1174,7 @@ void scheduler_start(struct scheduler *s) {
/* Re-wait the tasks. */
if (s->active_count > 1000) {
threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tid_active,
- s->active_count, sizeof(int), 1000, s);
+ s->active_count, sizeof(int), 0, s);
} else {
scheduler_rewait_mapper(s->tid_active, s->active_count, s);
}
@@ -1250,7 +1250,7 @@ void scheduler_start(struct scheduler *s) {
/* Loop over the tasks and enqueue whoever is ready. */
if (s->active_count > 1000) {
threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tid_active,
- s->active_count, sizeof(int), 1000, s);
+ s->active_count, sizeof(int), 0, s);
} else {
scheduler_enqueue_mapper(s->tid_active, s->active_count, s);
}
diff --git a/src/space.c b/src/space.c
index 23902a37501c7b13992f5423a3f002d526ba2c27..8ad571be3800fbeebd280dfd09b9ee29158bfdf8 100644
--- a/src/space.c
+++ b/src/space.c
@@ -378,7 +378,7 @@ void space_regrid(struct space *s, int verbose) {
/* Free the old cells, if they were allocated. */
if (s->cells_top != NULL) {
threadpool_map(&s->e->threadpool, space_rebuild_recycle_mapper,
- s->cells_top, s->nr_cells, sizeof(struct cell), 100, s);
+ s->cells_top, s->nr_cells, sizeof(struct cell), 0, s);
free(s->cells_top);
free(s->multipoles_top);
s->maxdepth = 0;
@@ -491,7 +491,7 @@ void space_regrid(struct space *s, int verbose) {
/* Free the old cells, if they were allocated. */
threadpool_map(&s->e->threadpool, space_rebuild_recycle_mapper,
- s->cells_top, s->nr_cells, sizeof(struct cell), 100, s);
+ s->cells_top, s->nr_cells, sizeof(struct cell), 0, s);
s->maxdepth = 0;
}
@@ -970,7 +970,7 @@ void space_split(struct space *s, struct cell *cells, int nr_cells,
const ticks tic = getticks();
threadpool_map(&s->e->threadpool, space_split_mapper, cells, nr_cells,
- sizeof(struct cell), 1, s);
+ sizeof(struct cell), 0, s);
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
@@ -1004,7 +1004,7 @@ void space_sanitize(struct space *s) {
if (s->e->nodeID == 0) message("Cleaning up unreasonable values of h");
threadpool_map(&s->e->threadpool, space_sanitize_mapper, s->cells_top,
- s->nr_cells, sizeof(struct cell), 1, NULL);
+ s->nr_cells, sizeof(struct cell), 0, NULL);
}
/**
@@ -1187,7 +1187,7 @@ void space_parts_get_cell_index(struct space *s, int *ind, struct cell *cells,
data.ind = ind;
threadpool_map(&s->e->threadpool, space_parts_get_cell_index_mapper, s->parts,
- s->nr_parts, sizeof(struct part), 1000, &data);
+ s->nr_parts, sizeof(struct part), 0, &data);
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
@@ -1214,7 +1214,7 @@ void space_gparts_get_cell_index(struct space *s, int *gind, struct cell *cells,
data.ind = gind;
threadpool_map(&s->e->threadpool, space_gparts_get_cell_index_mapper,
- s->gparts, s->nr_gparts, sizeof(struct gpart), 1000, &data);
+ s->gparts, s->nr_gparts, sizeof(struct gpart), 0, &data);
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
@@ -1241,7 +1241,7 @@ void space_sparts_get_cell_index(struct space *s, int *sind, struct cell *cells,
data.ind = sind;
threadpool_map(&s->e->threadpool, space_sparts_get_cell_index_mapper,
- s->sparts, s->nr_sparts, sizeof(struct spart), 1000, &data);
+ s->sparts, s->nr_sparts, sizeof(struct spart), 0, &data);
if (verbose)
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
@@ -2501,7 +2501,7 @@ void space_synchronize_particle_positions(struct space *s) {
(s->nr_gparts > 0 && s->nr_sparts > 0))
threadpool_map(&s->e->threadpool,
space_synchronize_particle_positions_mapper, s->gparts,
- s->nr_gparts, sizeof(struct gpart), 1000, (void *)s);
+ s->nr_gparts, sizeof(struct gpart), 0, (void *)s);
}
/**
diff --git a/src/statistics.c b/src/statistics.c
index 57d60bcb1b247c9616c859b7ac8a475acdcd878f..5a3f1ff4f9a2232a14817e7e55fd2cff5bdcd80e 100644
--- a/src/statistics.c
+++ b/src/statistics.c
@@ -271,12 +271,12 @@ void stats_collect(const struct space *s, struct statistics *stats) {
/* Run parallel collection of statistics for parts */
if (s->nr_parts > 0)
threadpool_map(&s->e->threadpool, stats_collect_part_mapper, s->parts,
- s->nr_parts, sizeof(struct part), 10000, &extra_data);
+ s->nr_parts, sizeof(struct part), 0, &extra_data);
/* Run parallel collection of statistics for gparts */
if (s->nr_gparts > 0)
threadpool_map(&s->e->threadpool, stats_collect_gpart_mapper, s->gparts,
- s->nr_gparts, sizeof(struct gpart), 10000, &extra_data);
+ s->nr_gparts, sizeof(struct gpart), 0, &extra_data);
}
/**
diff --git a/src/swift.h b/src/swift.h
index 20397eb24df478cba65a0e35d686b402f1d8ee70..1d1a7c7d04b3662c524504c292aa7d9eee2c3d09 100644
--- a/src/swift.h
+++ b/src/swift.h
@@ -57,6 +57,7 @@
#include "sourceterms.h"
#include "space.h"
#include "task.h"
+#include "threadpool.h"
#include "timeline.h"
#include "timers.h"
#include "tools.h"
diff --git a/src/threadpool.c b/src/threadpool.c
index c11fd8121bb02f36fce1796d79a7eb55a38102c4..79bd586cd9d32b22a21f05ca66bfcc6b30bdfbbf 100644
--- a/src/threadpool.c
+++ b/src/threadpool.c
@@ -26,13 +26,139 @@
#include
#include
#include
+#ifdef SWIFT_DEBUG_THREADPOOL
+#include
+#endif
/* This object's header. */
#include "threadpool.h"
/* Local headers. */
#include "atomic.h"
+#include "clocks.h"
#include "error.h"
+#include "minmax.h"
+
+#ifdef SWIFT_DEBUG_THREADPOOL
+/**
+ * @brief Store a log entry of the given chunk.
+ */
+void threadpool_log(struct threadpool *tp, int tid, size_t chunk_size,
+ ticks tic, ticks toc) {
+ struct mapper_log *log = &tp->logs[tid > 0 ? tid : 0];
+
+ /* Check if we need to re-allocate the log buffer. */
+ if (log->count == log->size) {
+ log->size *= 2;
+ struct mapper_log_entry *new_log;
+ if ((new_log = (struct mapper_log_entry *)malloc(
+ sizeof(struct mapper_log_entry) * log->size)) == NULL)
+ error("Failed to re-allocate mapper log.");
+ memcpy(new_log, log->log, sizeof(struct mapper_log_entry) * log->count);
+ free(log->log);
+ log->log = new_log;
+ }
+
+ /* Store the new entry. */
+ struct mapper_log_entry *entry = &log->log[log->count];
+ entry->tid = tid;
+ entry->chunk_size = chunk_size;
+ entry->tic = tic;
+ entry->toc = toc;
+ entry->map_function = tp->map_function;
+ log->count++;
+}
+
+void threadpool_dump_log(struct threadpool *tp, const char *filename,
+ int reset) {
+
+ /* Open the output file. */
+ FILE *fd;
+ if ((fd = fopen(filename, "w")) == NULL)
+ error("Failed to create log file '%s'.", filename);
+
+ /* Create a buffer of function names. */
+ const int max_names = 100;
+ struct name_entry {
+ threadpool_map_function map_function;
+ const char *name;
+ };
+ struct name_entry names[max_names];
+ bzero(names, sizeof(struct name_entry) * max_names);
+
+ /* Write a header. */
+ fprintf(fd, "# map_function thread_id chunk_size tic toc\n");
+ fprintf(fd, "# {'num_threads': %i, 'cpufreq': %lli}\n", tp->num_threads,
+ clocks_get_cpufreq());
+
+ /* Loop over the per-tid logs and dump them. */
+ for (int k = 0; k < tp->num_threads; k++) {
+ struct mapper_log *log = &tp->logs[k];
+
+ /* Loop over the log entries and dump them. */
+ for (int i = 0; i < log->count; i++) {
+
+ struct mapper_log_entry *entry = &log->log[i];
+
+ /* Look for the function pointer in the buffer. */
+ int nid = 0;
+ while (nid < max_names && names[nid].map_function != entry->map_function)
+ nid++;
+
+ /* If the name was not found, make a new entry. */
+ if (nid == max_names) {
+ for (int j = 1; j < max_names; j++) names[j - 1] = names[j];
+ names[0].map_function = entry->map_function;
+ Dl_info dl_info;
+ dladdr(entry->map_function, &dl_info);
+ names[0].name = dl_info.dli_sname;
+ nid = 0;
+ }
+
+ /* Log a line to the file. */
+ fprintf(fd, "%s %i %i %lli %lli\n", names[nid].name, entry->tid,
+ entry->chunk_size, entry->tic, entry->toc);
+ }
+
+ /* Clear the log if requested. */
+ if (reset) log->count = 0;
+ }
+
+ /* Close the file. */
+ fclose(fd);
+}
+#endif // SWIFT_DEBUG_THREADPOOL
+
+/**
+ * @brief Runner main loop, get a chunk and call the mapper function.
+ */
+void threadpool_chomp(struct threadpool *tp, int tid) {
+
+ /* Loop until we can't get a chunk. */
+ while (1) {
+ /* Desired chunk size. */
+ size_t chunk_size =
+ (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
+ if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk;
+ if (chunk_size < 1) chunk_size = 1;
+
+ /* Get a chunk and check its size. */
+ size_t task_ind = atomic_add(&tp->map_data_count, chunk_size);
+ if (task_ind >= tp->map_data_size) break;
+ if (task_ind + chunk_size > tp->map_data_size)
+ chunk_size = tp->map_data_size - task_ind;
+
+/* Call the mapper function. */
+#ifdef SWIFT_DEBUG_THREADPOOL
+ ticks tic = getticks();
+#endif
+ tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind),
+ chunk_size, tp->map_extra_data);
+#ifdef SWIFT_DEBUG_THREADPOOL
+ threadpool_log(tp, tid, chunk_size, tic, getticks());
+#endif
+ }
+}
void *threadpool_runner(void *data) {
@@ -43,39 +169,13 @@ void *threadpool_runner(void *data) {
while (1) {
/* Let the controller know that this thread is waiting. */
- pthread_mutex_lock(&tp->thread_mutex);
- tp->num_threads_waiting += 1;
- if (tp->num_threads_waiting == tp->num_threads) {
- pthread_cond_signal(&tp->control_cond);
- }
+ pthread_barrier_wait(&tp->wait_barrier);
/* Wait for the controller. */
- pthread_cond_wait(&tp->thread_cond, &tp->thread_mutex);
- tp->num_threads_waiting -= 1;
- tp->num_threads_running += 1;
- if (tp->num_threads_running == tp->num_threads) {
- pthread_cond_signal(&tp->control_cond);
- }
- pthread_mutex_unlock(&tp->thread_mutex);
-
- /* The index of the mapping task we will work on next. */
- while (1) {
- /* Desired chunk size. */
- size_t chunk_size =
- (tp->map_data_size - tp->map_data_count) / (2 * tp->num_threads);
- if (chunk_size > tp->map_data_chunk) chunk_size = tp->map_data_chunk;
- if (chunk_size < 1) chunk_size = 1;
-
- /* Get a chunk and check its size. */
- size_t task_ind = atomic_add(&tp->map_data_count, chunk_size);
- if (task_ind >= tp->map_data_size) break;
- if (task_ind + chunk_size > tp->map_data_size)
- chunk_size = tp->map_data_size - task_ind;
-
- /* Call the mapper function. */
- tp->map_function((char *)tp->map_data + (tp->map_data_stride * task_ind),
- chunk_size, tp->map_extra_data);
- }
+ pthread_barrier_wait(&tp->run_barrier);
+
+ /* Do actual work. */
+ threadpool_chomp(tp, atomic_inc(&tp->num_threads_running));
}
}
@@ -89,18 +189,28 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
/* Initialize the thread counters. */
tp->num_threads = num_threads;
- tp->num_threads_waiting = 0;
+
+#ifdef SWIFT_DEBUG_THREADPOOL
+ if ((tp->logs = (struct mapper_log *)malloc(sizeof(struct mapper_log) *
+ num_threads)) == NULL)
+ error("Failed to allocate mapper logs.");
+ for (int k = 0; k < num_threads; k++) {
+ tp->logs[k].size = threadpool_log_initial_size;
+ tp->logs[k].count = 0;
+ if ((tp->logs[k].log = (struct mapper_log_entry *)malloc(
+ sizeof(struct mapper_log_entry) * tp->logs[k].size)) == NULL)
+ error("Failed to allocate mapper log.");
+ }
+#endif
/* If there is only a single thread, do nothing more as of here as
we will just do work in the (blocked) calling thread. */
if (num_threads == 1) return;
- /* Init the threadpool mutexes. */
- if (pthread_mutex_init(&tp->thread_mutex, NULL) != 0)
- error("Failed to initialize mutexex.");
- if (pthread_cond_init(&tp->control_cond, NULL) != 0 ||
- pthread_cond_init(&tp->thread_cond, NULL) != 0)
- error("Failed to initialize condition variables.");
+ /* Init the barriers. */
+ if (pthread_barrier_init(&tp->wait_barrier, NULL, num_threads) != 0 ||
+ pthread_barrier_init(&tp->run_barrier, NULL, num_threads) != 0)
+ error("Failed to initialize barriers.");
/* Set the task counter to zero. */
tp->map_data_size = 0;
@@ -109,24 +219,21 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
tp->map_data_chunk = 0;
tp->map_function = NULL;
- /* Allocate the threads. */
- if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) * num_threads)) ==
- NULL) {
+ /* Allocate the threads, one less than requested since the calling thread
+ works as well. */
+ if ((tp->threads = (pthread_t *)malloc(sizeof(pthread_t) *
+ (num_threads - 1))) == NULL) {
error("Failed to allocate thread array.");
}
/* Create and start the threads. */
- pthread_mutex_lock(&tp->thread_mutex);
- for (int k = 0; k < num_threads; k++) {
+ for (int k = 0; k < num_threads - 1; k++) {
if (pthread_create(&tp->threads[k], NULL, &threadpool_runner, tp) != 0)
error("Failed to create threadpool runner thread.");
}
/* Wait for all the threads to be up and running. */
- while (tp->num_threads_waiting < tp->num_threads) {
- pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
- }
- pthread_mutex_unlock(&tp->thread_mutex);
+ pthread_barrier_wait(&tp->wait_barrier);
}
/**
@@ -140,7 +247,8 @@ void threadpool_init(struct threadpool *tp, int num_threads) {
* @param map_data The data on which the mapping function will be called.
* @param N Number of elements in @c map_data.
* @param stride Size, in bytes, of each element of @c map_data.
- * @param chunk Number of map data elements to pass to the function at a time.
+ * @param chunk Number of map data elements to pass to the function at a time,
+ * or zero to choose the number automatically.
+ * @param extra_data Additional pointer that will be passed to the mapping
* function, may contain additional data.
*/
@@ -148,37 +256,65 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
void *map_data, size_t N, int stride, int chunk,
void *extra_data) {
+#ifdef SWIFT_DEBUG_THREADPOOL
+ ticks tic = getticks();
+#endif
+
/* If we just have a single thread, call the map function directly. */
if (tp->num_threads == 1) {
map_function(map_data, N, extra_data);
+#ifdef SWIFT_DEBUG_THREADPOOL
+ threadpool_log(tp, 0, N, tic, getticks());
+#endif
return;
}
/* Set the map data and signal the threads. */
- pthread_mutex_lock(&tp->thread_mutex);
tp->map_data_stride = stride;
tp->map_data_size = N;
tp->map_data_count = 0;
- tp->map_data_chunk = chunk;
+ tp->map_data_chunk =
+ chunk ? chunk
+ : max((int)(N / (tp->num_threads * threadpool_default_chunk_ratio)),
+ 1);
tp->map_function = map_function;
tp->map_data = map_data;
tp->map_extra_data = extra_data;
tp->num_threads_running = 0;
- pthread_cond_broadcast(&tp->thread_cond);
/* Wait for all the threads to be up and running. */
- while (tp->num_threads_running < tp->num_threads) {
- pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
- }
+ pthread_barrier_wait(&tp->run_barrier);
+
+ /* Do some work while I'm at it. */
+ threadpool_chomp(tp, tp->num_threads - 1);
/* Wait for all threads to be done. */
- while (tp->num_threads_waiting < tp->num_threads) {
- pthread_cond_wait(&tp->control_cond, &tp->thread_mutex);
- }
- pthread_mutex_unlock(&tp->thread_mutex);
+ pthread_barrier_wait(&tp->wait_barrier);
+
+#ifdef SWIFT_DEBUG_THREADPOOL
+ /* Log the total call time to thread id -1. */
+ threadpool_log(tp, -1, N, tic, getticks());
+#endif
}
+/**
+ * @brief Re-sets the log for this #threadpool.
+ */
+#ifdef SWIFT_DEBUG_THREADPOOL
+void threadpool_reset_log(struct threadpool *tp) {
+ for (int k = 0; k < tp->num_threads; k++) tp->logs[k].count = 0;
+}
+#endif
+
/**
* @brief Frees up the memory allocated for this #threadpool.
*/
-void threadpool_clean(struct threadpool *tp) { free(tp->threads); }
+void threadpool_clean(struct threadpool *tp) {
+ free(tp->threads);
+#ifdef SWIFT_DEBUG_THREADPOOL
+ for (int k = 0; k < tp->num_threads; k++) {
+ free(tp->logs[k].log);
+ }
+ free(tp->logs);
+#endif
+}
diff --git a/src/threadpool.h b/src/threadpool.h
index f9c7eeffb700adc579ec05902193b888cdd6363d..019403f658a22d36c4a6e1ec1ae1fdc47c62658d 100644
--- a/src/threadpool.h
+++ b/src/threadpool.h
@@ -25,10 +25,44 @@
/* Some standard headers. */
#include
+/* Local includes. */
+#include "cycle.h"
+
+/* Local defines. */
+#define threadpool_log_initial_size 1000
+#define threadpool_default_chunk_ratio 7
+
/* Function type for mappings. */
typedef void (*threadpool_map_function)(void *map_data, int num_elements,
void *extra_data);
+/* Data for threadpool logging. */
+struct mapper_log_entry {
+
+ /* ID of the thread executing the chunk. */
+ int tid;
+
+ /* Size of the chunk processed. */
+ int chunk_size;
+
+ /* Pointer to the mapper function. */
+ threadpool_map_function map_function;
+
+ /*! Start and end time of this task */
+ ticks tic, toc;
+};
+
+struct mapper_log {
+ /* Log of threadpool mapper calls. */
+ struct mapper_log_entry *log;
+
+ /* Size of the allocated log. */
+ int size;
+
+ /* Number of entries in the log. */
+ int count;
+};
+
/* Data of a threadpool. */
struct threadpool {
@@ -36,8 +70,8 @@ struct threadpool {
pthread_t *threads;
/* This is where threads go to rest. */
- pthread_mutex_t thread_mutex;
- pthread_cond_t control_cond, thread_cond;
+ pthread_barrier_t wait_barrier;
+ pthread_barrier_t run_barrier;
/* Current map data and count. */
void *map_data, *map_extra_data;
@@ -49,7 +83,11 @@ struct threadpool {
int num_threads;
/* Counter for the number of threads that are done. */
- volatile int num_threads_waiting, num_threads_running;
+ volatile int num_threads_running;
+
+#ifdef SWIFT_DEBUG_THREADPOOL
+ struct mapper_log *logs;
+#endif
};
/* Function prototypes. */
@@ -58,5 +96,10 @@ void threadpool_map(struct threadpool *tp, threadpool_map_function map_function,
void *map_data, size_t N, int stride, int chunk,
void *extra_data);
void threadpool_clean(struct threadpool *tp);
+#ifdef SWIFT_DEBUG_THREADPOOL
+void threadpool_reset_log(struct threadpool *tp);
+void threadpool_dump_log(struct threadpool *tp, const char *filename,
+ int reset);
+#endif
#endif /* SWIFT_THREADPOOL_H */
diff --git a/tests/test125cells.c b/tests/test125cells.c
index 29e661b28c833da46c21fe44bce99a607207b0e7..a34bc00e843f793f17ffe20dad1f1b32055f494d 100644
--- a/tests/test125cells.c
+++ b/tests/test125cells.c
@@ -336,7 +336,6 @@ struct cell *make_cell(size_t n, const double offset[3], double size, double h,
cell->ti_old_part = 8;
cell->ti_end_min = 8;
cell->ti_end_max = 8;
- cell->ti_sort = 0;
// shuffle_particles(cell->parts, cell->count);
diff --git a/tests/test27cells.c b/tests/test27cells.c
index fe0b15bfe7671ddbf7c9f66a19407d1d74d5b380..f34d82c587a5a4abb91670b03f02393fa48e57ca 100644
--- a/tests/test27cells.c
+++ b/tests/test27cells.c
@@ -174,7 +174,6 @@ struct cell *make_cell(size_t n, double *offset, double size, double h,
cell->ti_old_part = 8;
cell->ti_end_min = 8;
cell->ti_end_max = 8;
- cell->ti_sort = 8;
shuffle_particles(cell->parts, cell->count);
diff --git a/tests/testPeriodicBC.c b/tests/testPeriodicBC.c
index 6dbe27a64908e232cb9478dfd2e2cf41d908d463..469ca9b15bab3b28978bdadb12a07846ac30551b 100644
--- a/tests/testPeriodicBC.c
+++ b/tests/testPeriodicBC.c
@@ -173,7 +173,6 @@ struct cell *make_cell(size_t n, double *offset, double size, double h,
cell->ti_old_part = 8;
cell->ti_end_min = 8;
cell->ti_end_max = 8;
- cell->ti_sort = 8;
shuffle_particles(cell->parts, cell->count);
diff --git a/tests/testThreadpool.c b/tests/testThreadpool.c
index aa65d533a29afbe4e7e8384fb887281822a31e58..2a9e98c5ca71ba62c5f6a266f4a36f658f61b063 100644
--- a/tests/testThreadpool.c
+++ b/tests/testThreadpool.c
@@ -23,6 +23,7 @@
#include
// Local includes.
+#include "../config.h"
#include "../src/atomic.h"
#include "../src/threadpool.h"
@@ -78,6 +79,11 @@ int main(int argc, char *argv[]) {
threadpool_map(&tp, map_function_first, data, N, sizeof(int), 2, NULL);
}
+/* If logging was enabled, dump the log. */
+#ifdef SWIFT_DEBUG_THREADPOOL
+ threadpool_dump_log(&tp, "threadpool_log.txt", 1);
+#endif
+
/* Be clean */
threadpool_clean(&tp);