diff --git a/src/engine.c b/src/engine.c
index 91f879472d894a7ae427f8512531fcc423595260..c624b1e717a9057aaf8afda6a96f73e82af59140 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -171,6 +171,9 @@ void engine_redistribute(struct engine *e) {
     }
     const int cid = cell_getid(cdim, parts[k].x[0] * ih[0],
                                parts[k].x[1] * ih[1], parts[k].x[2] * ih[2]);
+    /* if (cid < 0 || cid >= s->nr_cells)
+      error("Bad cell id %i for part %i at [%.3e,%.3e,%.3e].",
+            cid, k, parts[k].x[0], parts[k].x[1], parts[k].x[2]); */
     dest[k] = cells[cid].nodeID;
     counts[nodeID * nr_nodes + dest[k]] += 1;
   }
@@ -184,8 +187,8 @@ void engine_redistribute(struct engine *e) {
   /* Get the new number of parts for this node, be generous in allocating. */
   int nr_parts = 0;
   for (int k = 0; k < nr_nodes; k++) nr_parts += counts[k * nr_nodes + nodeID];
-  struct part *parts_new;
-  struct xpart *xparts_new, *xparts = s->xparts;
+  struct part *parts_new = NULL;
+  struct xpart *xparts_new = NULL, *xparts = s->xparts;
   if (posix_memalign((void **)&parts_new, part_align,
                      sizeof(struct part) * nr_parts * 1.2) != 0 ||
       posix_memalign((void **)&xparts_new, part_align,
@@ -305,11 +308,16 @@ void engine_repartition(struct engine *e) {
   int nr_nodes = e->nr_nodes, nodeID = e->nodeID;
   float wscale = 1e-3, vscale = 1e-3, wscale_buff;
   idx_t wtot = 0;
-  const idx_t wmax = 1e9 / e->nr_nodes;
-
+  idx_t wmax = 1e9 / e->nr_nodes;
+  idx_t wmin;
+
   /* Clear the repartition flag. */
   e->forcerepart = 0;
 
+  /* Nothing to do if only using a single node. Also avoids METIS
+   * bug that doesn't handle this case well. */
+  if (nr_nodes == 1) return;
+
   /* Allocate the inds and weights. */
   if ((inds = (idx_t *)malloc(sizeof(idx_t) * 26 * nr_cells)) == NULL ||
       (weights_v = (idx_t *)malloc(sizeof(idx_t) * nr_cells)) == NULL ||
@@ -485,6 +493,24 @@ void engine_repartition(struct engine *e) {
   /* As of here, only one node needs to compute the partition. */
   if (nodeID == 0) {
 
+    /* Final rescale of all weights to avoid a large range. Large ranges have
+     * been seen to cause an incomplete graph. */
+    wmin = wmax;
+    wmax = 0.0;
+    for (k = 0; k < 26 * nr_cells; k++) {
+      wmax = weights_e[k] > wmax ? weights_e[k] : wmax;
+      wmin = weights_e[k] < wmin ? weights_e[k] : wmin;
+    }
+    if ((wmax - wmin) > engine_maxmetisweight) {
+      wscale = engine_maxmetisweight / (wmax - wmin);
+      for (k = 0; k < 26 * nr_cells; k++) {
+        weights_e[k] = (weights_e[k] - wmin) * wscale + 1;
+      }
+      for (k = 0; k < nr_cells; k++) {
+        weights_v[k] = (weights_v[k] - wmin) * wscale + 1;
+      }
+    }
+
     /* Check that the edge weights are fully symmetric. */
     /* for ( cid = 0 ; cid < nr_cells ; cid++ )
         for ( k = 0 ; k < 26 ; k++ ) {
@@ -543,16 +569,49 @@ void engine_repartition(struct engine *e) {
     /* Call METIS. */
     idx_t one = 1, idx_nr_cells = nr_cells, idx_nr_nodes = nr_nodes;
     idx_t objval;
+
+    /* Dump graph in METIS format */
+    /*dumpMETISGraph("metis_graph", idx_nr_cells, one, offsets, inds,
+                     weights_v, NULL, weights_e);*/
+
     if (METIS_PartGraphRecursive(&idx_nr_cells, &one, offsets, inds, weights_v,
                                  NULL, weights_e, &idx_nr_nodes, NULL, NULL,
                                  options, &objval, nodeIDs) != METIS_OK)
-      error("Call to METIS_PartGraphKway failed.");
+      error("Call to METIS_PartGraphRecursive failed.");
 
     /* Dump the 3d array of cell IDs. */
     /* printf( "engine_repartition: nodeIDs = reshape( [" );
        for ( i = 0 ; i < cdim[0]*cdim[1]*cdim[2] ; i++ )
          printf( "%i " , (int)nodeIDs[ i ] );
        printf("] ,%i,%i,%i);\n",cdim[0],cdim[1],cdim[2]); */
+
+
+    /* Check that the nodeIDs are ok.
*/ + for (k = 0; k < nr_cells; k++) + if (nodeIDs[k] < 0 || nodeIDs[k] >= nr_nodes) + error("Got bad nodeID %" PRIDX " for cell %i.", nodeIDs[k], k); + + /* Check that the partition is complete and all nodes have some work. */ + int present[nr_nodes]; + int failed = 0; + for (i = 0; i < nr_nodes; i++) present[i] = 0; + for (i = 0; i < nr_cells; i++) present[nodeIDs[i]]++; + for (i = 0; i < nr_nodes; i++) { + if (!present[i]) { + failed = 1; + message("Node %d is not present after repartition", i); + } + } + + /* If partition failed continue with the current one, but make this + * clear. */ + if (failed) { + message( + "WARNING: METIS repartition has failed, continuing with " + "the current partition, load balance will not be optimal"); + for (k = 0; k < nr_cells; k++) nodeIDs[k] = cells[k].nodeID; + } + } /* Broadcast the result of the partition. */ @@ -908,8 +967,8 @@ int engine_exchange_strays(struct engine *e, int offset, int *ind, int N) { message("sent out %i particles, got %i back.", N, count_in); if (offset + count_in > s->size_parts) { s->size_parts = (offset + count_in) * 1.05; - struct part *parts_new; - struct xpart *xparts_new; + struct part *parts_new = NULL; + struct xpart *xparts_new = NULL; if (posix_memalign((void **)&parts_new, part_align, sizeof(struct part) * s->size_parts) != 0 || posix_memalign((void **)&xparts_new, part_align, @@ -1249,6 +1308,9 @@ void engine_maketasks(struct engine *e) { #endif + /* Set the unlocks per task. */ + scheduler_set_unlocks(sched); + /* Rank the tasks. */ scheduler_ranktasks(sched); @@ -1772,15 +1834,6 @@ void engine_launch(struct engine *e, int nr_runners, unsigned int mask) { //fflush(stdout); } -/* void hassorted(struct cell *c) { */ - -/* if (c->sorted) error("Suprious sorted flags."); */ - -/* if (c->split) */ -/* for (int k = 0; k < 8; k++) */ -/* if (c->progeny[k] != NULL) hassorted(c->progeny[k]); */ -/* } */ - /** * @brief Initialises the particles and set them in a state ready to move *forward in time. @@ -1845,8 +1898,7 @@ void engine_init_particles(struct engine *e) { (1 << task_type_sort) | (1 << task_type_self) | (1 << task_type_pair) | (1 << task_type_sub) | (1 << task_type_init) | (1 << task_type_ghost) | - (1 << task_type_send) | (1 << task_type_recv) | - (1 << task_type_link)); + (1 << task_type_send) | (1 << task_type_recv)); TIMER_TOC(timer_runners); @@ -1954,7 +2006,7 @@ void engine_step(struct engine *e) { (1 << task_type_pair) | (1 << task_type_sub) | (1 << task_type_init) | (1 << task_type_ghost) | (1 << task_type_kick) | (1 << task_type_send) | - (1 << task_type_recv) | (1 << task_type_link)); + (1 << task_type_recv)); scheduler_print_tasks(&e->sched, "tasks_after.dat"); @@ -2097,8 +2149,8 @@ void engine_split(struct engine *e, int *grid) { message("Re-allocating parts array from %i to %i.", s->size_parts, (int)(s->nr_parts * 1.2)); s->size_parts = s->nr_parts * 1.2; - struct part *parts_new; - struct xpart *xparts_new; + struct part *parts_new = NULL; + struct xpart *xparts_new = NULL; if (posix_memalign((void **)&parts_new, part_align, sizeof(struct part) * s->size_parts) != 0 || posix_memalign((void **)&xparts_new, part_align, diff --git a/src/engine.h b/src/engine.h index 20ba4f155d30f5fd7e0f10b59e387f61e017507f..043998074094d22cb8cbe8f893b52a1a62bd8ecf 100644 --- a/src/engine.h +++ b/src/engine.h @@ -47,6 +47,8 @@ #define engine_maxproxies 64 #define engine_tasksreweight 10 +#define engine_maxmetisweight 10000.0f + /* The rank of the engine as a global variable (for messages). 
*/ extern int engine_rank; diff --git a/src/scheduler.c b/src/scheduler.c index a7468d8abb9f3bca43e4fd29c96fdc516dc7889e..aa41983e232ad3cd0600f3b7dd4f395fd749daef 100644 --- a/src/scheduler.c +++ b/src/scheduler.c @@ -61,38 +61,68 @@ struct task *check[num_checks]; void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb) { - /* Main loop. */ - while (1) { - - /* Follow the links. */ - while (ta->nr_unlock_tasks == task_maxunlock + 1) - ta = ta->unlock_tasks[task_maxunlock]; - - /* Get the index of the next free task. */ - const int ind = atomic_inc(&ta->nr_unlock_tasks); - - /* Is there room in this task? */ - if (ind < task_maxunlock) { - ta->unlock_tasks[ind] = tb; - break; - } - - /* Otherwise, generate a link task. */ - else { + /* /\* Main loop. *\/ */ + /* while (1) { */ + + /* /\* Follow the links. *\/ */ + /* while (ta->nr_unlock_tasks == task_maxunlock + 1) */ + /* ta = ta->unlock_tasks[task_maxunlock]; */ + + /* /\* Get the index of the next free task. *\/ */ + /* const int ind = atomic_inc(&ta->nr_unlock_tasks); */ + + /* /\* Is there room in this task? *\/ */ + /* if (ind < task_maxunlock) { */ + /* ta->unlock_tasks[ind] = tb; */ + /* break; */ + /* } */ + + /* /\* Otherwise, generate a link task. *\/ */ + /* else { */ + + /* /\* Only one thread should have to do this. *\/ */ + /* if (ind == task_maxunlock) { */ + /* ta->unlock_tasks[task_maxunlock] = */ + /* scheduler_addtask(s, task_type_link, task_subtype_none, + * ta->flags, */ + /* 0, ta->ci, ta->cj, 0); */ + /* ta->unlock_tasks[task_maxunlock]->implicit = 1; */ + /* } */ + + /* /\* Otherwise, reduce the count. *\/ */ + /* else */ + /* atomic_dec(&ta->nr_unlock_tasks); */ + /* } */ + /* } */ + + /* Lock the scheduler since re-allocating the unlocks is not + thread-safe. */ + if (lock_lock(&s->lock) != 0) error("Unable to lock scheduler."); + + /* Does the buffer need to be grown? */ + if (s->nr_unlocks == s->size_unlocks) { + struct task **unlocks_new; + int *unlock_ind_new; + s->size_unlocks *= 2; + if ((unlocks_new = (struct task **)malloc( + sizeof(struct task *) *s->size_unlocks)) == NULL || + (unlock_ind_new = (int *)malloc(sizeof(int) * s->size_unlocks)) == NULL) + error("Failed to re-allocate unlocks."); + memcpy(unlocks_new, s->unlocks, sizeof(struct task *) * s->nr_unlocks); + memcpy(unlock_ind_new, s->unlock_ind, sizeof(int) * s->nr_unlocks); + free(s->unlocks); + free(s->unlock_ind); + s->unlocks = unlocks_new; + s->unlock_ind = unlock_ind_new; + } - /* Only one thread should have to do this. */ - if (ind == task_maxunlock) { - ta->unlock_tasks[task_maxunlock] = - scheduler_addtask(s, task_type_link, task_subtype_none, ta->flags, - 0, ta->ci, ta->cj, 0); - ta->unlock_tasks[task_maxunlock]->implicit = 1; - } + /* Write the unlock to the scheduler. */ + const int ind = atomic_inc(&s->nr_unlocks); + s->unlocks[ind] = tb; + s->unlock_ind[ind] = ta - s->tasks; - /* Otherwise, reduce the count. */ - else - atomic_dec(&ta->nr_unlock_tasks); - } - } + /* Release the scheduler. */ + if (lock_unlock(&s->lock) != 0) error("Unable to unlock scheduler."); } /** @@ -117,7 +147,7 @@ void scheduler_splittasks(struct scheduler *s) { {-1, -1, -1, -1, -1, -1, -1, 12}}; float sid_scale[13] = {0.1897, 0.4025, 0.1897, 0.4025, 0.5788, 0.4025, 0.1897, 0.4025, 0.1897, 0.4025, 0.5788, 0.4025, 0.5788}; - + /* Loop through the tasks... */ redo = 0; t_old = t = NULL; @@ -500,8 +530,6 @@ void scheduler_splittasks(struct scheduler *s) { /* Otherwise, if not spilt, stitch-up the sorting. 
*/ else { - // message("called"); - /* Create the sort for ci. */ // lock_lock( &ci->lock ); if (ci->sorts == NULL) @@ -525,131 +553,131 @@ void scheduler_splittasks(struct scheduler *s) { } /* pair interaction? */ - /* /\* Gravity interaction? *\/ */ - /* else if (t->type == task_type_grav_mm) { */ - - /* /\* Get a handle on the cells involved. *\/ */ - /* ci = t->ci; */ - /* cj = t->cj; */ - - /* /\* Self-interaction? *\/ */ - /* if (cj == NULL) { */ - - /* /\* Ignore this task if the cell has no gparts. *\/ */ - /* if (ci->gcount == 0) t->type = task_type_none; */ - - /* /\* If the cell is split, recurse. *\/ */ - /* else if (ci->split) { */ - - /* /\* Make a single sub-task? *\/ */ - /* if (scheduler_dosub && ci->count < space_subsize / ci->count) { */ - - /* t->type = task_type_sub; */ - /* t->subtype = task_subtype_grav; */ - - /* } */ - - /* /\* Otherwise, just split the task. *\/ */ - /* else { */ - - /* /\* Split this task into tasks on its progeny. *\/ */ - /* t->type = task_type_none; */ - /* for (j = 0; j < 8; j++) */ - /* if (ci->progeny[j] != NULL && ci->progeny[j]->gcount > 0) { */ - /* if (t->type == task_type_none) { */ - /* t->type = task_type_grav_mm; */ - /* t->ci = ci->progeny[j]; */ - /* t->cj = NULL; */ - /* } else */ - /* t = scheduler_addtask(s, task_type_grav_mm, task_subtype_none, */ - /* 0, 0, ci->progeny[j], NULL, 0); */ - /* for (k = j + 1; k < 8; k++) */ - /* if (ci->progeny[k] != NULL && ci->progeny[k]->gcount > 0) { */ - /* if (t->type == task_type_none) { */ - /* t->type = task_type_grav_mm; */ - /* t->ci = ci->progeny[j]; */ - /* t->cj = ci->progeny[k]; */ - /* } else */ - /* t = scheduler_addtask(s, task_type_grav_mm, */ - /* task_subtype_none, 0, 0, */ - /* ci->progeny[j], ci->progeny[k], 0); */ - /* } */ - /* } */ - /* redo = (t->type != task_type_none); */ - /* } */ - - /* } */ - - /* /\* Otherwise, just make a pp task out of it. *\/ */ - /* else */ - /* t->type = task_type_grav_pp; */ - - /* } */ - - /* /\* Nope, pair. *\/ */ - /* else { */ - - /* /\* Make a sub-task? *\/ */ - /* if (scheduler_dosub && ci->count < space_subsize / cj->count) { */ - - /* t->type = task_type_sub; */ - /* t->subtype = task_subtype_grav; */ - - /* } */ - - /* /\* Otherwise, split the task. *\/ */ - /* else { */ - - /* /\* Get the opening angle theta. *\/ */ - /* float dx[3], theta; */ - /* for (k = 0; k < 3; k++) { */ - /* dx[k] = fabsf(ci->loc[k] - cj->loc[k]); */ - /* if (s->space->periodic && dx[k] > 0.5 * s->space->dim[k]) */ - /* dx[k] = -dx[k] + s->space->dim[k]; */ - /* if (dx[k] > 0.0f) dx[k] -= ci->h[k]; */ - /* } */ - /* theta = */ - /* (dx[0] * dx[0] + dx[1] * dx[1] + dx[2] * dx[2]) / */ - /* (ci->h[0] * ci->h[0] + ci->h[1] * ci->h[1] + ci->h[2] * ci->h[2]); */ - - /* /\* Ignore this task if the cell has no gparts. *\/ */ - /* if (ci->gcount == 0 || cj->gcount == 0) t->type = task_type_none; */ - - /* /\* Split the interaction? *\/ */ - /* else if (theta < const_theta_max * const_theta_max) { */ - - /* /\* Are both ci and cj split? *\/ */ - /* if (ci->split && cj->split) { */ - - /* /\* Split this task into tasks on its progeny. 
*\/ */ - /* t->type = task_type_none; */ - /* for (j = 0; j < 8; j++) */ - /* if (ci->progeny[j] != NULL && ci->progeny[j]->gcount > 0) { */ - /* for (k = 0; k < 8; k++) */ - /* if (cj->progeny[k] != NULL && cj->progeny[k]->gcount > 0) { */ - /* if (t->type == task_type_none) { */ - /* t->type = task_type_grav_mm; */ - /* t->ci = ci->progeny[j]; */ - /* t->cj = cj->progeny[k]; */ - /* } else */ - /* t = scheduler_addtask( */ - /* s, task_type_grav_mm, task_subtype_none, 0, 0, */ - /* ci->progeny[j], cj->progeny[k], 0); */ - /* } */ - /* } */ - /* redo = (t->type != task_type_none); */ - - /* } */ - - /* /\* Otherwise, make a pp task out of it. *\/ */ - /* else */ - /* t->type = task_type_grav_pp; */ - /* } */ - /* } */ - - /* } /\* gravity pair interaction? *\/ */ - - /* } /\* gravity interaction? *\/ */ + /* Gravity interaction? */ + else if (t->type == task_type_grav_mm) { + + /* Get a handle on the cells involved. */ + ci = t->ci; + cj = t->cj; + + /* Self-interaction? */ + if (cj == NULL) { + + /* Ignore this task if the cell has no gparts. */ + if (ci->gcount == 0) t->type = task_type_none; + + /* If the cell is split, recurse. */ + else if (ci->split) { + + /* Make a single sub-task? */ + if (scheduler_dosub && ci->count < space_subsize / ci->count) { + + t->type = task_type_sub; + t->subtype = task_subtype_grav; + + } + + /* Otherwise, just split the task. */ + else { + + /* Split this task into tasks on its progeny. */ + t->type = task_type_none; + for (j = 0; j < 8; j++) + if (ci->progeny[j] != NULL && ci->progeny[j]->gcount > 0) { + if (t->type == task_type_none) { + t->type = task_type_grav_mm; + t->ci = ci->progeny[j]; + t->cj = NULL; + } else + t = scheduler_addtask(s, task_type_grav_mm, task_subtype_none, + 0, 0, ci->progeny[j], NULL, 0); + for (k = j + 1; k < 8; k++) + if (ci->progeny[k] != NULL && ci->progeny[k]->gcount > 0) { + if (t->type == task_type_none) { + t->type = task_type_grav_mm; + t->ci = ci->progeny[j]; + t->cj = ci->progeny[k]; + } else + t = scheduler_addtask(s, task_type_grav_mm, + task_subtype_none, 0, 0, + ci->progeny[j], ci->progeny[k], 0); + } + } + redo = (t->type != task_type_none); + } + + } + + /* Otherwise, just make a pp task out of it. */ + else + t->type = task_type_grav_pp; + + } + + /* Nope, pair. */ + else { + + /* Make a sub-task? */ + if (scheduler_dosub && ci->count < space_subsize / cj->count) { + + t->type = task_type_sub; + t->subtype = task_subtype_grav; + + } + + /* Otherwise, split the task. */ + else { + + /* Get the opening angle theta. */ + float dx[3], theta; + for (k = 0; k < 3; k++) { + dx[k] = fabsf(ci->loc[k] - cj->loc[k]); + if (s->space->periodic && dx[k] > 0.5 * s->space->dim[k]) + dx[k] = -dx[k] + s->space->dim[k]; + if (dx[k] > 0.0f) dx[k] -= ci->h[k]; + } + theta = + (dx[0] * dx[0] + dx[1] * dx[1] + dx[2] * dx[2]) / + (ci->h[0] * ci->h[0] + ci->h[1] * ci->h[1] + ci->h[2] * ci->h[2]); + + /* Ignore this task if the cell has no gparts. */ + if (ci->gcount == 0 || cj->gcount == 0) t->type = task_type_none; + + /* Split the interaction? */ + else if (theta < const_theta_max * const_theta_max) { + + /* Are both ci and cj split? */ + if (ci->split && cj->split) { + + /* Split this task into tasks on its progeny. 
*/ + t->type = task_type_none; + for (j = 0; j < 8; j++) + if (ci->progeny[j] != NULL && ci->progeny[j]->gcount > 0) { + for (k = 0; k < 8; k++) + if (cj->progeny[k] != NULL && cj->progeny[k]->gcount > 0) { + if (t->type == task_type_none) { + t->type = task_type_grav_mm; + t->ci = ci->progeny[j]; + t->cj = cj->progeny[k]; + } else + t = scheduler_addtask( + s, task_type_grav_mm, task_subtype_none, 0, 0, + ci->progeny[j], cj->progeny[k], 0); + } + } + redo = (t->type != task_type_none); + + } + + /* Otherwise, make a pp task out of it. */ + else + t->type = task_type_grav_pp; + } + } + + } /* gravity pair interaction? */ + + } /* gravity interaction? */ } /* loop over all tasks. */ } @@ -683,8 +711,6 @@ struct task *scheduler_addtask(struct scheduler *s, int type, int subtype, /* Get a pointer to the new task. */ t = &s->tasks[ind]; - //if (t->type == task_type_sort) message("sort!"); - /* Copy the data. */ t->type = type; t->subtype = subtype; @@ -713,6 +739,63 @@ struct task *scheduler_addtask(struct scheduler *s, int type, int subtype, return t; } +/** + * @brief Set the unlock pointers in each task. + * + * @param s The #scheduler. + */ + +void scheduler_set_unlocks(struct scheduler *s) { + + /* Store the counts for each task. */ + int *counts; + if ((counts = (int *)malloc(sizeof(int) * s->nr_tasks)) == NULL) + error("Failed to allocate temporary counts array."); + bzero(counts, sizeof(int) * s->nr_tasks); + for (int k = 0; k < s->nr_unlocks; k++) counts[s->unlock_ind[k]] += 1; + + /* Compute the offset for each unlock block. */ + int *offsets; + if ((offsets = (int *)malloc(sizeof(int) * (s->nr_tasks + 1))) == NULL) + error("Failed to allocate temporary offsets array."); + offsets[0] = 0; + for (int k = 0; k < s->nr_tasks; k++) offsets[k + 1] = offsets[k] + counts[k]; + + /* Create and fill a temporary array with the sorted unlocks. */ + struct task **unlocks; + if ((unlocks = (struct task **)malloc(sizeof(struct task *) * + s->size_unlocks)) == NULL) + error("Failed to allocate temporary unlocks array."); + for (int k = 0; k < s->nr_unlocks; k++) { + const int ind = s->unlock_ind[k]; + unlocks[offsets[ind]] = s->unlocks[k]; + offsets[ind] += 1; + } + + /* Swap the unlocks. */ + free(s->unlocks); + s->unlocks = unlocks; + + /* Re-set the offsets. */ + offsets[0] = 0; + for (int k = 1; k < s->nr_tasks; k++) + offsets[k] = offsets[k - 1] + counts[k - 1]; + for (int k = 0; k < s->nr_tasks; k++) + for (int j = offsets[k]; j < offsets[k + 1]; j++) s->unlock_ind[j] = k; + + /* Set the unlocks in the tasks. */ + for (int k = 0; k < s->nr_tasks; k++) { + struct task *t = &s->tasks[k]; + t->nr_unlock_tasks = counts[k]; + t->unlock_tasks = &s->unlocks[offsets[k]]; + for (int j = offsets[k]; j < offsets[k + 1]; j++) s->unlock_ind[j] = k; + } + + /* Clean up. */ + free(counts); + free(offsets); +} + /** * @brief Sort the tasks in topological order over all queues. * @@ -802,6 +885,7 @@ void scheduler_reset(struct scheduler *s, int size) { s->tasks_next = 0; s->waiting = 0; s->mask = 0; + s->nr_unlocks = 0; /* Set the task pointers in the queues. */ for (k = 0; k < s->nr_queues; k++) s->queues[k].tasks = s->tasks; @@ -905,7 +989,72 @@ void scheduler_reweight(struct scheduler *s) { * @param s The #scheduler. * @param mask The task types to enqueue. 
 */
+void scheduler_start(struct scheduler *s, unsigned int mask) {
+
+  int nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
+  struct task *t, *tasks = s->tasks;
+
+  /* Store the mask */
+  s->mask = mask | (1 << task_type_rewait);
+
+  /* Clear all the waits and rids. */
+  // ticks tic = getticks();
+  for (int k = 0; k < s->nr_tasks; k++) {
+    s->tasks[k].wait = 1;
+    s->tasks[k].rid = -1;
+  }
+
+  /* Enqueue a set of extraneous tasks to set the task waits. */
+  struct task *rewait_tasks = &s->tasks[s->nr_tasks];
+  const int num_rewait_tasks = s->nr_queues > s->size - s->nr_tasks
+                                   ? s->size - s->nr_tasks
+                                   : s->nr_queues;
+  const int waiting_old =
+      s->waiting;  // Remember that engine_launch may fiddle with this value.
+  for (int k = 0; k < num_rewait_tasks; k++) {
+    rewait_tasks[k].type = task_type_rewait;
+    rewait_tasks[k].ci = (struct cell *)&s->tasks[k * nr_tasks / s->nr_queues];
+    rewait_tasks[k].cj =
+        (struct cell *)&s->tasks[(k + 1) * nr_tasks / s->nr_queues];
+    rewait_tasks[k].skip = 0;
+    rewait_tasks[k].wait = 0;
+    rewait_tasks[k].rid = -1;
+    rewait_tasks[k].weight = 1;
+    rewait_tasks[k].implicit = 0;
+    rewait_tasks[k].nr_unlock_tasks = 0;
+    scheduler_enqueue(s, &rewait_tasks[k]);
+    pthread_cond_broadcast(&s->sleep_cond);
+  }
+
+  /* Wait for the rewait tasks to have executed. */
+  pthread_mutex_lock(&s->sleep_mutex);
+  while (s->waiting > waiting_old) {
+    pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
+  }
+  pthread_mutex_unlock(&s->sleep_mutex);
+  /* message("waiting tasks took %.3f ms.",
+     (double)(getticks() - tic) / CPU_TPS * 1000); */
+
+  /* Loop over the tasks and enqueue whoever is ready. */
+  // tic = getticks();
+  for (int k = 0; k < s->nr_tasks; k++) {
+    t = &tasks[tid[k]];
+    if (atomic_dec(&t->wait) == 1 && ((1 << t->type) & s->mask) && !t->skip) {
+
+      scheduler_enqueue(s, t);
+      pthread_cond_broadcast(&s->sleep_cond);
+    }
+  }
+
+  // message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) /
+  // CPU_TPS * 1000 );
+}
+
+
+
+
+#if 0
 void scheduler_start(struct scheduler *s, unsigned int mask) {
 
   int k, j, nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
@@ -1024,6 +1173,12 @@ void scheduler_start(struct scheduler *s, unsigned int mask) {
   // CPU_TPS * 1000 );
 }
 
+#endif
+
+
+
+
+
 /**
  * @brief Put a task on one of the queues.
  *
@@ -1046,41 +1201,35 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
 
   /* Fail if this task has already been enqueued before. */
   if (t->rid >= 0) error("Task has already been enqueued.");
-
-  for(int k=0; k<num_checks; ++k) {
-
-    if(t == check[k]) {
-      //message("task %5d type=%s-%s unlock=%d wait=%d %p", 0, taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait, t);
-
+  for (int k = 0; k < num_checks; ++k) {
+    if (t == check[k]) {
+      // message("task %5d type=%s-%s unlock=%d wait=%d %p", 0,
+      // taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks,
+      // t->wait, t);
     }
-
   }
-
+
   /* Ignore skipped tasks and tasks not in the mask. */
-  if (t->skip || ((1 << t->type) & ~(s->mask) && t->type != task_type_link)) {
+  if (t->skip || ((1 << t->type) & ~(s->mask))) {
     return;
   }
+  for (int k = 0; k < num_checks; ++k) {
 
-  for(int k=0; k<num_checks; ++k) {
-
-    if(t == check[k]) {
-      //message("not ignored !");
-
+    if (t == check[k]) {
+      // message("not ignored !");
     }
-
   }
 
-
   /* If this is an implicit task, just pretend it's done.
*/ if (t->implicit) { - for(int k=0; k<num_checks; ++k) { - if(t == check[k]) { - //message("implicit"); + for (int k = 0; k < num_checks; ++k) { + if (t == check[k]) { + // message("implicit"); + } } - } for (int j = 0; j < t->nr_unlock_tasks; j++) { struct task *t2 = t->unlock_tasks[j]; if (atomic_dec(&t2->wait) == 1) scheduler_enqueue(s, t2); @@ -1174,18 +1323,15 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) { struct task *scheduler_done(struct scheduler *s, struct task *t) { + for (int k = 0; k < num_checks; ++k) { - for(int k=0; k<num_checks; ++k) { - - if(t == check[k]) { - //message("task %5d type=%s-%s unlock=%d wait=%d %p", 0, taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, t->wait, t); - - + if (t == check[k]) { + // message("task %5d type=%s-%s unlock=%d wait=%d %p", 0, + // taskID_names[t->type], subtaskID_names[t->subtype], t->nr_unlock_tasks, + // t->wait, t); } - } - /* Release whatever locks this task held. */ if (!t->implicit) task_unlock(t); @@ -1195,21 +1341,21 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) { struct task *t2 = t->unlock_tasks[k]; int res = atomic_dec(&t2->wait); /* if (t->type == task_type_init) */ - /* message("Done with init ! Unlocking a %s task. %d dependencies left", */ + /* message("Done with init ! Unlocking a %s task. %d dependencies left", + */ /* taskID_names[t2->type], res); */ /* if (t->type == task_type_pair) */ - /* message("Done with pair ! Unlocking a %s task. %d dependencies left", */ + /* message("Done with pair ! Unlocking a %s task. %d dependencies left", + */ /* taskID_names[t2->type], res); */ - for(int k=0; k<num_checks; ++k) { + for (int k = 0; k < num_checks; ++k) { - if(t2 == check[k]) { - //message("Unlocking the task %p", t2); + if (t2 == check[k]) { + // message("Unlocking the task %p", t2); } - } - - + if (res < 1) { error("Negative wait!"); } else if (res == 1) { @@ -1217,7 +1363,7 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) { } } - /* Task definitely done. */ + /* Task definitely done, signal any sleeping runners. */ if (!t->implicit) { t->toc = getticks(); pthread_mutex_lock(&s->sleep_mutex); @@ -1378,6 +1524,15 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_queues, pthread_mutex_init(&s->sleep_mutex, NULL) != 0) error("Failed to initialize sleep barrier."); + /* Init the unlocks. */ + if ((s->unlocks = (struct task **)malloc( + sizeof(struct task *) *scheduler_init_nr_unlocks)) == NULL || + (s->unlock_ind = + (int *)malloc(sizeof(int) * scheduler_init_nr_unlocks)) == NULL) + error("Failed to allocate unlocks."); + s->nr_unlocks = 0; + s->size_unlocks = scheduler_init_nr_unlocks; + /* Set the scheduler variables. */ s->nr_queues = nr_queues; s->flags = flags; diff --git a/src/scheduler.h b/src/scheduler.h index e78ca7c8dea8997c89b4a8cd140ba5ac0b810055..79ff2738eb56575a9b02b274ad36b53d71035c89 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -32,7 +32,7 @@ /* Some constants. */ #define scheduler_maxwait 3 -#define scheduler_maxunlock 40 +#define scheduler_init_nr_unlocks 10000 #define scheduler_dosub 1 #define scheduler_maxsteal 10 #define scheduler_maxtries 2 @@ -72,6 +72,11 @@ struct scheduler { /* The task indices. */ int *tasks_ind; + /* The task unlocks. */ + struct task **unlocks; + int *unlock_ind; + int nr_unlocks, size_unlocks; + /* Lock for this scheduler. 
*/ lock_type lock; @@ -103,6 +108,7 @@ void scheduler_splittasks(struct scheduler *s); struct task *scheduler_done(struct scheduler *s, struct task *t); struct task *scheduler_unlock(struct scheduler *s, struct task *t); void scheduler_addunlock(struct scheduler *s, struct task *ta, struct task *tb); +void scheduler_set_unlocks(struct scheduler *s); void scheduler_dump_queue(struct scheduler *s); void scheduler_print_tasks(struct scheduler *s, char *fileName); diff --git a/src/task.c b/src/task.c index 8a9ef80886f8f0624215bef6347741c14d84a595..a911e3c1ffd3d4164dc0b7a57030d459fda0359b 100644 --- a/src/task.c +++ b/src/task.c @@ -45,8 +45,9 @@ /* Task type names. */ const char *taskID_names[task_type_count] = { "none", "sort", "self", "pair", "sub", "init", - "ghost", "drift", "kick", "send", "recv", "link", - "grav_pp", "grav_mm", "grav_up", "grav_down"}; + "ghost", "drift", "kick", "send", "recv", + "grav_pp", "grav_mm", "grav_up", "grav_down", + "psort", "split_cell", "rewait"}; const char *subtaskID_names[task_type_count] = {"none", "density", "force", "grav"}; diff --git a/src/task.h b/src/task.h index b8a5c7cce9455e3c68a3bb99ba57805e00b10de8..fbeed3a7a47ee6e2e180ae48dd8e0eb1029fb868 100644 --- a/src/task.h +++ b/src/task.h @@ -26,7 +26,7 @@ /* Some constants. */ #define task_maxwait 3 -#define task_maxunlock 100 +#define task_maxunlock 15 /* The different task types. */ enum task_types { @@ -41,11 +41,13 @@ enum task_types { task_type_kick, task_type_send, task_type_recv, - task_type_link, task_type_grav_pp, task_type_grav_mm, task_type_grav_up, task_type_grav_down, + task_type_psort, + task_type_split_cell, + task_type_rewait, task_type_count }; @@ -82,7 +84,7 @@ struct task { ticks tic, toc; int nr_unlock_tasks; - struct task *unlock_tasks[task_maxunlock + 1]; + struct task **unlock_tasks; }; /* Function prototypes. */
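
A few illustrative sketches of the techniques this patch introduces, in rough diff order. All helper names below are illustrative stand-ins, not symbols from the patch.

engine_repartition() now remaps the METIS weights onto a bounded range before partitioning, since large weight ranges have been seen to produce incomplete graphs. The patch seeds wmin/wmax from the edge weights and then applies the same wmin and wscale to both weights_e and weights_v; this minimal sketch applies the remap to a single array:

```c
#include <stdint.h>

typedef int64_t idx_t; /* stand-in for METIS' idx_t */

#define engine_maxmetisweight 10000.0f

/* Remap n weights onto roughly [1, engine_maxmetisweight], mirroring the
 * rescale applied in engine_repartition(). */
static void rescale_weights(idx_t *w, int n) {
  idx_t wmin = w[0], wmax = w[0];
  for (int k = 1; k < n; k++) {
    if (w[k] > wmax) wmax = w[k];
    if (w[k] < wmin) wmin = w[k];
  }
  /* Only rescale if the range is too large for METIS to digest. */
  if ((wmax - wmin) > engine_maxmetisweight) {
    const float wscale = engine_maxmetisweight / (wmax - wmin);
    for (int k = 0; k < n; k++) w[k] = (w[k] - wmin) * wscale + 1;
  }
}
```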
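After METIS returns, the patch rejects partitions that hand out an out-of-range nodeID or leave a node with no cells, falling back to the current partition with a warning. The same test, distilled into a hypothetical predicate (reusing the idx_t stand-in above; the C99 variable-length present[] array is taken from the patch itself):

```c
/* Return 1 if every cell got a valid node and every node got at least one
 * cell, 0 otherwise; mirrors the post-METIS check in engine_repartition(). */
static int partition_is_complete(const idx_t *nodeIDs, int nr_cells,
                                 int nr_nodes) {
  int present[nr_nodes];
  for (int i = 0; i < nr_nodes; i++) present[i] = 0;
  for (int k = 0; k < nr_cells; k++) {
    if (nodeIDs[k] < 0 || nodeIDs[k] >= nr_nodes) return 0;
    present[nodeIDs[k]]++;
  }
  for (int i = 0; i < nr_nodes; i++)
    if (!present[i]) return 0;
  return 1;
}
```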
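scheduler_addunlock() no longer chains link tasks; each dependency is appended as a pair (unlock_ind[k], unlocks[k]) to two scheduler-wide arrays that double in size when full, with the whole append done under the scheduler's lock. The unlocking task is recorded by index (ta - s->tasks) rather than by pointer so that scheduler_set_unlocks() can later group the pairs by task. A self-contained sketch of the grow-by-doubling append, with a pthread mutex standing in for SWIFT's lock_lock()/lock_unlock() and plain ints for the task pointers:

```c
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* (ind[k], tgt[k]) records "task ind[k] unlocks task tgt[k]". */
struct unlock_buf {
  int *ind, *tgt;
  int nr, size;
  pthread_mutex_t lock;
};

static void unlock_buf_add(struct unlock_buf *b, int ta_ind, int tb_ind) {
  pthread_mutex_lock(&b->lock);

  /* Grow by doubling when full, as in scheduler_addunlock(). */
  if (b->nr == b->size) {
    b->size *= 2;
    int *ind_new = malloc(sizeof(int) * b->size);
    int *tgt_new = malloc(sizeof(int) * b->size);
    if (ind_new == NULL || tgt_new == NULL) abort();
    memcpy(ind_new, b->ind, sizeof(int) * b->nr);
    memcpy(tgt_new, b->tgt, sizeof(int) * b->nr);
    free(b->ind);
    free(b->tgt);
    b->ind = ind_new;
    b->tgt = tgt_new;
  }

  /* Append the new dependency. */
  b->ind[b->nr] = ta_ind;
  b->tgt[b->nr] = tb_ind;
  b->nr += 1;

  pthread_mutex_unlock(&b->lock);
}
```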
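scheduler_set_unlocks() then turns that flat pair list into per-task unlock lists with a counting sort: histogram the unlocks per task, prefix-sum the counts into offsets, and scatter each unlock into its task's slice. A sketch of just that sort (offsets must hold nr_tasks + 1 entries; plain ints again stand in for the struct task * targets):

```c
#include <stdlib.h>

static void group_unlocks(int nr_tasks, int nr_unlocks, const int *unlock_ind,
                          const int *unlocks, int *sorted, int *offsets) {
  /* Histogram: number of unlocks per task. */
  int *counts = calloc(nr_tasks, sizeof(int));
  if (counts == NULL) abort();
  for (int k = 0; k < nr_unlocks; k++) counts[unlock_ind[k]]++;

  /* Exclusive prefix sum gives the start of each task's slice. */
  offsets[0] = 0;
  for (int k = 0; k < nr_tasks; k++) offsets[k + 1] = offsets[k] + counts[k];

  /* Scatter; this advances each offsets[k] to the end of slice k... */
  for (int k = 0; k < nr_unlocks; k++)
    sorted[offsets[unlock_ind[k]]++] = unlocks[k];

  /* ...so shift them back to slice starts, much as the patch re-derives
   * them from the counts. */
  for (int k = nr_tasks; k > 0; k--) offsets[k] = offsets[k - 1];
  offsets[0] = 0;

  free(counts);
}
```

Each task then points into the shared array (t->unlock_tasks = &s->unlocks[offsets[k]], t->nr_unlock_tasks = counts[k]), which is why the task.h hunk replaces the fixed unlock_tasks[task_maxunlock + 1] array with a struct task ** pointer, and why engine_maketasks() calls scheduler_set_unlocks() after all scheduler_addunlock() calls and before scheduler_ranktasks().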
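Finally, scheduler_start() seeds every task's wait counter to 1 and enqueues up to nr_queues rewait pseudo-tasks, each smuggling a [first, last) slice of the task array through its ci/cj pointers. The runner-side handler for task_type_rewait is not part of this diff; presumably it walks its slice and increments the wait count of every task it unlocks, along these lines (hypothetical sketch assuming SWIFT's struct task and atomic_inc()):

```c
static void runner_do_rewait(struct task *t) {
  /* t->ci and t->cj encode a [first, last) range of tasks. */
  for (struct task *t2 = (struct task *)t->ci; t2 != (struct task *)t->cj;
       t2++) {
    if (t2->skip) continue; /* the real code may also honour s->mask */
    /* Each dependency bumps the wait count of the task it unlocks, so a
     * task with N unlockers ends up with wait = N + 1 and becomes ready
     * once scheduler_start()'s own atomic_dec() and all N unlocker
     * decrements have happened. */
    for (int k = 0; k < t2->nr_unlock_tasks; k++)
      atomic_inc(&t2->unlock_tasks[k]->wait);
  }
}
```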