Skip to content
Snippets Groups Projects
Commit c20c6c25 authored by Pedro Gonnet's avatar Pedro Gonnet
Browse files

use the threadpool only if there are enough tasks.

parent 141a31bd
No related branches found
No related tags found
1 merge request!282Scheduler activate root
......@@ -1020,15 +1020,44 @@ void scheduler_rewait_mapper(void *map_data, int num_elements,
}
}
void scheduler_rewait_active_mapper(void *map_data, int num_elements,
void *extra_data) {
struct scheduler *s = (struct scheduler *)extra_data;
const int *tid = (int *)map_data;
const unsigned int mask = s->mask;
const unsigned int submask = s->submask;
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &s->tasks[tid[ind]];
if (t->skip || !((1 << t->type) & mask) || !((1 << t->subtype) & submask))
continue;
/* Skip sort tasks that have already been performed */
if (t->type == task_type_sort && t->flags == 0) {
error("Empty sort task encountered.");
}
/* Sets the waits of the dependances */
for (int k = 0; k < t->nr_unlock_tasks; k++) {
struct task *u = t->unlock_tasks[k];
atomic_inc(&u->wait);
}
}
}
void scheduler_enqueue_mapper(void *map_data, int num_elements,
void *extra_data) {
struct scheduler *s = (struct scheduler *)extra_data;
const int *tid = (int *)map_data;
struct task *tasks = s->tasks;
const unsigned int mask = s->mask;
const unsigned int submask = s->submask;
for (int ind = 0; ind < num_elements; ind++) {
struct task *t = &tasks[tid[ind]];
if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
((1 << t->subtype) & s->submask)) {
if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & mask) &&
((1 << t->subtype) & submask)) {
scheduler_enqueue(s, t);
}
}
......@@ -1045,13 +1074,13 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements,
void scheduler_start(struct scheduler *s, unsigned int mask,
unsigned int submask) {
const ticks tic = getticks();
/* Store the masks */
s->mask = mask;
s->submask = submask |= (1 << task_subtype_none);
/* Clear all the waits and times. */
message("nr_tasks=%i, active_count=%i.", s->nr_tasks, s->active_count);
for (int k = 0; k < s->active_count; k++) {
struct task *t = &s->tasks[s->tid_active[k]];
t->wait = 1;
......@@ -1060,10 +1089,18 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
if (((1 << t->type) & mask) == 0 || ((1 << t->subtype) & submask) == 0)
t->skip = 1;
}
/* message("clear took %.3f %s.", clocks_from_ticks(getticks() - tic),
clocks_getunit()); */
/* Re-wait the tasks. */
threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, s->nr_tasks,
sizeof(struct task), 1000, s);
if (s->active_count > 1000) {
threadpool_map(s->threadpool, scheduler_rewait_active_mapper, s->tid_active,
s->active_count, sizeof(int), 1000, s);
} else {
scheduler_rewait_active_mapper(s->tid_active, s->active_count, s);
}
/* message("rewait took %.3f %s.", clocks_from_ticks(getticks() - tic),
clocks_getunit()); */
/* Check we have not missed an active task */
#ifdef SWIFT_DEBUG_CHECKS
......@@ -1108,13 +1145,11 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
#endif
/* Loop over the tasks and enqueue whoever is ready. */
for (int k = 0; k < s->active_count; k++) {
struct task *t = &s->tasks[s->tid_active[k]];
if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) &&
((1 << t->subtype) & s->submask)) {
scheduler_enqueue(s, t);
pthread_cond_signal(&s->sleep_cond);
}
if (s->active_count > 1000) {
threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tid_active,
s->active_count, sizeof(int), 1000, s);
} else {
scheduler_enqueue_mapper(s->tid_active, s->active_count, s);
}
/* Clear the list of active tasks. */
......@@ -1124,6 +1159,9 @@ void scheduler_start(struct scheduler *s, unsigned int mask,
pthread_mutex_lock(&s->sleep_mutex);
pthread_cond_broadcast(&s->sleep_cond);
pthread_mutex_unlock(&s->sleep_mutex);
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
clocks_getunit());
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment