SWIFT / swiftmpistepsim

Commit 549ec37b, authored 4 years ago by Peter W. Draper
Parent: 4cd5e977
Part of 1 merge request: !8 Draft: RDMA version with wrapped infinity calls

    Two thread one sided RDMA working...

Showing 2 changed files with 756 additions and 3 deletions:

  Makefile                   +7 −3
  swiftmpirdmaonestepsim2.c  +749 −0

Makefile  +7 −3
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
-CFLAGS = -g -O3 -Wall -Iinfinity/include
-# CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_ASSERT_ON
+# CFLAGS = -g -O3 -Wall -Iinfinity/include
+CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
 INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
@@ -12,7 +12,7 @@ INFINITY = -Linfinity -linfinity -libverbs
 all: swiftmpistepsim swiftmpistepsim2 \
 	swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3 \
-	swiftmpirdmaonestepsim
+	swiftmpirdmaonestepsim swiftmpirdmaonestepsim2
 
 swiftmpistepsim: swiftmpistepsim.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
@@ -26,6 +26,9 @@ swiftmpirdmastepsim: swiftmpirdmastepsim.c $(DEPS)
 swiftmpirdmaonestepsim: swiftmpirdmaonestepsim.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim swiftmpirdmaonestepsim.c $(SOURCES) $(INFINITY)
 
+swiftmpirdmaonestepsim2: swiftmpirdmaonestepsim2.c $(DEPS)
+	mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim2 swiftmpirdmaonestepsim2.c $(SOURCES) $(INFINITY)
+
 swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpirdmastepsim2 swiftmpirdmastepsim2.c $(SOURCES) $(INFINITY)
@@ -37,6 +40,7 @@ clean:
 	rm -f swiftmpistepsim2
 	rm -f swiftmpirdmastepsim
 	rm -f swiftmpirdmaonestepsim
+	rm -f swiftmpirdmaonestepsim2
 	rm -f swiftmpirdmastepsim2
 	rm -f swiftmpirdmastepsim3
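The new binary builds through the same rule pattern as the existing RDMA variants, so, assuming the wrapped infinity library has been built in infinity/ as the -Linfinity -linfinity -libverbs link flags expect, make swiftmpirdmaonestepsim2 should be all that is needed. Going by the usage() text in the new file, a run then takes a combined SWIFT mpiuse log plus an output log name, along the lines of mpirun -np 2 ./swiftmpirdmaonestepsim2 mpiuse_log.dat output.dat, where the input file name here is only a placeholder and the launcher details depend on the local MPI.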
swiftmpirdmaonestepsim2.c  0 → 100644  +749 −0
/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2020 Peter W. Draper
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

// Pure RDMA version, we use MPI for process control and synchronization.
// Write variant, attempting to use eager-like one-sided sends.
// Variation has a single send and receive thread per rank.
#include <arpa/inet.h>
#include <limits.h>
#include <mpi.h>
#include <netdb.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <infinity/core/Context.h>
#include <infinity/memory/Buffer.h>
#include <infinity/memory/RegionToken.h>
#include <infinity/queues/QueuePair.h>
#include <infinity/queues/QueuePairFactory.h>
#include <infinity/requests/RequestToken.h>

#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"
/* Our rank for all to see. */
int myrank = -1;

/* CPU frequency estimate, shared so we do this once. */
static long long cpufreq = 0;

/* Number of ranks. */
static int nr_ranks;

/* Base port no. Ranks use +rank. XXX argument, autodiscovery? XXX */
static int BASE_PORT = 27771;
/* Size of a block of memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);

/* Flags for controlling access. High end of size_t. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);
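/* Note: with a 64-bit size_t the shift wraps, (size_t)2 << 63 == 0, so
 * UNLOCKED is all ones (SIZE_MAX); a sender rank written into the lock
 * block can never collide with it. */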
/* Size of message header in blocks. The lock/rank, subtype, size and tag. */
static const size_t HEADER_SIZE = 4;
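/* Purely illustrative sketch of those four header blocks as a struct; the
 * code below indexes the blocks directly rather than using such a type:
 *
 *   struct header {
 *     BLOCKTYPE lock;    // sender rank while in flight, UNLOCKED when done
 *     BLOCKTYPE subtype; // task subtype of the message
 *     BLOCKTYPE size;    // payload size in bytes
 *     BLOCKTYPE tag;     // message tag
 *   };
 */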
/* Maximum size of a message in blocks. */
static size_t max_size = 0;

/* Are we verbose? */
static int verbose = 0;

/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;

/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;
/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;
/* 3D index of array. */
#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
/* 2D index of array. */
#define INDEX2(nx, x, y) (nx * y + x)
/* Bit shift to accommodate all the bits of the maximum rank id. */
static int rank_shift = 0;

/* Bit shift to accommodate all the bits of the maximum subtype. */
static int subtype_shift = 0;

/* Maximum no. of messages (logs). */
static size_t max_logs = 0;
/* Offsets of the ranktag regions within the receive windows and lists of the
 * associated tags. */
static size_t *ranktag_sizes;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;

/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_sends = 0;
static int volatile todo_send = 0;

/* Lock for startup, released once all connections are made. */
static int volatile starting_up = 1;

/* Local receive queues separated by rank. */
static int volatile nr_recvs = 0;
static struct mpiuse_log_entry **volatile recv_queue;
/**
 * @brief Convert ranks, subtype and tag into a single unique value.
 *
 * Assumes there is enough space in a size_t for these values.
 *
 * @param subtype the subtype of the message
 * @param sendrank the send rank
 * @param recvrank the receive rank
 * @param tag the tag
 *
 * @result a unique value based on all the values
 */
static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
  size_t result = subtype | sendrank << subtype_shift |
                  recvrank << (subtype_shift + rank_shift) |
                  tag << (subtype_shift * 2 + rank_shift);
  return result;
}
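/* Worked example, assuming a 16 rank run and the shifts set in main():
 * rank_shift = 5 and subtype_shift = 6, so the subtype occupies bits 0-5,
 * the send rank starts at bit 6, the receive rank at bit 11, and the tag is
 * shifted up by subtype_shift * 2 + rank_shift = 17 bits, mirroring the ORs
 * above. */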
/**
 * @brief Find an IP address for the given hostname.
 *
 * @param hostname the hostname
 *
 * @result the IP address, note copy away to keep.
 */
static char *toipaddr(char *hostname) {
  struct hostent *hostent = gethostbyname(hostname);
  if (hostent == NULL) {
    error("Failed to convert hostname '%s' to an IP address", hostname);
  }
  struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
  return inet_ntoa(*addr_list[0]);
}
/**
 * @brief Convert a byte count into a number of blocks, rounds up.
 *
 * @param nr_bytes the number of bytes.
 *
 * @result the number of blocks needed.
 */
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
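/* For example, with 8-byte blocks toblocks(17) = (17 + 7) / 8 = 3. */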
/**
 * @brief Convert a block count into a number of bytes.
 *
 * @param nr_blocks the number of blocks.
 *
 * @result the number of bytes.
 */
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
  return (nr_blocks * BYTESINBLOCK);
}
/**
 * @brief fill a data area with given value.
 *
 * @param size size of data in blocks.
 * @param data the data to fill.
 * @param value the value to fill.
 */
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
  for (BLOCKTYPE i = 0; i < size; i++) {
    data[i] = value;
  }
}
/**
 * @brief test a filled data area for a value.
 *
 * @param size size of data in blocks.
 * @param data the data to check.
 * @param value the value expected.
 *
 * @result 1 on success, 0 otherwise.
 */
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
  for (size_t i = 0; i < size; i++) {
    if (data[i] != value) {
      message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i,
              size);
      return 0;
    }
  }
  return 1;
}
/* Struct of server IP addresses as formatted strings. */
struct servers {
  char *ip;
};
/**
 * @brief Send thread, sends RDMA messages to the other ranks.
 *
 * Messages are all considered in order.
 */
static void *send_thread(void *arg) {

  // Get the destination IPs and ranks.
  struct servers *servers = (struct servers *)arg;

  // Need a factory to create QPs.
  infinity::core::Context *context = new infinity::core::Context();
  infinity::queues::QueuePairFactory *qpFactory =
      new infinity::queues::QueuePairFactory(context);

  // Create the QPs connecting to all the other ranks.
  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
      nr_ranks, sizeof(infinity::queues::QueuePair *));

  // And pointers to the remote memory.
  infinity::memory::RegionToken **remoteBufferToken =
      (infinity::memory::RegionToken **)calloc(
          nr_ranks, sizeof(infinity::memory::RegionToken *));

  // We need to listen for messages from the other rank servers so that we
  // can connect to them, as they need to be up first.
  int buf[1];
  MPI_Request reqs[nr_ranks];
  reqs[myrank] = MPI_REQUEST_NULL;
  for (int k = 0; k < nr_ranks; k++) {
    if (k != myrank) {
      MPI_Irecv(buf, 1, MPI_INT, k, k, MPI_COMM_WORLD, &reqs[k]);
    }
  }

  /* Now we poll for any servers that are ready to connect. */
  int index;
  MPI_Status status;
  while (1) {
    MPI_Waitany(nr_ranks, reqs, &index, &status);

    // All done when all requests have completed.
    if (index == MPI_UNDEFINED) break;

    // Got one, so connect.
    char *ip = &servers->ip[index * MPI_MAX_PROCESSOR_NAME];
    if (verbose)
      message("%d waiting for connection to remote server %s %d on %d",
              myrank, ip, index, BASE_PORT + myrank);
    qps[index] = qpFactory->connectToRemoteHost(ip, BASE_PORT + myrank);
    if (verbose)
      message("%d connected to remote server %s %d on %d", myrank, ip, index,
              BASE_PORT + myrank);
    remoteBufferToken[index] =
        (infinity::memory::RegionToken *)qps[index]->getUserData();
    if (remoteBufferToken[index] == NULL) {
      message("remoteBufferToken for %d is NULL", index);
    } else {
      message("remoteBufferToken for %d not NULL", index);
    }
  }

  /* Register some memory for use by RDMA, make it large enough for our
   * biggest message. */
  auto *sendBuffer = new infinity::memory::Buffer(context, tobytes(max_size));

  // Startup complete, so start timing and release the receive thread.
  MPI_Barrier(MPI_COMM_WORLD);  // Vital for synchronization.
  clocks_set_cpufreq(cpufreq);
  starting_up = 0;
  message("All synchronized");
  ticks starttics = getticks();

  for (int k = 0; k < nr_sends; k++) {
    struct mpiuse_log_entry *log = send_queue[k];

    /* Data has the actual data and room for the header. */
    BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
    BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
    log->data = dataptr;
    log->injtic = getticks();

    /* Fill data with pattern. */
    if (datacheck)
      datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);

    /* First element is the lock element, which can have any value other than
     * UNLOCKED, the other elements define an MPI message. */
    dataptr[0] = myrank;
    dataptr[1] = log->subtype;
    dataptr[2] = log->size;
    dataptr[3] = log->tag;

    /* Copy this to the registered memory. */
    memcpy(sendBuffer->getData(), dataptr, tobytes(datasize));

    /* Need to find the offset for this data in the remote's window. */
    size_t ranktag =
        toranktag(log->subtype, log->rank, log->otherrank, log->tag);
    log->offset = 0;
    int found = 0;

    // XXX must be a faster way of doing this... Go back to per rank lists.
    for (size_t j = 0; j < max_logs; j++) {
      if (ranktag_lists[INDEX3(nr_ranks, nr_ranks, myrank, log->otherrank,
                               j)] == ranktag) {
        log->offset = ranktag_offsets[INDEX3(nr_ranks, nr_ranks, myrank,
                                             log->otherrank, j)];
        found = 1;
        break;
      }
    }
    if (!found) {
      error(
          "Failed sending a message of size %zd to %d/%d "
          "@ %zd\n, no offset found for ranktag %zd",
          datasize, log->otherrank, log->subtype, log->offset, ranktag);
    }

    if (verbose)
      message("sending message subtype %d from %d to %d todo: %d/%d, "
              "offset %ld size %ld",
              log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends,
              log->offset, datasize);

    // And send.
    infinity::requests::RequestToken requestToken(context);
    qps[log->otherrank]->write(sendBuffer,
                               0,                                  // localOffset
                               remoteBufferToken[log->otherrank],  // destination
                               tobytes(log->offset),               // remoteOffset
                               tobytes(datasize),                  // sizeInBytes
                               infinity::queues::OperationFlags(),
                               &requestToken);
    requestToken.waitUntilCompleted();
    requestToken.reset();

    // Now we update the unlock field.
    ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
    qps[log->otherrank]->write(sendBuffer,
                               0,                                  // localOffset
                               remoteBufferToken[log->otherrank],  // destination
                               tobytes(log->offset),               // remoteOffset
                               BYTESINBLOCK,                       // sizeInBytes
                               infinity::queues::OperationFlags(),
                               &requestToken);
    requestToken.waitUntilCompleted();
    // Since we reuse the sendBuffer.
    // requestToken.reset();

    log->endtic = getticks();
  }

  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());

  for (int k = 0; k < nr_ranks; k++) delete qps[k];
  free(qps);
  delete sendBuffer;
  delete qpFactory;
  delete context;

  return NULL;
}
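/* Note on the sequence above: both RDMA writes target the same remote offset
 * on the same queue pair; the full message goes first with the lock block
 * holding the sender rank, then a single-block rewrite sets it to UNLOCKED,
 * which is the state change the polling receive thread below looks for. */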
/**
 * @brief recv thread, listens for remote sends from the other ranks.
 */
static void *recv_thread(void *arg) {

  struct servers *servers = (struct servers *)arg;

  // Need a factory to create QPs.
  infinity::core::Context *context = new infinity::core::Context();
  infinity::queues::QueuePairFactory *qpFactory =
      new infinity::queues::QueuePairFactory(context);

  // Create the QPs connecting to all the other ranks.
  infinity::queues::QueuePair **qps = (infinity::queues::QueuePair **)calloc(
      nr_ranks, sizeof(infinity::queues::QueuePair *));

  // Create buffers to receive all the remote data.
  infinity::memory::Buffer **bufferToReadWrite =
      (infinity::memory::Buffer **)calloc(nr_ranks,
                                          sizeof(infinity::memory::Buffer *));
  infinity::memory::RegionToken **bufferToken =
      (infinity::memory::RegionToken **)calloc(
          nr_ranks, sizeof(infinity::memory::RegionToken *));
  for (int k = 0; k < nr_ranks; k++) {
    if (ranktag_sizes[k] > 0) {
      bufferToReadWrite[k] =
          new infinity::memory::Buffer(context, tobytes(ranktag_sizes[k]));
      bufferToken[k] = bufferToReadWrite[k]->createRegionToken();
    } else {
      // Dummy.
      bufferToReadWrite[k] =
          new infinity::memory::Buffer(context, BYTESINBLOCK);
      bufferToken[k] = bufferToReadWrite[k]->createRegionToken();
    }
  }

  // Do the port binding for each other rank.
  int buf[1];
  MPI_Request req;
  for (int k = 0; k < nr_ranks; k++) {
    if (k != myrank) {
      if (verbose)
        message("%d binding to %d on port %d", myrank, k, BASE_PORT + k);
      qpFactory->bindToPort(BASE_PORT + k);

      // Send message that this port is about to block for a connection.
      if (verbose) message("Blocking for first message on %d", BASE_PORT + k);
      MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req);
      qps[k] = qpFactory->acceptIncomingConnection(
          bufferToken[k], sizeof(infinity::memory::RegionToken));
      if (verbose)
        message("Accepting incoming connections on %d", BASE_PORT + k);
    }
  }
  message("All incomings up");

  /* Now we wait for the remotes to connect before proceeding, otherwise the
   * timings will be incorrect. */
  while (starting_up)
    ;

  /* Ignore the timing on previous part, which is fixed. */
  ticks starttics = getticks();

  /* We loop while new requests are being sent and we still have messages
   * to receive. */
  int todo_recv = nr_recvs;
  while (todo_recv > 0) {

    /* Loop over remaining messages, checking if any have been unlocked. */
    for (int k = 0; k < nr_recvs; k++) {
      struct mpiuse_log_entry *log = recv_queue[k];
      if (log != NULL && !log->done) {

        /* On the first attempt we start listening for this receive. */
        if (log->injtic == 0) log->injtic = getticks();

        /* Find the data for this message. */
        BLOCKTYPE *dataptr = &((BLOCKTYPE *)bufferToReadWrite[log->otherrank]
                                   ->getData())[log->offset];

        /* Check if this has been unlocked. */
        BLOCKTYPE volatile lock = dataptr[0];
        if (lock == UNLOCKED) {

          // XXX should check these for correctness.
          // int subtype = dataptr[1];
          // size_t size = dataptr[2];
          // int tag = dataptr[3];

          if (verbose)
            message("receive message subtype %d from %d to %d todo: %d/%d,"
                    " offset %ld size %ld",
                    log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
                    log->offset, toblocks(log->size) + HEADER_SIZE);

          /* Check the sent data is unchanged and the received data is as
           * expected. */
          if (datacheck && !datacheck_test(toblocks(log->size),
                                           &dataptr[HEADER_SIZE], log->tag)) {
            message("Data mismatch on completion");
          }

          /* Done, clean up. */
          log->done = 1;
          log->endtic = getticks();
          todo_recv--;
        }
      }
    }
  }

  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());

  for (int k = 0; k < nr_ranks; k++) {
    delete qps[k];
    delete bufferToReadWrite[k];
    delete bufferToken[k];
  }
  free(qps);
  delete context;
  delete qpFactory;

  /* Thread exits. */
  return NULL;
}
/**
 * @brief Comparison function for logs, sorted into increasing tic order.
 */
static int cmp_logs(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;

  if (l1->tic > l2->tic) return 1;
  if (l1->tic < l2->tic) return -1;
  return 0;
}
/**
 * @brief Pick out the relevant logging data for our rank.
 */
static size_t pick_logs() {
  size_t nlogs = mpiuse_nr_logs();
  size_t maxsize = 0;

  /* Queues of send and receive logs. */
  send_queue = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_sends = 0;
  recv_queue = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_recvs = 0;

  for (size_t k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->activation) {
      if (log->rank == myrank) {
        log->done = 0;
        log->injtic = 0;
        log->endtic = 0;
        log->data = NULL;
        log->ranktag =
            toranktag(log->subtype, log->otherrank, log->rank, log->tag);

        /* Scale size. */
        log->size *= messagescale;

        if (log->type == task_type_send) {
          send_queue[nr_sends] = log;
          nr_sends++;
        } else if (log->type == task_type_recv) {
          recv_queue[nr_recvs] = log;
          nr_recvs++;
        } else {
          error("task type '%d' is not a known send or recv task", log->type);
        }
      }

      /* Across all ranks. */
      if (log->size > maxsize) maxsize = log->size;
    }
  }

  /* Sort into increasing tic. */
  qsort(recv_queue, nr_recvs, sizeof(struct mpiuse_log_entry *), cmp_logs);
  qsort(send_queue, nr_sends, sizeof(struct mpiuse_log_entry *), cmp_logs);

  /* Offsets and ranktags. */
  ranktag_offsets =
      (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
  ranktag_lists =
      (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
  ranktag_sizes =
      (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));

  /* Setup the ranktag offsets for our receive windows. Also define the sizes
   * of the windows. */
  for (int k = 0; k < nr_recvs; k++) {
    struct mpiuse_log_entry *log = recv_queue[k];
    ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
        log->ranktag;
    ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
        ranktag_sizes[log->otherrank];
    log->offset = ranktag_sizes[log->otherrank];

    /* Need to use a multiple of blocks to keep alignment. */
    size_t size = toblocks(log->size) + HEADER_SIZE;
    ranktag_sizes[log->otherrank] += size;
  }

  if (verbose) {
    message("maxsize = %zd, nr_sends = %d, nr_recvs = %d", maxsize, nr_sends,
            nr_recvs);
  }

  return maxsize;
}
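/* Illustration of the receive window this builds for one sending rank,
 * assuming two incoming messages in increasing tic order:
 *
 *   block 0:                          [lock|subtype|size|tag|payload 0 ...]
 *   block HEADER_SIZE + toblocks(s0): [lock|subtype|size|tag|payload 1 ...]
 *
 * ranktag_sizes[otherrank] accumulates the total window size in blocks and
 * ranktag_offsets records where each expected message starts. */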
/**
 * @brief usage help.
 */
static void usage(char *argv[]) {
  fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
          argv[0]);
  fprintf(stderr, " options: -v verbose\n");
  fflush(stderr);
}
/**
 * @brief main function.
 */
int main(int argc, char *argv[]) {

  /* Start time for logging. This will be reset to restart time. */
  clocks_set_cpufreq(0);
  cpufreq = clocks_get_cpufreq();

  /* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);

  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);

  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Comm_rank failed with error %i.", res);

  /* Handle the command-line, we expect an mpiuse data file to read and
   * various options. */
  int opt;
  while ((opt = getopt(argc, argv, "vds:")) != -1) {
    switch (opt) {
      case 'd':
        datacheck = 1;
        break;
      case 'v':
        verbose = 1;
        break;
      case 's':
        messagescale = atof(optarg);
        break;
      default:
        if (myrank == 0) usage(argv);
        return 1;
    }
  }
  if (optind >= argc - 1) {
    if (myrank == 0) usage(argv);
    return 1;
  }
  char *infile = argv[optind];
  char *logfile = argv[optind + 1];

  /* Now we read the SWIFT MPI logger output that defines the communications
   * we will undertake and the time differences between injections into the
   * queues. Note this has all ranks for a single step, SWIFT outputs one MPI
   * log per rank per step, so you need to combine all ranks from a step. */
  mpiuse_log_restore(infile);
  int nranks = mpiuse_nr_ranks();

  /* This should match the expected size. */
  if (nr_ranks != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nranks, nr_ranks);

  /* Index of most significant bits in the maximum rank and subtypes.
   * Assumes GCC intrinsic. */
  rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
  subtype_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(32);

  /* We all need to agree on a maximum count of logs, so we can exchange the
   * offset arrays (would be ragged otherwise and difficult to exchange). */
  max_logs = mpiuse_nr_logs() / 2 + 1;
  MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);

  /* Extract the send and recv messages for our rank. */
  size_t maxsize = pick_logs();

  /* Largest size of a message. Needs to align on size_t. */
  max_size = toblocks(maxsize) + HEADER_SIZE;

  /* We need to share all the offsets for each communicator with all the other
   * ranks so they can push data into the correct parts of our receive
   * window. */
  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, nr_ranks * nr_ranks * max_logs,
                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
  MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs,
                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);

  /* Now for the RDMA setup. We need the IP addresses of all the ranks. */

  /* Each rank can find its name and IP. */
  char name[MPI_MAX_PROCESSOR_NAME];
  int namelen = 0;
  MPI_Get_processor_name(name, &namelen);
  char ip[MPI_MAX_PROCESSOR_NAME];
  strncpy(ip, toipaddr(name), MPI_MAX_PROCESSOR_NAME);

  /* And distribute, so we all know everyone's IPs. */
  struct servers servers;
  servers.ip =
      (char *)malloc(sizeof(char) * nr_ranks * MPI_MAX_PROCESSOR_NAME);
  MPI_Allgather(ip, MPI_MAX_PROCESSOR_NAME, MPI_BYTE, servers.ip,
                MPI_MAX_PROCESSOR_NAME, MPI_BYTE, MPI_COMM_WORLD);

  if (myrank == 0 && verbose) {
    message("RDMA servers will listen on:");
    for (int j = 0; j < nr_ranks; j++) {
      for (int k = 0; k < nr_ranks; k++) {
        if (k != j) {
          message(" %d: %s on port %d", j,
                  &servers.ip[j * MPI_MAX_PROCESSOR_NAME], BASE_PORT + k);
        }
      }
    }
  }

  /* Time to start timing. Try to make it synchronous across the ranks. */
  MPI_Barrier(MPI_COMM_WORLD);
  clocks_set_cpufreq(cpufreq);
  if (myrank == 0) {
    message("Start of MPI tests");
    message("==================");
    if (verbose) {
      if (datacheck)
        message("checking data pattern on send and recv completion");
    }
  }

  /* Server rank that listens for connections and receives data. */
  pthread_t recvthread;
  if (pthread_create(&recvthread, NULL, &recv_thread, &servers) != 0)
    error("Failed to create recv thread.");

  /* Now we have a single thread to send the messages. */
  pthread_t sendthread;
  if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0)
    error("Failed to create send thread.");

  /* Wait until all threads have exited and all message exchanges have
   * completed. */
  pthread_join(sendthread, NULL);
  pthread_join(recvthread, NULL);

  /* Dump the updated MPI logs. */
  MPI_Barrier(MPI_COMM_WORLD);
  if (myrank == 0) message("Dumping updated log");
  mpiuse_dump_logs(nranks, logfile);

  /* Shutdown MPI. */
  res = MPI_Finalize();
  if (res != MPI_SUCCESS)
    error("call to MPI_Finalize failed with error %i.", res);

  /* Free resources. */
  free(servers.ip);

  if (myrank == 0) message("Bye");

  return 0;
}