Commit 77f9fcfb authored by Peter W. Draper

Use Rput and Put and avoid local synchronization calls. This is faster than p2p, but the order of delivery is not 100% certain.
parent e8f53fe7
1 merge request: !11 Draft: Fast one-sided MPI version
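For context, the change replaces the MPI_Raccumulate / MPI_Compare_and_swap path with plain MPI_Rput / MPI_Put on the per-subtype windows and drops the MPI_Win_flush_local calls in between. Below is a minimal standalone sketch, not the simulator's code, of that pattern on a passive-target (MPI_Win_lock_all) window; BLOCKTYPE, UNLOCKED, NBLOCKS and the two-rank layout are illustrative assumptions. It also shows where the commit message's caveat comes from: nothing orders the flag put after the payload put at the target.

/* Sketch only: payload via MPI_Rput, then an UNLOCKED flag via MPI_Put,
 * with no flush in between (the pattern this commit adopts).  All names
 * here are illustrative, not the simulator's. */
#include <mpi.h>
#include <stdio.h>

typedef long long BLOCKTYPE;
#define MPI_BLOCKTYPE MPI_LONG_LONG
#define UNLOCKED ((BLOCKTYPE)-2)
#define NBLOCKS 8

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank = 0, size = 0;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  if (size < 2) MPI_Abort(MPI_COMM_WORLD, 1);

  /* One window per rank; block 0 is the flag, blocks 1.. are the payload. */
  BLOCKTYPE *base = NULL;
  MPI_Win win;
  MPI_Win_allocate(NBLOCKS * sizeof(BLOCKTYPE), sizeof(BLOCKTYPE),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &base, &win);
  for (int i = 0; i < NBLOCKS; i++) base[i] = 0;
  MPI_Barrier(MPI_COMM_WORLD);

  /* Single passive-target epoch covering all ranks. */
  MPI_Win_lock_all(0, win);

  if (rank == 0) {
    BLOCKTYPE data[NBLOCKS] = {0, 11, 22, 33, 44, 55, 66, 77};
    MPI_Request req;

    /* Payload into blocks [1, NBLOCKS) of rank 1's window. */
    MPI_Rput(&data[1], NBLOCKS - 1, MPI_BLOCKTYPE, 1, 1, NBLOCKS - 1,
             MPI_BLOCKTYPE, win, &req);
    MPI_Wait(&req, MPI_STATUS_IGNORE); /* local completion only */

    /* Flag the slot as ready.  With no MPI_Win_flush before this put,
     * MPI does not promise it becomes visible after the payload. */
    BLOCKTYPE flag = UNLOCKED;
    MPI_Put(&flag, 1, MPI_BLOCKTYPE, 1, 0, 1, MPI_BLOCKTYPE, win);
    MPI_Win_flush(1, win); /* push both operations out so the demo finishes */
  } else if (rank == 1) {
    /* Receiver polls block 0 of its own window. */
    while (1) {
      MPI_Win_sync(win); /* keep public/private window copies in step */
      if (base[0] == UNLOCKED) break;
    }
    /* May still print 0 if the flag overtook the payload. */
    printf("flag seen, first payload block = %lld\n", (long long)base[1]);
  }

  MPI_Win_unlock_all(win);
  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}

Run with at least two ranks, e.g. mpirun -np 2 ./sketch (binary name illustrative).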
Makefile:
-#CFLAGS = -g -O0 -Wall -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
-CFLAGS = -g -O0 -Wall
+#CFLAGS = -g -O0 -Wall -std=gnu11 -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
+CFLAGS = -g -O0 -Wall -std=gnu11
 all: swiftmpistepsim swiftmpifakestepsim swiftmpirdmastepsim
...
@@ -96,8 +96,8 @@ static double messagescale = 1.0;
 static int datacheck = 0;

 /* Integer types of send and recv tasks, must match log. */
-static const int task_type_send = 22;
-static const int task_type_recv = 23;
+static const int task_type_send = 25;
+static const int task_type_recv = 26;

 /* Global ommunicators for each of the subtypes. */
 static MPI_Comm subtypeMPI_comms[task_subtype_count];
@@ -116,7 +116,6 @@ static size_t *ranktag_lists;
 /* The local send queue. */
 static struct mpiuse_log_entry **volatile send_queue;
 static int volatile nr_send = 0;
-static int volatile todo_send = 0;
 static int volatile injecting = 1;

 /* The local receive queue. */
@@ -126,7 +125,6 @@ static int volatile todo_recv = 0;
 /* The local requests queue. */
 static struct mpiuse_log_entry **volatile req_queue;
-static int volatile ind_req = 0;
 static int volatile nr_req = 0;
 static int volatile todo_req = 0;
@@ -210,7 +208,10 @@ static void *send_thread(void *arg) {
   ticks starttics = getticks();

   injecting = 1;
+  double tsum = 0.0;
   for (int k = 0; k < nr_send; k++) {
+    ticks tics = getticks();
     struct mpiuse_log_entry *log = send_queue[k];
     if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);
@@ -257,21 +258,24 @@ static void *send_thread(void *arg) {
     }

     /* And start the send of data to other rank. */
-    int ret = MPI_Raccumulate(&dataptr[1], datasize - 1, MPI_BLOCKTYPE,
-                              log->otherrank, log->offset + 1, datasize - 1,
-                              MPI_BLOCKTYPE, MPI_REPLACE,
-                              mpi_window[log->subtype], &log->req);
-    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");
+    int ret = MPI_Rput(&dataptr[1], datasize - 1, MPI_BLOCKTYPE,
+                       log->otherrank, log->offset + 1, datasize - 1,
+                       MPI_BLOCKTYPE, mpi_window[log->subtype], &log->req);
+    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to put data");

     /* Add to the requests queue. */
     int ind = atomic_inc(&nr_req);
     req_queue[ind] = log;
     atomic_inc(&todo_req);
+
+    ticks dt = getticks() - tics;
+    tsum += (double)dt;
   }

   /* All done. */
   atomic_dec(&injecting);
+  message("sum = %f, mean = %f", clocks_from_ticks(tsum), clocks_from_ticks(tsum / nr_send));

   message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
           clocks_getunit());
@@ -298,29 +302,15 @@ static void *req_thread(void *arg) {
        error("MPI_Test call failed");
      }
      if (res) {
-        // Start new epoch on window?
-        int ret =
-            MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
-        if (ret != MPI_SUCCESS)
-          mpi_error_message(ret, "MPI_Win_flush failed");
-
        /* Send the UNLOCKED message. */
        BLOCKTYPE newval[1];
        BLOCKTYPE oldval[1];
        newval[0] = UNLOCKED;
        oldval[0] = 0;
-        ret = MPI_Compare_and_swap(&newval[0], log->data, &oldval[0],
-                                   MPI_BLOCKTYPE, log->otherrank, log->offset,
-                                   mpi_window[log->subtype]);
-        if (ret != MPI_SUCCESS)
-          mpi_error_message(ret, "MPI_Compare_and_swap error");
-
-        /* And complete. */
-        ret = MPI_Win_flush_local(log->otherrank, mpi_window[log->subtype]);
-        if (ret != MPI_SUCCESS)
-          mpi_error_message(ret, "MPI_Win_flush failed");
+        int ret = MPI_Put(&newval[0], 1, MPI_BLOCKTYPE,
+                          log->otherrank, log->offset, 1,
+                          MPI_BLOCKTYPE, mpi_window[log->subtype]);
+        if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to put unlock");

        /* Done. */
        log->done = 1;
...
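If delivery order mattered, the textbook fix is a remote-completion flush between the payload and the flag, which is precisely the per-message synchronization this commit trades away for speed. A hedged sketch of that ordered variant follows; the function and its arguments are illustrative, not part of the simulator.

/* Ordered variant (not what this commit does): MPI_Win_flush waits for the
 * payload to complete at the target before the UNLOCKED flag is written,
 * so the flag can never be seen ahead of the data it guards.  The cost is
 * a round trip per message.  Names are illustrative. */
#include <mpi.h>

typedef long long BLOCKTYPE;
#define MPI_BLOCKTYPE MPI_LONG_LONG
#define UNLOCKED ((BLOCKTYPE)-2)

/* Assumes 'win' is already inside an MPI_Win_lock_all epoch. */
void send_blocks_ordered(const BLOCKTYPE *payload, int nblocks,
                         int target, MPI_Aint offset, MPI_Win win) {
  MPI_Request req;

  /* Payload into blocks [offset + 1, offset + nblocks) of the target. */
  MPI_Rput(&payload[1], nblocks - 1, MPI_BLOCKTYPE, target, offset + 1,
           nblocks - 1, MPI_BLOCKTYPE, win, &req);
  MPI_Wait(&req, MPI_STATUS_IGNORE);

  /* Remote completion: after this the payload is in the target window. */
  MPI_Win_flush(target, win);

  /* Only now write the flag block. */
  BLOCKTYPE flag = UNLOCKED;
  MPI_Put(&flag, 1, MPI_BLOCKTYPE, target, offset, 1, MPI_BLOCKTYPE, win);
}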