diff --git a/src/engine.c b/src/engine.c
index 1f701d7b7e5699a7dd377a75ec914df54db9747e..71153800c3892e46bc7b86a408b7dbb5bf6afa89 100644
--- a/src/engine.c
+++ b/src/engine.c
@@ -3352,6 +3352,14 @@ void engine_skip_force_and_kick(struct engine *e) {
         t->subtype == task_subtype_rho || t->subtype == task_subtype_gpart ||
         t->subtype == task_subtype_sf_counts)
       t->skip = 1;
+
+#ifdef WITH_MPI
+    /* Skip testsome tasks for subtypes that are not xv. */
+    else if (t->type == task_type_recv && t->subtype == task_subtype_testsome
+             && t->flags != task_subtype_xv)
+      t->skip = 1;
+#endif
+
   }
 
   /* Run through the cells and clear some flags. */
@@ -4181,7 +4189,7 @@ void engine_unskip(struct engine *e) {
   }
 
 #ifdef WITH_MPI
-  e->sched.nr_requests = 0;
+  for (int k = 0; k < task_subtype_count; k++) e->sched.nr_requests[k] = 0;
 #endif
 
   /* Activate all the regular tasks */
diff --git a/src/engine_maketasks.c b/src/engine_maketasks.c
index edfb98d0af8a9ad48a0a8293395e3d29c0066a46..898d64e90f3bdf12653e08c7398fd9995946e3f1 100644
--- a/src/engine_maketasks.c
+++ b/src/engine_maketasks.c
@@ -444,21 +444,21 @@ void engine_addtasks_recv_hydro(struct engine *e, struct cell *c,
     /* Create the tasks. */
     t_xv = scheduler_addtask(s, task_type_recv, task_subtype_xv, c->mpi.tag,
                              0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_xv);
+    scheduler_addunlock(s, s->testsome[task_subtype_xv], t_xv);
 
     t_rho = scheduler_addtask(s, task_type_recv, task_subtype_rho, c->mpi.tag,
                               0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_rho);
+    scheduler_addunlock(s, s->testsome[task_subtype_rho], t_rho);
 
 #ifdef EXTRA_HYDRO_LOOP
     t_gradient = scheduler_addtask(s, task_type_recv, task_subtype_gradient,
                                    c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_gradient);
+    scheduler_addunlock(s, s->testsome[task_subtype_gradient], t_gradient);
 #endif
 
     t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_part,
                              c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_ti);
+    scheduler_addunlock(s, s->testsome[task_subtype_tend_part], t_ti);
   }
 
   if (t_xv != NULL) {
@@ -549,7 +549,7 @@ void engine_addtasks_recv_stars(struct engine *e, struct cell *c,
     t_sf_counts = scheduler_addtask(s, task_type_recv, task_subtype_sf_counts,
                                     c->mpi.tag, 0, c, NULL);
     /* Add testsome dependencies. */
-    scheduler_addunlock(s, s->testsome, t_sf_counts);
+    scheduler_addunlock(s, s->testsome[task_subtype_sf_counts], t_sf_counts);
   }
 
   /* Have we reached a level where there are any stars tasks ? */
@@ -563,11 +563,11 @@ void engine_addtasks_recv_stars(struct engine *e, struct cell *c,
     /* Create the tasks. */
     t_feedback = scheduler_addtask(s, task_type_recv, task_subtype_spart,
                                    c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_feedback);
+    scheduler_addunlock(s, s->testsome[task_subtype_spart], t_feedback);
 
     t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_spart,
                              c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_ti);
+    scheduler_addunlock(s, s->testsome[task_subtype_tend_spart], t_ti);
 
     if (with_star_formation && c->hydro.count > 0) {
 
@@ -644,23 +644,23 @@ void engine_addtasks_recv_black_holes(struct engine *e, struct cell *c,
     /* Create the tasks. */
     t_rho = scheduler_addtask(s, task_type_recv, task_subtype_bpart_rho,
                               c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_rho);
+    scheduler_addunlock(s, s->testsome[task_subtype_bpart_rho], t_rho);
 
     t_bh_merger = scheduler_addtask(
         s, task_type_recv, task_subtype_bpart_merger, c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_bh_merger);
+    scheduler_addunlock(s, s->testsome[task_subtype_bpart_merger], t_bh_merger);
 
     t_gas_swallow = scheduler_addtask(
         s, task_type_recv, task_subtype_part_swallow, c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_gas_swallow);
+    scheduler_addunlock(s, s->testsome[task_subtype_part_swallow], t_gas_swallow);
 
     t_feedback = scheduler_addtask(
         s, task_type_recv, task_subtype_bpart_feedback, c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_feedback);
+    scheduler_addunlock(s, s->testsome[task_subtype_bpart_feedback], t_feedback);
 
     t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_bpart,
                              c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_ti);
+    scheduler_addunlock(s, s->testsome[task_subtype_tend_bpart], t_ti);
   }
 
   if (t_rho != NULL) {
@@ -739,11 +739,11 @@ void engine_addtasks_recv_gravity(struct engine *e, struct cell *c,
     /* Create the tasks. */
     t_grav = scheduler_addtask(s, task_type_recv, task_subtype_gpart,
                                c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_grav);
+    scheduler_addunlock(s, s->testsome[task_subtype_gpart], t_grav);
 
     t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_gpart,
                              c->mpi.tag, 0, c, NULL);
-    scheduler_addunlock(s, s->testsome, t_ti);
+    scheduler_addunlock(s, s->testsome[task_subtype_tend_gpart], t_ti);
   }
 
   /* If we have tasks, link them. */
@@ -3189,9 +3189,12 @@ void engine_maketasks(struct engine *e) {
       }
     }
 
-    /* Create the testsome task. */
-    sched->testsome = scheduler_addtask(
-        sched, task_type_recv, task_subtype_testsome, 0, 0, NULL, NULL);
+    /* Create the testsome tasks, one per subtype. */
+    for (int k = 0; k < task_subtype_count; k++) {
+      sched->testsome[k] =
+          scheduler_addtask(sched, task_type_recv, task_subtype_testsome, k,
+                            0, NULL, NULL);
+    }
 
     threadpool_map(&e->threadpool, engine_addtasks_recv_mapper,
                    recv_cell_type_pairs, num_recv_cells,
diff --git a/src/engine_marktasks.c b/src/engine_marktasks.c
index 05ba3cde4f8d675005dd7254e442daa611911fd3..fe9e8890477367fee8da0e911995fa5f0508194d 100644
--- a/src/engine_marktasks.c
+++ b/src/engine_marktasks.c
@@ -962,7 +962,7 @@ int engine_marktasks(struct engine *e) {
 
 #ifdef WITH_MPI
   /* Reset number of requests, these are regenerated. */
-  s->nr_requests = 0;
+  for (int k = 0; k < task_subtype_count; k++) s->nr_requests[k] = 0;
 #endif
 
   /* Run through the tasks and mark as skip or not. */
diff --git a/src/queue.c b/src/queue.c
index 703d2dd490f16c17f8446cb07e286c8046d0a154..d22bc7e15ba91dc66a0a8506efbe877452604f75 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -324,8 +324,9 @@ void queue_dump(int nodeID, int index, FILE *file, struct queue *q) {
 
   for (int k = 0; k < q->count; k++) {
     struct task *t = &q->tasks[q->tid[k]];
-    fprintf(file, "%d %d %d %s %s %d\n", nodeID, index, k,
-            taskID_names[t->type], subtaskID_names[t->subtype], t->wait);
+    fprintf(file, "%d %d %d %s %s %.2f %lld\n", nodeID, index, k,
+            taskID_names[t->type], subtaskID_names[t->subtype], t->weight,
+            t->flags);
   }
 
   /* Release the task lock. */
diff --git a/src/scheduler.c b/src/scheduler.c
index 63cd687401eadb706ff619eb9ae365d61deae7b4..0e83370e3d6fc8a369a8c329a2abe5ba4ca1a6ae 100644
--- a/src/scheduler.c
+++ b/src/scheduler.c
@@ -1287,7 +1287,7 @@ void scheduler_reset(struct scheduler *s, int size) {
   s->size = size;
   s->nr_tasks = 0;
 #ifdef WITH_MPI
-  s->nr_size_requests = 0;
+  for (int k = 0; k < task_subtype_count; k++) s->nr_size_requests[k] = 0;
 #endif
   s->tasks_next = 0;
   s->waiting = 0;
@@ -1585,7 +1585,7 @@ void scheduler_enqueue_mapper(void *map_data, int num_elements,
     } else if (!t->skip) {
       /* Recv tasks are not enqueued until marked as ready by the testsome
        * tasks, but we do need to start the MPI recv for them for that to be
-       * possible, so do for tasks only being held by testsome. */
+       * possible, so do so for tasks only being held by a testsome. */
      if (t->wait == 1 && t->type == task_type_recv &&
          t->subtype != task_subtype_testsome) {
        scheduler_start_recv(s, t);
@@ -1604,8 +1604,10 @@
 void scheduler_start(struct scheduler *s) {
 
 #ifdef WITH_MPI
-  s->nr_recv_tasks = 0;
-  s->nr_requests = 0;
+  for (int k = 0; k < task_subtype_count; k++) {
+    s->nr_recv_tasks[k] = 0;
+    s->nr_requests[k] = 0;
+  }
 #endif
 
   /* Reset all task timers. */
@@ -1622,21 +1624,22 @@
     s->tasks[i].recv_started = 0;
     if (!s->tasks[i].skip && s->tasks[i].type == task_type_recv &&
         s->tasks[i].subtype != task_subtype_testsome)
-      s->nr_recv_tasks++;
+      s->nr_recv_tasks[s->tasks[i].subtype]++;
 #endif
   }
 
 #ifdef WITH_MPI
   /* Initialise the requests storage. */
-  if (s->nr_size_requests < s->nr_recv_tasks) {
-    swift_free("requests", s->requests);
-    swift_free("ind_requests", s->tasks_requests);
-    s->requests = (MPI_Request *)swift_malloc(
-        "requests", sizeof(MPI_Request) * s->nr_recv_tasks);
-    s->tasks_requests = (struct task **)swift_malloc(
-        "tasks_requests", sizeof(struct task *) * s->nr_recv_tasks);
-    s->nr_size_requests = s->nr_recv_tasks;
+  for (int i = 0; i < task_subtype_count; i++) {
+    if (s->nr_size_requests[i] < s->nr_recv_tasks[i]) {
+      swift_free("requests", s->requests[i]);
+      swift_free("ind_requests", s->tasks_requests[i]);
+      s->requests[i] = (MPI_Request *)swift_malloc(
+          "requests", sizeof(MPI_Request) * s->nr_recv_tasks[i]);
+      s->tasks_requests[i] = (struct task **)swift_malloc(
+          "tasks_requests", sizeof(struct task *) * s->nr_recv_tasks[i]);
+      s->nr_size_requests[i] = s->nr_recv_tasks[i];
+    }
   }
 #endif
 
@@ -1958,7 +1961,7 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
 
 #ifdef WITH_MPI
   /* Keep against changes between now and use. */
-  int nr_recv_tasks = s->nr_recv_tasks;
+  int nr_recv_tasks = s->nr_recv_tasks[t->flags];
 #endif
 
   /* Loop through the dependencies and add them to a queue if
@@ -2028,6 +2031,7 @@ struct task *scheduler_done(struct scheduler *s, struct task *t) {
     scheduler_enqueue(s, t);
   }
 
+  /* Now remove the old waiting count. */
   pthread_mutex_lock(&s->sleep_mutex);
   atomic_dec(&s->waiting);
@@ -2177,7 +2181,7 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
   /* Init the locks. */
   lock_init(&s->lock);
 #ifdef WITH_MPI
-  lock_init(&s->lock_requests);
+  for (int k = 0; k < task_subtype_count; k++) lock_init(&s->lock_requests[k]);
 #endif
 
   /* Globally accessible pointer. XXX yes we did this... */
@@ -2422,9 +2426,9 @@ void scheduler_start_recv(struct scheduler *s, struct task *t) {
   }
 
   if (t->subtype != task_subtype_testsome) {
-    // XXX debugging.
     if (t->req != MPI_REQUEST_NULL)
       error("MPI request is not MPI_REQUEST_NULL");
+
     int err = MPI_Irecv(buff, count, type, t->ci->nodeID, t->flags,
                         subtaskMPI_comms[t->subtype], &t->req);
@@ -2434,14 +2438,16 @@ void scheduler_start_recv(struct scheduler *s, struct task *t) {
 
     /* Record request and associated task. Need to lock this down so we don't
      * have an invalid extra request for a while. */
-    if (lock_lock(&s->lock_requests) != 0) error("Failed to lock requests");
+    if (lock_lock(&s->lock_requests[t->subtype]) != 0)
+      error("Failed to lock requests");
 
-    int ind = s->nr_requests;
-    s->nr_requests++;
-    s->requests[ind] = t->req;
-    s->tasks_requests[ind] = t;
+    int ind = s->nr_requests[t->subtype];
+    s->nr_requests[t->subtype]++;
+    s->requests[t->subtype][ind] = t->req;
+    s->tasks_requests[t->subtype][ind] = t;
 
-    if (lock_unlock(&s->lock_requests) != 0) error("Failed to unlock requests");
+    if (lock_unlock(&s->lock_requests[t->subtype]) != 0)
+      error("Failed to unlock requests");
   }
 }
@@ -2465,7 +2471,7 @@ void scheduler_dump_queues(struct engine *e) {
   FILE *file_thread;
   if (engine_rank == 0) {
     file_thread = fopen(dumpfile, "w");
-    fprintf(file_thread, "# rank queue index type subtype waits\n");
+    fprintf(file_thread, "# rank queue index type subtype weight flags\n");
     fclose(file_thread);
   }
   MPI_Barrier(MPI_COMM_WORLD);
diff --git a/src/scheduler.h b/src/scheduler.h
index 80a7d22409b47f1ec41488e59ad67bb17073c749..9a2caf546e4f09b5ee232559bd8533e01ecbec74 100644
--- a/src/scheduler.h
+++ b/src/scheduler.h
@@ -113,16 +113,16 @@ struct scheduler {
   pthread_key_t local_seed_pointer;
 
 #ifdef WITH_MPI
-  /* Task that tests for acknowledged MPI recv requests. */
-  struct task *testsome;
+  /* Tasks that test for acknowledged MPI recv requests. */
+  struct task *testsome[task_subtype_count];
 
   /* Array of MPI recv requests and associated task indices. */
-  MPI_Request *requests;
-  int nr_requests;
-  int nr_size_requests;
-  struct task **tasks_requests;
-  int nr_recv_tasks;
-  swift_lock_type lock_requests;
+  MPI_Request *requests[task_subtype_count];
+  int nr_recv_tasks[task_subtype_count];
+  int nr_requests[task_subtype_count];
+  int nr_size_requests[task_subtype_count];
+  struct task **tasks_requests[task_subtype_count];
+  swift_lock_type lock_requests[task_subtype_count];
 #endif
 };
@@ -191,7 +191,8 @@ scheduler_activate_recv(struct scheduler *s, struct link *link,
 
   /* Make sure to activate the testsome task. */
 #ifdef WITH_MPI
-  if (s->testsome->skip) scheduler_activate(s, s->testsome);
+  if (s->testsome[l->t->subtype]->skip)
+    scheduler_activate(s, s->testsome[l->t->subtype]);
 #endif
   return l;
 }
diff --git a/src/task.c b/src/task.c
index 83730696a0e66a6f6a0f59533fb391750a28bb9a..4d0be78daa3ec9ab0c271f0f091968fe92fc05bf 100644
--- a/src/task.c
+++ b/src/task.c
@@ -542,19 +542,27 @@ int task_lock(struct task *t) {
     /* Communication task? */
     case task_type_recv:
 #ifdef WITH_MPI
-      if (t->subtype == task_subtype_testsome) {
+      if (subtype == task_subtype_testsome) {
 
-        /* Check for any messages that could be received.
-         * Don't want any of this to change when MPI is using it,
-         * but lets not block. */
+        /* Check for any messages that could be received. */
+        /* Need the global reference to the scheduler. */
         struct scheduler *s = scheduler_scheduler;
+
+        /* Related subtype of recv tasks. */
+        int asubtype = t->flags;
+        swift_lock_type *lock = &s->lock_requests[asubtype];
+
+        /* Don't want any of this to change when MPI is using it,
+         * but let's not block. */
+        if (s->nr_requests[asubtype] > 0 && lock_trylock(lock) == 0) {
+          int nr_requests = s->nr_requests[asubtype];
           int outcount = 0;
           int indices[nr_requests];
+          MPI_Request *requests = s->requests[asubtype];
+          struct task **tasks = s->tasks_requests[asubtype];
 
-          err = MPI_Testsome(nr_requests, s->requests, &outcount, indices,
-                             MPI_STATUSES_IGNORE);
+          err = MPI_Testsome(nr_requests, requests, &outcount, indices,
+                             MPI_STATUSES_IGNORE);
           if (err != MPI_SUCCESS) {
             mpi_error(err, "Failed to test for recv messages");
           }
@@ -564,24 +572,23 @@ int task_lock(struct task *t) {
 
           /* Mark any released tasks as ready. */
           for (int k = 0; k < outcount; k++) {
             int ind = indices[k];
-            s->tasks_requests[ind]->req = MPI_REQUEST_NULL;
-            s->tasks_requests[ind]->recv_ready = 1;
+            tasks[ind]->req = MPI_REQUEST_NULL;
+            tasks[ind]->recv_ready = 1;
 
             /* And remove from the requests lists by swapping with end. */
             if (ind < nr_requests - 1) {
-              s->requests[ind] = s->requests[nr_requests - 1];
-              s->tasks_requests[ind] = s->tasks_requests[nr_requests - 1];
+              requests[ind] = requests[nr_requests - 1];
+              tasks[ind] = tasks[nr_requests - 1];
               nr_requests--;
             }
           }
-          s->nr_requests = nr_requests;
+          s->nr_requests[asubtype] = nr_requests;
 
           /* Decrement total recvs we've seen. */
-          s->nr_recv_tasks -= outcount;
+          s->nr_recv_tasks[asubtype] -= outcount;
           res = 1;
         }
-        if (lock_unlock(&s->lock_requests) != 0)
-          error("Failed to lock requests");
+        if (lock_unlock(lock) != 0) error("Failed to unlock requests");
       }
       return res;
 }
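
Note on the polling pattern used above: each testsome task now owns one request list (one per subtype) and drains it with MPI_Testsome(). The sketch below is a minimal standalone illustration of that pattern, not SWIFT's API: struct request_list, poll_recv_requests, task_ids and ready_ids are hypothetical names, and it removes completed entries by compacting the list in place (relying on MPI_Testsome() setting completed non-persistent requests to MPI_REQUEST_NULL) rather than the swap-with-end removal the patch uses.

#include <mpi.h>

/* One subtype's pending recvs: the MPI requests plus, in a parallel array,
 * an identifier for the task that posted each one. Illustrative stand-in
 * for the requests/tasks_requests pair held by struct scheduler. */
struct request_list {
  MPI_Request *requests; /* pending MPI_Irecv requests */
  int *task_ids;         /* stand-in for the task pointers SWIFT keeps */
  int nr;                /* number of pending requests */
};

/* Poll the list once without blocking: record which recvs completed and
 * drop them from the list. Returns the number completed, or -1 on error. */
static int poll_recv_requests(struct request_list *list, int *ready_ids) {
  if (list->nr == 0) return 0;

  int outcount = 0;
  int indices[list->nr];
  if (MPI_Testsome(list->nr, list->requests, &outcount, indices,
                   MPI_STATUSES_IGNORE) != MPI_SUCCESS)
    return -1;

  /* MPI_Testsome() reports MPI_UNDEFINED when no request is active. */
  if (outcount == MPI_UNDEFINED) return 0;

  /* Record which tasks may now be enqueued. */
  for (int k = 0; k < outcount; k++) ready_ids[k] = list->task_ids[indices[k]];

  /* Completed non-persistent requests were set to MPI_REQUEST_NULL by
   * MPI_Testsome(), so a single compaction pass removes them all with no
   * index bookkeeping. */
  int nr = 0;
  for (int i = 0; i < list->nr; i++) {
    if (list->requests[i] != MPI_REQUEST_NULL) {
      list->requests[nr] = list->requests[i];
      list->task_ids[nr] = list->task_ids[i];
      nr++;
    }
  }
  list->nr = nr;

  return outcount;
}

Splitting the lists by subtype, as the patch does, means a testsome task polling one message class never contends on the lock for another, and an expensive class (e.g. task_subtype_xv) can make MPI progress independently of the rest.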