diff --git a/src/scheduler.c b/src/scheduler.c index 5c983a43d074a2f3e948fee8b16a9a7f96754b9a..d577721321a4eb98cdeae732fb9af93e4c78f358 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -1020,15 +1020,44 @@ void scheduler_rewait_mapper(void *map_data, int num_elements, } } +void scheduler_rewait_active_mapper(void *map_data, int num_elements, + void *extra_data) { + + struct scheduler *s = (struct scheduler *)extra_data; + const int *tid = (int *)map_data; + const unsigned int mask = s->mask; + const unsigned int submask = s->submask; + + for (int ind = 0; ind < num_elements; ind++) { + struct task *t = &s->tasks[tid[ind]]; + + if (t->skip || !((1 << t->type) & mask) || !((1 << t->subtype) & submask)) + continue; + + /* Skip sort tasks that have already been performed */ + if (t->type == task_type_sort && t->flags == 0) { + error("Empty sort task encountered."); + } + + /* Sets the waits of the dependances */ + for (int k = 0; k < t->nr_unlock_tasks; k++) { + struct task *u = t->unlock_tasks[k]; + atomic_inc(&u->wait); + } + } +} + void scheduler_enqueue_mapper(void *map_data, int num_elements, void *extra_data) { struct scheduler *s = (struct scheduler *)extra_data; const int *tid = (int *)map_data; struct task *tasks = s->tasks; + const unsigned int mask = s->mask; + const unsigned int submask = s->submask; for (int ind = 0; ind < num_elements; ind++) { struct task *t = &tasks[tid[ind]]; - if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) && - ((1 << t->subtype) & s->submask)) { + if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & mask) && + ((1 << t->subtype) & submask)) { scheduler_enqueue(s, t); } } @@ -1045,13 +1074,13 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements, void scheduler_start(struct scheduler *s, unsigned int mask, unsigned int submask) { + const ticks tic = getticks(); /* Store the masks */ s->mask = mask; s->submask = submask |= (1 << task_subtype_none); /* Clear all the waits and times. */ - message("nr_tasks=%i, active_count=%i.", s->nr_tasks, s->active_count); for (int k = 0; k < s->active_count; k++) { struct task *t = &s->tasks[s->tid_active[k]]; t->wait = 1; @@ -1060,10 +1089,18 @@ void scheduler_start(struct scheduler *s, unsigned int mask, if (((1 << t->type) & mask) == 0 || ((1 << t->subtype) & submask) == 0) t->skip = 1; } + /* message("clear took %.3f %s.", clocks_from_ticks(getticks() - tic), + clocks_getunit()); */ /* Re-wait the tasks. */ - threadpool_map(s->threadpool, scheduler_rewait_mapper, s->tasks, s->nr_tasks, - sizeof(struct task), 1000, s); + if (s->active_count > 1000) { + threadpool_map(s->threadpool, scheduler_rewait_active_mapper, s->tid_active, + s->active_count, sizeof(int), 1000, s); + } else { + scheduler_rewait_active_mapper(s->tid_active, s->active_count, s); + } + /* message("rewait took %.3f %s.", clocks_from_ticks(getticks() - tic), + clocks_getunit()); */ /* Check we have not missed an active task */ #ifdef SWIFT_DEBUG_CHECKS @@ -1108,13 +1145,11 @@ void scheduler_start(struct scheduler *s, unsigned int mask, #endif /* Loop over the tasks and enqueue whoever is ready. */ - for (int k = 0; k < s->active_count; k++) { - struct task *t = &s->tasks[s->tid_active[k]]; - if (atomic_dec(&t->wait) == 1 && !t->skip && ((1 << t->type) & s->mask) && - ((1 << t->subtype) & s->submask)) { - scheduler_enqueue(s, t); - pthread_cond_signal(&s->sleep_cond); - } + if (s->active_count > 1000) { + threadpool_map(s->threadpool, scheduler_enqueue_mapper, s->tid_active, + s->active_count, sizeof(int), 1000, s); + } else { + scheduler_enqueue_mapper(s->tid_active, s->active_count, s); } /* Clear the list of active tasks. */ @@ -1124,6 +1159,9 @@ void scheduler_start(struct scheduler *s, unsigned int mask, pthread_mutex_lock(&s->sleep_mutex); pthread_cond_broadcast(&s->sleep_cond); pthread_mutex_unlock(&s->sleep_mutex); + + message("took %.3f %s.", clocks_from_ticks(getticks() - tic), + clocks_getunit()); } /**