Commit b788cc00 authored by Matthieu Schaller's avatar Matthieu Schaller
Browse files

Merged Pedro's dynamic unlock branch

parents 1c1e2d4d 62962a2f
......@@ -46,10 +46,6 @@
#include "kernel.h"
#include "timers.h"
#define to_check 5394
#define num_checks 11
struct task *check[num_checks];
/**
* @brief Add an unlock_task to the given task.
*
......@@ -61,40 +57,6 @@ struct task *check[num_checks];
void scheduler_addunlock(struct scheduler *s, struct task *ta,
struct task *tb) {
/* /\* Main loop. *\/ */
/* while (1) { */
/* /\* Follow the links. *\/ */
/* while (ta->nr_unlock_tasks == task_maxunlock + 1) */
/* ta = ta->unlock_tasks[task_maxunlock]; */
/* /\* Get the index of the next free task. *\/ */
/* const int ind = atomic_inc(&ta->nr_unlock_tasks); */
/* /\* Is there room in this task? *\/ */
/* if (ind < task_maxunlock) { */
/* ta->unlock_tasks[ind] = tb; */
/* break; */
/* } */
/* /\* Otherwise, generate a link task. *\/ */
/* else { */
/* /\* Only one thread should have to do this. *\/ */
/* if (ind == task_maxunlock) { */
/* ta->unlock_tasks[task_maxunlock] = */
/* scheduler_addtask(s, task_type_link, task_subtype_none,
* ta->flags, */
/* 0, ta->ci, ta->cj, 0); */
/* ta->unlock_tasks[task_maxunlock]->implicit = 1; */
/* } */
/* /\* Otherwise, reduce the count. *\/ */
/* else */
/* atomic_dec(&ta->nr_unlock_tasks); */
/* } */
/* } */
/* Lock the scheduler since re-allocating the unlocks is not
thread-safe. */
if (lock_lock(&s->lock) != 0) error("Unable to lock scheduler.");
......@@ -164,6 +126,9 @@ void scheduler_splittasks(struct scheduler *s) {
break;
}
/* Skip sorting tasks. */
if (t->type == task_type_psort) continue;
/* Empty task? */
if (t->ci == NULL || (t->type == task_type_pair && t->cj == NULL)) {
t->type = task_type_none;
......@@ -989,10 +954,12 @@ void scheduler_reweight(struct scheduler *s) {
* @param s The #scheduler.
* @param mask The task types to enqueue.
*/
void scheduler_start(struct scheduler *s, unsigned int mask) {
int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
struct task *t, *tasks = s->tasks;
// ticks tic;
/* Store the mask */
s->mask = mask | (1 << task_type_rewait);
......@@ -1003,6 +970,8 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
s->tasks[k].wait = 1;
s->tasks[k].rid = -1;
}
// message( "waiting tasks took %.3f ms." , (double)( getticks() - tic ) /
// CPU_TPS * 1000 );
/* Enqueue a set of extraenous tasks to set the task waits. */
struct task *rewait_tasks = &s->tasks[s->nr_tasks];
......@@ -1033,152 +1002,21 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
}
pthread_mutex_unlock(&s->sleep_mutex);
/* message("waiting tasks took %.3f ms.",
(double)(getticks() - tic) / CPU_TPS * 1000); */
(double)(getticks() - tic) / CPU_TPS * 1000); */
/* Loop over the tasks and enqueue whoever is ready. */
// tic = getticks();
for (int k = 0; k < s->nr_tasks; k++) {
t = &tasks[tid[k]];
if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && !t->skip) {
scheduler_enqueue(s, t);
pthread_cond_broadcast(&s->sleep_cond);
}
}
// message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) /
// CPU_TPS * 1000 );
}
/* NOTE(review): disabled legacy debug version of scheduler_start(), kept
   under #if 0. It duplicates the live implementation above but adds ad-hoc
   tracing via the file-scope `check[]` array and the `to_check` /
   `num_checks` macros. Safe to delete once the dynamic-unlock merge is
   validated. */
#if 0
void scheduler_start(struct scheduler *s, unsigned int mask) {
/* Locals: `store` remembers the task with index `to_check`; `count` fills
   the debug `check[]` array with its unlockers. */
int k, j, nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
struct task *t, *tasks = s->tasks;
struct task *store = NULL;
int count = 0;
// ticks tic;
// message("begin");
// fflush(stdout);
/* Store the mask */
s->mask = mask;
/* Reset the debug watch-list. */
for (k = 0;k<num_checks; ++k)
check[k] = NULL;
/* Run through the tasks and set their waits. */
// tic = getticks();
for (k = nr_tasks - 1; k >= 0; k--) {
t = &tasks[tid[k]];
t->wait = 1;
t->rid = -1;
if(k==to_check) {
//message("LOOP1: task %d type=%s-%s unlock=%d wait=%d", k, taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait);
store = t;
}
/* Only tasks in the mask (and not skipped) contribute waits. */
if (!((1 << t->type) & mask) || t->skip) continue;
for (j = 0; j < t->nr_unlock_tasks; j++) {
atomic_inc(&t->unlock_tasks[j]->wait);
/* if(t->unlock_tasks[j] == store) { */
/* message("task %d type=%s-%s unlocks the pair unlock=%d wait=%d %p", k, taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait, t); */
/* message("Link index: %6li", t->nr_unlock_tasks == task_maxunlock + 1 ? t->unlock_tasks[task_maxunlock] - s->tasks : -1); */
/* check[count] = t; */
/* ++count; */
/* } */
/* if(t->unlock_tasks[j] == &tasks[9563] ) { */
/* message("task %d %s %s unlocking task %d %s %s\n", */
/* k, taskID_names[t->type], subtaskID_names[t->subtype], */
/* 9563, taskID_names[t->unlock_tasks[j]->type], */
/* subtaskID_names[t->unlock_tasks[j]->type]); */
/* } */
}
}
// message( "waiting tasks took %.3f ms." , (double)( getticks() - tic ) /
// CPU_TPS * 1000 );
/* Debug dump of the full task list after the waits are set. */
scheduler_print_tasks(s, "tasks_start.dat");
//message("All waits set nr_tasks=%d", nr_tasks);
//fflush(stdout);
/* Don't enqueue link tasks directly. */
mask &= ~(1 << task_type_link);
s->mask = mask;
/* Second pass: record every task that unlocks the watched task `store`.
   NOTE(review): `check[count]` is written without bounding `count` against
   `num_checks` — overruns the array if more than num_checks unlockers
   exist. Dead code, but worth knowing before resurrecting it. */
for (k = 0; k < nr_tasks; k++) {
t = &tasks[tid[k]];
/* if(k==to_check) { */
/* message("LOOP2: task %5d type=%s-%s unlock=%d wait=%d t=%p", k, taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait, t); */
/* fflush(stdout); */
/* } */
for (j = 0; j < t->nr_unlock_tasks; j++) {
if(t->unlock_tasks[j] == store) {
//message("task %d type=%s-%s unlocks the pair unlock=%d wait=%d %p", k, taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait, t);
//message("Link index: %6li", t->nr_unlock_tasks == task_maxunlock + 1 ? t->unlock_tasks[task_maxunlock] - s->tasks : -1);
check[count] = t;
++count;
}
}
}
/* Loop over the tasks and enqueue whoever is ready. */
// tic = getticks();
for (k = 0; k < s->nr_tasks; k++) {
t = &tasks[tid[k]];
/* if (((1 << t->type) & mask) && !t->skip) { */
/* if (t->wait == 0) { */
/* scheduler_enqueue(s, t); */
/* pthread_cond_broadcast(&s->sleep_cond); */
/* } else */
/* break; */
/* } */
/* atomic_dec returns the pre-decrement value: a task whose wait drops
   from 1 (the sentinel set above) is ready to run. */
if (atomic_dec(&t->wait) == 1 &&
((1 << t->type) & s->mask) &&
!t->skip) {
scheduler_enqueue(s, t);
pthread_cond_broadcast(&s->sleep_cond);
}
}
/* Debug dump of the per-queue contents after the initial enqueue. */
scheduler_dump_queue(s);
// message("Done enqueieing");fflush(stdout);
// message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) /
// CPU_TPS * 1000 );
}
#endif
/**
* @brief Put a task on one of the queues.
*
......@@ -1193,43 +1031,16 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
int err;
#endif
// if(t->type == task_type_pair) {
// message("Enqueuing a %s", taskID_names[t->type]);
// fflush(stdout);
// }
/* Fail if this task has already been enqueued before. */
if (t->rid >= 0) error("Task has already been enqueued.");
for (int k = 0; k < num_checks; ++k) {
if (t == check[k]) {
// message("task %5d type=%s-%s unlock=%d wait=%d %p", 0,
// taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks,
// t->wait, t);
}
}
/* Ignore skipped tasks and tasks not in the mask. */
if (t->skip || (1 << t->type) & ~(s->mask)) {
return;
}
for (int k = 0; k < num_checks; ++k) {
if (t == check[k]) {
// message("not ignored !");
}
}
/* If this is an implicit task, just pretend it's done. */
if (t->implicit) {
for (int k = 0; k < num_checks; ++k) {
if (t == check[k]) {
// message("implicit");
}
}
for (int j = 0; j < t->nr_unlock_tasks; j++) {
struct task *t2 = t->unlock_tasks[j];
if (atomic_dec(&t2->wait) == 1) scheduler_enqueue(s, t2);
......@@ -1323,14 +1134,9 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
struct task *scheduler_done(struct scheduler *s, struct task *t) {
for (int k = 0; k < num_checks; ++k) {
if (t == check[k]) {
// message("task %5d type=%s-%s unlock=%d wait=%d %p", 0,
// taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks,
// t->wait, t);
}
}
int k, res;
struct task *t2, *next = NULL;
struct cell *super = t->ci->super;
/* Release whatever locks this task held. */
if (!t->implicit) task_unlock(t);
......@@ -1340,22 +1146,6 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
for (int k = 0; k < t->nr_unlock_tasks; k++) {
struct task *t2 = t->unlock_tasks[k];
int res = atomic_dec(&t2->wait);
/* if (t->type == task_type_init) */
/* message("Done with init ! Unlocking a %s task. %d dependencies left",
*/
/* taskID_names[t2->type], res); */
/* if (t->type == task_type_pair) */
/* message("Done with pair ! Unlocking a %s task. %d dependencies left",
*/
/* taskID_names[t2->type], res); */
for (int k = 0; k < num_checks; ++k) {
if (t2 == check[k]) {
// message("Unlocking the task %p", t2);
}
}
if (res < 1) {
error("Negative wait!");
} else if (res == 1) {
......@@ -1390,6 +1180,9 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
struct task *scheduler_unlock(struct scheduler *s, struct task *t) {
int k, res;
struct task *t2, *next = NULL;
/* Loop through the dependencies and add them to a queue if
they are ready. */
for (int k = 0; k < t->nr_unlock_tasks; k++) {
......@@ -1548,46 +1341,6 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_queues,
s->tasks_next = 0;
}
/**
* @brief Print all the tasks in the queue of the scheduler
*
* @param s The #scheduler.
*/
void scheduler_dump_queue(struct scheduler *s) {

  char fname[256];

  /* One output file per queue. */
  for (int qid = 0; qid < s->nr_queues; ++qid) {

    struct queue *q = &s->queues[qid];

    /* Open the per-queue dump file. */
    sprintf(fname, "queue_%d.dat", qid);
    FILE *out = fopen(fname, "w");

    /* Header with some general info about this queue. */
    fprintf(out, "# Queue %d, size=%d, count=%d\n", qid, q->size, q->count);
    fprintf(out, "# Index type subtype\n");

    /* One line per task currently in the queue. */
    for (int k = 0; k < q->count; ++k) {
      struct task *t = &q->tasks[k];
      fprintf(out, "%d %s %s\n", k, taskID_names[t->type],
              subtaskID_names[t->subtype]);
    }

    /* Be nice and clean */
    fclose(out);
  }
}
/**
* @brief Prints the list of tasks to a file
......@@ -1595,21 +1348,21 @@ void scheduler_dump_queue(struct scheduler *s) {
* @param s The #scheduler
* @param fileName Name of the file to write to
*/
void scheduler_print_tasks(struct scheduler *s, char *fileName) {
void scheduler_print_tasks(struct scheduler *s, char *fileName) {
const int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
struct task *t, *tasks = s->tasks;
const int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
struct task *t, *tasks = s->tasks;
FILE *file = fopen(fileName, "w");
FILE *file = fopen(fileName, "w");
fprintf(file, "# Rank Name Subname unlocks waits\n");
fprintf(file, "# Rank Name Subname unlocks waits\n");
for (int k = nr_tasks - 1; k >= 0; k--) {
t = &tasks[tid[k]];
if (!((1 << t->type)) || t->skip) continue;
fprintf(file, "%d %s %s %d %d\n", k, taskID_names[t->type],
subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait);
}
for (int k = nr_tasks - 1; k >= 0; k--) {
t = &tasks[tid[k]];
if (!((1 << t->type)) || t->skip) continue;
fprintf(file, "%d %s %s %d %d\n", k, taskID_names[t->type],
subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait);
}
fclose(file);
}
fclose(file);
}
......@@ -76,7 +76,7 @@ struct scheduler {
struct task **unlocks;
int *unlock_ind;
int nr_unlocks, size_unlocks;
/* Lock for this scheduler. */
lock_type lock;
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
......@@ -43,6 +43,9 @@
#include "lock.h"
#include "runner.h"
/* Shared sort structure. */
struct parallel_sort space_sort_struct;
/* Split size. */
int space_splitsize = space_splitsize_default;
int space_subsize = space_subsize_default;
......@@ -271,10 +274,10 @@ void space_regrid(struct space *s, double cell_max, int verbose) {
space_rebuild_recycle(s, &s->cells[k]);
s->cells[k].sorts = NULL;
s->cells[k].nr_tasks = 0;
s->cells[k].nr_link_density = 0;
s->cells[k].nr_link_force = 0;
s->cells[k].link_density = NULL;
s->cells[k].link_force = NULL;
s->cells[k].nr_density = 0;
s->cells[k].nr_force = 0;
s->cells[k].density = NULL;
s->cells[k].force = NULL;
s->cells[k].dx_max = 0.0f;
s->cells[k].sorted = 0;
s->cells[k].count = 0;
......@@ -391,7 +394,7 @@ void space_rebuild(struct space *s, double cell_max, int verbose) {
/* Sort the parts according to their cells. */
// tic = getticks();
parts_sort(s->parts, s->xparts, ind, nr_parts, 0, s->nr_cells - 1);
space_parts_sort(s, ind, nr_parts, 0, s->nr_cells - 1);
// message( "parts_sort took %.3f ms." , (double)(getticks() - tic) / CPU_TPS
// * 1000 );
......@@ -399,7 +402,7 @@ void space_rebuild(struct space *s, double cell_max, int verbose) {
for (k = 0; k < nr_parts; k++)
if (s->parts[k].gpart != NULL) s->parts[k].gpart->part = &s->parts[k];
/* Verify sort. */
/* Verify space_sort_struct. */
/* for ( k = 1 ; k < nr_parts ; k++ ) {
if ( ind[k-1] > ind[k] ) {
error( "Sort failed!" );
......@@ -465,7 +468,11 @@ void space_rebuild(struct space *s, double cell_max, int verbose) {
/* At this point, we have the upper-level cells, old or new. Now make
sure that the parts in each cell are ok. */
// tic = getticks();
for (k = 0; k < s->nr_cells; k++) space_split(s, &cells[k]);
// for (k = 0; k < s->nr_cells; k++) space_split(s, &cells[k]);
for (k = 0; k < s->nr_cells; k++)
scheduler_addtask(&s->e->sched, task_type_split_cell, task_subtype_none,
k, 0, &cells[k], NULL, 0);
engine_launch(s->e, s->e->nr_threads, 1 << task_type_split_cell);
// message( "space_split took %.3f ms." , (double)(getticks() - tic) / CPU_TPS
// * 1000 );
......@@ -475,113 +482,131 @@ void space_rebuild(struct space *s, double cell_max, int verbose) {
* @brief Sort the particles and condensed particles according to the given
*indices.
*
* @param parts The list of #part
* @param xparts The list of reduced particles
* @param s The #space.
* @param ind The indices with respect to which the parts are sorted.
* @param N The number of parts
* @param min Lowest index.
* @param max highest index.
*/
void parts_sort(struct part *parts, struct xpart *xparts, int *ind, int N,
int min, int max) {
struct qstack {
volatile int i, j, min, max;
volatile int ready;
};
struct qstack *qstack;
unsigned int qstack_size = 2 * (max - min) + 10;
volatile unsigned int first, last, waiting;
int pivot;
int i, ii, j, jj, temp_i, qid;
struct part temp_p;
struct xpart temp_xp;
/**
 * @brief Parallel-sort the parts (and matching xparts) by cell index,
 *        driven through the scheduler's psort tasks.
 *
 * Fills the file-scope #space_sort_struct with the sort inputs and an
 * initial work interval, then launches the runners with only the psort
 * task type enabled; the actual partitioning happens in
 * space_do_parts_sort(). Single-threaded setup — the ready/first/last/
 * waiting fields are only touched concurrently after engine_launch().
 *
 * @param s The #space (supplies parts, xparts and the engine).
 * @param ind The cell indices with respect to which the parts are sorted.
 * @param N The number of parts.
 * @param min Lowest index value present in @c ind.
 * @param max Highest index value present in @c ind.
 */
void space_parts_sort(struct space *s, int *ind, int N, int min, int max) {
// Populate the global parallel_sort structure with the input data.
space_sort_struct.parts = s->parts;
space_sort_struct.xparts = s->xparts;
space_sort_struct.ind = ind;
/* Stack sized for the worst-case interval count plus one slack slot per
   thread — presumably to avoid ring-buffer wrap-around; TODO confirm. */
space_sort_struct.stack_size = 2 * (max - min + 1) + 10 + s->e->nr_threads;
if ((space_sort_struct.stack = malloc(sizeof(struct qstack) *
space_sort_struct.stack_size)) == NULL)
error("Failed to allocate sorting stack.");
/* All slots start not-ready; workers spin on this flag. */
for (int i = 0; i < space_sort_struct.stack_size; i++)
space_sort_struct.stack[i].ready = 0;
// Add the first interval.
space_sort_struct.stack[0].i = 0;
space_sort_struct.stack[0].j = N - 1;
space_sort_struct.stack[0].min = min;
space_sort_struct.stack[0].max = max;
space_sort_struct.stack[0].ready = 1;
space_sort_struct.first = 0;
space_sort_struct.last = 1;
space_sort_struct.waiting = 1;
// Launch the sorting tasks.
engine_launch(s->e, s->e->nr_threads, (1 << task_type_psort));
/* Verify space_sort_struct. */
/* for (int i = 1; i < N; i++)
if (ind[i - 1] > ind[i])
error("Sorting failed (ind[%i]=%i,ind[%i]=%i), min=%i, max=%i.", i - 1, ind[i - 1], i,
ind[i], min, max);
message("Sorting succeeded."); */
// Clean up.
free(space_sort_struct.stack);
}
/* for ( int k = 0 ; k < N ; k++ )
if ( ind[k] > max || ind[k] < min )
error( "ind[%i]=%i is not in [%i,%i]." , k , ind[k] , min , max ); */
void space_do_parts_sort() {
/* Allocate the stack. */
if ((qstack = malloc(sizeof(struct qstack) * qstack_size)) == NULL)
error("Failed to allocate qstack.");
/* Init the interval stack. */
qstack[0].i = 0;
qstack[0].j = N - 1;
qstack[0].min = min;
qstack[0].max = max;
qstack[0].ready = 1;
for (i = 1; i < qstack_size; i++) qstack[i].ready = 0;
first = 0;
last = 1;
waiting = 1;
/* Pointers to the sorting data. */
int *ind = space_sort_struct.ind;
struct part *parts = space_sort_struct.parts;
struct xpart *xparts = space_sort_struct.xparts;
/* Main loop. */
while (waiting > 0) {
while (space_sort_struct.waiting) {
/* Grab an interval off the queue. */
qid = (first++) % qstack_size;
int qid =
atomic_inc(&space_sort_struct.first) % space_sort_struct.stack_size;
/* Wait for the entry to be ready, or for the sorting do be done. */
while (!space_sort_struct.stack[qid].ready)
if (!space_sort_struct.waiting) return;
/* Get the stack entry. */
i = qstack[qid].i;
j = qstack[qid].j;
min = qstack[qid].min;
max = qstack[qid].max;
qstack[qid].ready = 0;
int i = space_sort_struct.stack[qid].i;
int j = space_sort_struct.stack[qid].j;
int min = space_sort_struct.stack[qid].min;
int max = space_sort_struct.stack[qid].max;
space_sort_struct.stack[qid].ready = 0;
/* Loop over sub-intervals. */
while (1) {
/* Bring beer. */
pivot = (min + max) / 2;
const int pivot = (min + max) / 2;
/* message("Working on interval [%i,%i] with min=%i, max=%i, pivot=%i.",
i, j, min, max, pivot); */
/* One pass of QuickSort's partitioning. */
ii = i;
jj = j;
int ii = i;
int jj = j;
while (ii < jj) {
while (ii <= j && ind[ii] <= pivot) ii++;
while (jj >= i && ind[jj] > pivot) jj--;
if (ii < jj) {
temp_i = ind[ii];
int temp_i = ind[ii];
ind[ii] = ind[jj];
ind[jj] = temp_i;
temp_p = parts[ii];
struct part temp_p = parts[ii];
parts[ii] = parts[jj];
parts[jj] = temp_p;
temp_xp = xparts[ii];
struct xpart temp_xp = xparts[ii];
xparts[ii] = xparts[jj];
xparts[jj] = temp_xp;
}
}
/* Verify sort. */
/* for ( int k = i ; k <= jj ; k++ )
if ( ind[k] > pivot ) {
message( "sorting failed at k=%i, ind[k]=%i, pivot=%i, i=%i, j=%i,
N=%i." , k , ind[k] , pivot , i , j , N );
error( "Partition failed (<=pivot)." );