[WIP] Comm tasks are special

Closed Pedro Gonnet requested to merge comm_tasks_are_special into master
2 files changed, +163 −1
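In short: send and recv tasks are taken out of the regular sorted queue. When one arrives in a queue's incoming DEQ, its MPI_Request is parked in a per-queue mpi_requests array (with a parallel mpi_requests_tid array mapping each request back to its task), and queue_gettask polls these requests with MPI_Testsome (or MPI_Testany), handing out a communication task as soon as its request has completed, before considering any regular task.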
@@ -36,6 +36,7 @@
/* Local headers. */
#include "atomic.h"
#include "error.h"
#include "memswap.h"
/**
* @brief Enqueue all tasks in the incoming DEQ.
@@ -69,6 +70,48 @@ void queue_get_incoming(struct queue *q) {
      q->tid = tid = temp;
    }

    /* Is the next task a comms task? If so, store it in the mpi_requests
     * buffer. */
#ifdef WITH_MPI
    if (tasks[offset].type == task_type_send ||
        tasks[offset].type == task_type_recv) {

      /* Do we need to grow the requests buffer? */
      if (q->mpi_requests_count == q->mpi_requests_size) {
        q->mpi_requests_size *= 2;
        MPI_Request *mpi_requests_new = NULL;
        int *mpi_requests_tid_new = NULL;
        int *mpi_requests_index_new = NULL;
        if ((mpi_requests_new = (MPI_Request *)malloc(
                 sizeof(MPI_Request) * q->mpi_requests_size)) == NULL ||
            (mpi_requests_tid_new =
                 (int *)malloc(sizeof(int) * q->mpi_requests_size)) == NULL ||
            (mpi_requests_index_new =
                 (int *)malloc(sizeof(int) * q->mpi_requests_size)) == NULL)
          error("Failed to re-allocate MPI_Request buffers.");
        memcpy(mpi_requests_new, q->mpi_requests,
               sizeof(MPI_Request) * q->mpi_requests_count);
        memcpy(mpi_requests_tid_new, q->mpi_requests_tid,
               sizeof(int) * q->mpi_requests_count);
        memcpy(mpi_requests_index_new, q->mpi_requests_index,
               sizeof(int) * q->mpi_requests_count);
        free(q->mpi_requests);
        q->mpi_requests = mpi_requests_new;
        free(q->mpi_requests_tid);
        q->mpi_requests_tid = mpi_requests_tid_new;
        free(q->mpi_requests_index);
        q->mpi_requests_index = mpi_requests_index_new;
      }

      /* Add the MPI_Request to the list of monitored requests. */
      q->mpi_requests[q->mpi_requests_count] = tasks[offset].req;
      q->mpi_requests_tid[q->mpi_requests_count] = offset;
      q->mpi_requests_count += 1;

      /* Don't need to add this task to the regular queue. */
      continue;
    }
#endif  // WITH_MPI

    /* Drop the task at the end of the queue. */
    tid[q->count] = offset;
    q->count += 1;
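Reviewer note: the grow path above doubles all three parallel buffers with malloc/memcpy/free. The same bookkeeping can be written more compactly with realloc; a minimal sketch, assuming struct queue has exactly the fields introduced in this diff and that error() aborts (the helper name is hypothetical, not part of this diff):

#include <stdlib.h>

/* Hypothetical helper: double the capacity of the three parallel request
   buffers. realloc leaves the old block intact on failure, and error()
   aborts, so no cleanup is needed here. */
static void queue_grow_mpi_requests(struct queue *q) {
  q->mpi_requests_size *= 2;
  MPI_Request *reqs = (MPI_Request *)realloc(
      q->mpi_requests, sizeof(MPI_Request) * q->mpi_requests_size);
  int *tids =
      (int *)realloc(q->mpi_requests_tid, sizeof(int) * q->mpi_requests_size);
  int *inds = (int *)realloc(q->mpi_requests_index,
                             sizeof(int) * q->mpi_requests_size);
  if (reqs == NULL || tids == NULL || inds == NULL)
    error("Failed to re-allocate MPI_Request buffers.");
  q->mpi_requests = reqs;
  q->mpi_requests_tid = tids;
  q->mpi_requests_index = inds;
}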
@@ -153,7 +196,100 @@ void queue_init(struct queue *q, struct task *tasks) {
  q->first_incoming = 0;
  q->last_incoming = 0;
  q->count_incoming = 0;

#ifdef WITH_MPI
  /* Allocate buffers for special treatment of communication tasks. */
  if ((q->mpi_requests = (MPI_Request *)malloc(sizeof(MPI_Request) *
                                               queue_sizeinit)) == NULL ||
      (q->mpi_requests_tid = (int *)malloc(sizeof(int) * queue_sizeinit)) ==
          NULL ||
      (q->mpi_requests_index = (int *)malloc(sizeof(int) * queue_sizeinit)) ==
          NULL)
    error("Failed to allocate MPI_Request buffers.");
  q->mpi_requests_size = queue_sizeinit;
  q->mpi_requests_count = 0;
  q->mpi_requests_index_count = 0;
#endif  // WITH_MPI
}
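A note on the layout chosen here: MPI_Testany and MPI_Testsome require a contiguous array of MPI_Request, which is why the queue carries three parallel arrays instead of an array of structs. Pieced together from this diff (other queue members elided), the new state looks roughly like:

struct queue {
  /* ... existing members (tid, tasks, count, size, incoming DEQ, ...) ... */
#ifdef WITH_MPI
  MPI_Request *mpi_requests;    /* contiguous, as MPI_Test* requires */
  int *mpi_requests_tid;        /* index into q->tasks for each request */
  int *mpi_requests_index;      /* scratch: indices completed by MPI_Testsome */
  int mpi_requests_size;        /* capacity of the three arrays */
  int mpi_requests_count;       /* number of requests currently monitored */
  int mpi_requests_index_count; /* completed indices not yet handed out */
#endif
};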
/**
 * @brief Check if we have any ready MPI tasks in the mpi_requests list.
 *
 * @param q The #queue we're checking; should be locked.
 *
 * @return A pointer to a task that's ready to run, or NULL.
 */
#ifdef WITH_MPI
struct task *queue_get_comm_task(struct queue *q) {

  /* Bail if we don't have any requests. */
  if (q->mpi_requests_count == 0) return NULL;

  /* Check if any of the requests are done. */
#if 0
  /* Don't do anything clever: call MPI_Testany and take the first request
     that completed. */
  int offset = MPI_UNDEFINED;
  MPI_Status status;
  int res;
  int flag;
  if ((res = MPI_Testany(q->mpi_requests_count, q->mpi_requests, &offset,
                         &flag, &status)) != MPI_SUCCESS)
    mpi_error(res, "MPI_Testany failed.");

  /* Did we get anything useful? */
  if (flag) {
    /* Swap things around and return the completed task. */
    struct task *task = &q->tasks[q->mpi_requests_tid[offset]];
    q->mpi_requests_count -= 1;
    q->mpi_requests[offset] = q->mpi_requests[q->mpi_requests_count];
    q->mpi_requests_tid[offset] = q->mpi_requests_tid[q->mpi_requests_count];
    return task;
  }
#else
  /* Try to be clever: populate an array of indices of completed requests
     with MPI_Testsome and hand out tasks from that list. This can reduce
     the number of times we run through the whole array of requests,
     compared to calling MPI_Testany every time. */

  /* Populate the completed request indices, if needed. */
  if (q->mpi_requests_index_count == 0) {
    int res;
    if ((res = MPI_Testsome(q->mpi_requests_count, q->mpi_requests,
                            &q->mpi_requests_index_count,
                            q->mpi_requests_index,
                            MPI_STATUSES_IGNORE)) != MPI_SUCCESS)
      mpi_error(res, "MPI_Testsome failed.");

    /* Ensure that the indices are sorted in increasing order, so that
       taking a request off the mpi_requests list below can never
       invalidate the indices that are still pending. */
    for (int k = 0; k < q->mpi_requests_index_count - 1; k++) {
      for (int j = k + 1; j < q->mpi_requests_index_count; j++) {
        if (q->mpi_requests_index[k] > q->mpi_requests_index[j]) {
          memswap(&q->mpi_requests_index[k], &q->mpi_requests_index[j],
                  sizeof(int));
        }
      }
    }
  }
  /* Check if we already have the index of a completed comm task. */
  if (q->mpi_requests_index_count > 0) {

    /* Pick the task at the last (largest) completed index. */
    q->mpi_requests_index_count -= 1;
    const int offset = q->mpi_requests_index[q->mpi_requests_index_count];
    struct task *task = &q->tasks[q->mpi_requests_tid[offset]];

    /* Remove the request from the list of mpi_requests. */
    q->mpi_requests_count -= 1;
    q->mpi_requests[offset] = q->mpi_requests[q->mpi_requests_count];
    q->mpi_requests_tid[offset] = q->mpi_requests_tid[q->mpi_requests_count];
    return task;
  }
#endif

  /* Fall-through, didn't find anything. */
  return NULL;
}
#endif  // WITH_MPI
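Why the ascending sort matters: completed indices are consumed from the back of mpi_requests_index, i.e. largest first, and removal swaps the last live request (slot mpi_requests_count - 1, at or beyond every completed index) into the freed slot, so no still-pending completed index can be invalidated. A standalone toy demo with ints standing in for MPI_Requests (hypothetical values, not part of this diff):

#include <stdio.h>

int main(void) {
  int req[5] = {100, 101, 102, 103, 104}; /* stand-ins for MPI_Requests */
  int count = 5;
  int done[2] = {1, 3}; /* completed indices, sorted ascending */
  int done_count = 2;

  /* Consume from the back: largest completed index first. */
  while (done_count > 0) {
    done_count -= 1;
    const int offset = done[done_count];
    printf("handing out request %d from slot %d\n", req[offset], offset);
    count -= 1;
    req[offset] = req[count]; /* swap-with-last removal */
  }
  /* Hands out 103 then 101; the survivors are 100, 104, 102. Consuming
     {1, 3} in the other order would move the completed request at slot 3
     into slot 1 while index 3 is still queued for hand-out, corrupting
     the bookkeeping. */
  return 0;
}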
/**
* @brief Get a task free of dependencies and conflicts.
@@ -179,11 +315,23 @@ struct task *queue_gettask(struct queue *q, const struct task *prev,
  queue_get_incoming(q);

  /* If there are no tasks, leave immediately. */
#ifdef WITH_MPI
  if (q->count == 0 && q->mpi_requests_count == 0) {
#else
  if (q->count == 0) {
#endif  // WITH_MPI
    lock_unlock_blind(qlock);
    return NULL;
  }

#ifdef WITH_MPI
  /* Try to get a comms task first. */
  if ((res = queue_get_comm_task(q)) != NULL) {
    lock_unlock_blind(qlock);
    return res;
  }
#endif  // WITH_MPI

  /* Set some pointers we will use often. */
  int *qtid = q->tid;
  struct task *qtasks = q->tasks;
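The net effect on the lookup order: a completed send/recv is handed out before the weighted search over q->tid even starts. A hedged caller sketch (assumes queue_gettask's remaining parameter is SWIFT's usual blocking flag, which this diff does not show; qq is a hypothetical queue pointer):

/* Hypothetical runner-side call, not part of this diff. */
struct task *t = queue_gettask(qq, /*prev=*/NULL, /*blocking=*/0);
if (t != NULL && (t->type == task_type_send || t->type == task_type_recv)) {
  /* By construction its MPI_Request has already completed, so enacting
     the task will not block on communication. */
}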
@@ -297,7 +445,11 @@ struct task *queue_gettask(struct queue *q, const struct task *prev,
}
void queue_clean(struct queue *q) {

  free(q->tid);
  free(q->tid_incoming);
#ifdef WITH_MPI
  free(q->mpi_requests);
  free(q->mpi_requests_tid);
  free(q->mpi_requests_index);
#endif  // WITH_MPI
}