Skip to content
Snippets Groups Projects
Commit 8cb761fe authored by Peter W. Draper's avatar Peter W. Draper
Browse files

Formatting

parent 605773bb
No related branches found
No related tags found
1 merge request!8Draft: RDMA version with wrapped infinity calls
#!/bin/bash
# Clang format command, can be overridden using CLANG_FORMAT_CMD.
# We currrently use version 5.0 so any overrides should provide that.
clang=${CLANG_FORMAT_CMD:="clang-format-5.0"}
clang=${CLANG_FORMAT_CMD:="clang-format"}
# Formatting command
cmd="$clang -style=file $(git ls-files | grep '\.[ch]$')"
......
......@@ -128,7 +128,8 @@ static int volatile starting[MAX_NR_RANKS] = {1};
*/
static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
size_t result = subtype | sendrank << subtype_shift |
recvrank << (subtype_shift + rank_shift)| tag << (subtype_shift * 2 + rank_shift);
recvrank << (subtype_shift + rank_shift) |
tag << (subtype_shift * 2 + rank_shift);
return result;
}
......@@ -255,18 +256,21 @@ static void *send_thread(void *arg) {
/* Extract the offset lists that we use. */
int nr = 0;
int size = (max_logs / 16 + 1);
size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
size_t *offsets = (size_t *)malloc(size * sizeof(size_t));
size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
size_t *offsets = (size_t *)malloc(size * sizeof(size_t));
/* A tag that will match any subtype or tag with our destination and source rank. */
/* A tag that will match any subtype or tag with our destination and source
* rank. */
size_t matchranktag = toranktag(0, myrank, rank, 0);
for (size_t j = 0; j < max_logs; j++) {
size_t ranktag = ranktag_lists[INDEX3(MAX_NR_RANKS, nr_ranks, myrank, rank, j)];
size_t ranktag =
ranktag_lists[INDEX3(MAX_NR_RANKS, nr_ranks, myrank, rank, j)];
if ((ranktag & matchranktag) == matchranktag) {
/* Keep this one. */
ranktags[nr] = ranktag;
offsets[nr] = ranktag_offsets[INDEX3(MAX_NR_RANKS, nr_ranks, myrank, rank, j)];
offsets[nr] =
ranktag_offsets[INDEX3(MAX_NR_RANKS, nr_ranks, myrank, rank, j)];
nr++;
if (nr >= size) {
size += (max_logs / 16 + 1);
......@@ -300,7 +304,8 @@ static void *send_thread(void *arg) {
memcpy(sendBuffer->getData(), dataptr, tobytes(datasize));
/* Need to find the offset for this data in the remotes window. */
size_t ranktag = toranktag(log->subtype, log->rank, log->otherrank, log->tag);
size_t ranktag =
toranktag(log->subtype, log->rank, log->otherrank, log->tag);
log->offset = 0;
int found = 0;
......@@ -505,7 +510,8 @@ static size_t pick_logs() {
log->injtic = 0;
log->endtic = 0;
log->data = NULL;
log->ranktag = toranktag(log->subtype, log->otherrank, log->rank, log->tag);
log->ranktag =
toranktag(log->subtype, log->otherrank, log->rank, log->tag);
/* Scale size. */
log->size *= messagescale;
......
......@@ -189,18 +189,18 @@ static void *send_thread(void *arg) {
// Create the QPs connecting to all the other ranks. Note we cannot do this
// until the related servers are up and running, so make sure that is true..
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
nr_ranks, sizeof(infinity::queues::QueuePair *));
for (int k = 0; k < nr_ranks; k++) {
if (k != myrank) {
char *ip = &servers->ip[k * MPI_MAX_PROCESSOR_NAME];
if (verbose)
message("%d waiting for connection to remote server %s %d on %d", myrank,
ip, k, BASE_PORT + myrank);
message("%d waiting for connection to remote server %s %d on %d",
myrank, ip, k, BASE_PORT + myrank);
qps[k] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
if (verbose)
message("%d connected to remote server %s %d on %d", myrank, ip,
k, BASE_PORT + myrank);
message("%d connected to remote server %s %d on %d", myrank, ip, k,
BASE_PORT + myrank);
}
}
......@@ -240,7 +240,6 @@ static void *send_thread(void *arg) {
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
for (int k = 0; k < nr_ranks; k++) delete qps[k];
free(qps);
delete qpFactory;
......@@ -374,8 +373,8 @@ static size_t pick_logs() {
size_t maxsize = 0;
/* Queues of send and receive logs. */
send_queue =
(struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
struct mpiuse_log_entry **recv_queue = (struct mpiuse_log_entry **)calloc(
......@@ -414,8 +413,8 @@ static size_t pick_logs() {
qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);
/* Now we need to count the numbers of messages to send per rank
* and create sub-queues for these.*/
/* Now we need to count the numbers of messages to send per rank
* and create sub-queues for these.*/
for (int k = 0; k < nr_ranks; k++) nr_recvs[k] = 0;
for (int k = 0; k < nr_recv; k++) {
struct mpiuse_log_entry *log = recv_queue[k];
......@@ -439,8 +438,6 @@ static size_t pick_logs() {
}
free(recv_queue);
if (verbose) {
message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
nr_recv);
......
......@@ -185,8 +185,8 @@ static void *send_thread(void *arg) {
new infinity::queues::QueuePairFactory(context);
// Create the QPs connecting to all the other ranks.
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
nr_ranks, sizeof(infinity::queues::QueuePair *));
// We need to listen for messages from the other rank servers that we can
// connect to them as they need to be up first.
......@@ -215,8 +215,8 @@ static void *send_thread(void *arg) {
ip, index, BASE_PORT + myrank);
qps[index] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
if (verbose)
message("%d connected to remote server %s %d on %d", myrank, ip,
index, BASE_PORT + myrank);
message("%d connected to remote server %s %d on %d", myrank, ip, index,
BASE_PORT + myrank);
}
// Startup complete, so start timing.
......@@ -261,7 +261,6 @@ static void *send_thread(void *arg) {
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
for (int k = 0; k < nr_ranks; k++) delete qps[k];
free(qps);
delete qpFactory;
......@@ -283,15 +282,17 @@ static void *recv_thread(void *arg) {
new infinity::queues::QueuePairFactory(context);
// Create the QPs connecting to all the other ranks.
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)
calloc(nr_ranks, sizeof(infinity::queues::QueuePair *));
infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
nr_ranks, sizeof(infinity::queues::QueuePair *));
// Create buffers to receive messages. Each one is big enough for the
// largest message.
infinity::memory::Buffer **receiveBuffer = (infinity::memory::Buffer **)
calloc(nr_ranks, sizeof(infinity::memory::Buffer *));
infinity::memory::Buffer **receiveBuffer =
(infinity::memory::Buffer **)calloc(nr_ranks,
sizeof(infinity::memory::Buffer *));
for (int k = 0; k < nr_ranks; k++) {
receiveBuffer[k] = new infinity::memory::Buffer(context, tobytes(MESSAGE_SIZE));
receiveBuffer[k] =
new infinity::memory::Buffer(context, tobytes(MESSAGE_SIZE));
context->postReceiveBuffer(receiveBuffer[k]);
}
......@@ -407,12 +408,12 @@ static size_t pick_logs() {
size_t maxsize = 0;
/* Queues of send and receive logs. */
send_queue =
(struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
send_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_send = 0;
recv_queue =
(struct mpiuse_log_entry **)calloc(nlogs, sizeof(struct mpiuse_log_entry *));
recv_queue = (struct mpiuse_log_entry **)calloc(
nlogs, sizeof(struct mpiuse_log_entry *));
nr_recv = 0;
for (size_t k = 0; k < nlogs; k++) {
......
......@@ -120,7 +120,8 @@ static void *inject_thread(void *arg) {
/* Start ready to receive. */
log->data = calloc(log->size, 1);
//message("recv frm %d, tag %d, size %zd", log->otherrank, log->tag, log->size);
// message("recv frm %d, tag %d, size %zd", log->otherrank, log->tag,
// log->size);
err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
subtypeMPI_comms[log->subtype], &log->req);
......@@ -147,8 +148,8 @@ static void *inject_thread(void *arg) {
/* All done, thread exiting. */
if (verbose) {
message("%d injections completed recvs = %d, sends = %d",
ind_req, nr_recvs, nr_sends);
message("%d injections completed recvs = %d, sends = %d", ind_req, nr_recvs,
nr_sends);
}
message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
clocks_getunit());
......@@ -167,8 +168,9 @@ static void *send_thread(void *arg) {
for (int k = 0; k < nr_sends; k++) {
struct mpiuse_log_entry *log = sends_queue[k];
log->injtic = getticks();
//message("sending to %d, tag %d, size %zd", log->otherrank, log->tag, log->size);
MPI_Send(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
// message("sending to %d, tag %d, size %zd", log->otherrank, log->tag,
// log->size);
MPI_Send(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
subtypeMPI_comms[log->subtype]);
log->done = 1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment