Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
swiftmpistepsim
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SWIFT
swiftmpistepsim
Commits
4cd5e977
Commit
4cd5e977
authored
4 years ago
by
Peter W. Draper
Browse files
Options
Downloads
Patches
Plain Diff
Variation of standard MPI that uses synchronous MPI_Send
parent
3a085e82
No related branches found
No related tags found
1 merge request
!8
Draft: RDMA version with wrapped infinity calls
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
Makefile
+9
-2
9 additions, 2 deletions
Makefile
swiftmpistepsim2.c
+478
-0
478 additions, 0 deletions
swiftmpistepsim2.c
with
487 additions
and
2 deletions
Makefile
+
9
−
2
View file @
4cd5e977
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
CFLAGS
=
-g
-O3
-Wall
-Iinfinity
/include
-DINFINITY_ASSERT_ON
CFLAGS
=
-g
-O3
-Wall
-Iinfinity
/include
#CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_ASSERT_ON
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
INCLUDES
=
mpiuse.h atomic.h cycle.h clocks.h error.h
INCLUDES
=
mpiuse.h atomic.h cycle.h clocks.h error.h
...
@@ -9,11 +10,16 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
...
@@ -9,11 +10,16 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
INFINITY
=
-Linfinity
-linfinity
-libverbs
INFINITY
=
-Linfinity
-linfinity
-libverbs
all
:
swiftmpistepsim swiftmpirdmastepsim swiftmpirdmaonestepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3
all
:
swiftmpistepsim swiftmpistepsim2
\
swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3
\
swiftmpirdmaonestepsim
swiftmpistepsim
:
swiftmpistepsim.c $(DEPS)
swiftmpistepsim
:
swiftmpistepsim.c $(DEPS)
mpicxx
$(
CFLAGS
)
-o
swiftmpistepsim swiftmpistepsim.c
$(
SOURCES
)
mpicxx
$(
CFLAGS
)
-o
swiftmpistepsim swiftmpistepsim.c
$(
SOURCES
)
swiftmpistepsim2
:
swiftmpistepsim2.c $(DEPS)
mpicxx
$(
CFLAGS
)
-o
swiftmpistepsim2 swiftmpistepsim2.c
$(
SOURCES
)
swiftmpirdmastepsim
:
swiftmpirdmastepsim.c $(DEPS)
swiftmpirdmastepsim
:
swiftmpirdmastepsim.c $(DEPS)
mpicxx
$(
CFLAGS
)
-o
swiftmpirdmastepsim swiftmpirdmastepsim.c
$(
SOURCES
)
$(
INFINITY
)
mpicxx
$(
CFLAGS
)
-o
swiftmpirdmastepsim swiftmpirdmastepsim.c
$(
SOURCES
)
$(
INFINITY
)
...
@@ -28,6 +34,7 @@ swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
...
@@ -28,6 +34,7 @@ swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
clean
:
clean
:
rm
-f
swiftmpistepsim
rm
-f
swiftmpistepsim
rm
-f
swiftmpistepsim2
rm
-f
swiftmpirdmastepsim
rm
-f
swiftmpirdmastepsim
rm
-f
swiftmpirdmaonestepsim
rm
-f
swiftmpirdmaonestepsim
rm
-f
swiftmpirdmastepsim2
rm
-f
swiftmpirdmastepsim2
...
...
This diff is collapsed.
Click to expand it.
swiftmpistepsim2.c
0 → 100644
+
478
−
0
View file @
4cd5e977
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2019 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#include
<limits.h>
#include
<mpi.h>
#include
<pthread.h>
#include
<stdio.h>
#include
<stdlib.h>
#include
<unistd.h>
#include
"atomic.h"
#include
"clocks.h"
#include
"error.h"
#include
"mpiuse.h"
/* Global: Our rank for all to see. */
int
myrank
=
-
1
;
/* Are we verbose. */
static
int
verbose
=
0
;
/* Scale to apply to the size of the messages we send. */
static
float
messagescale
=
1
.
0
;
/* Set a data pattern and check we get this back, slow... */
static
int
datacheck
=
0
;
/* Integer types of send and recv tasks, must match log. */
static
const
int
task_type_send
=
22
;
static
const
int
task_type_recv
=
23
;
/* Global communicators for each of the subtypes. */
static
const
int
task_subtype_count
=
30
;
// Just some upper limit on subtype.
static
MPI_Comm
subtypeMPI_comms
[
30
];
/* The local queues. */
static
struct
mpiuse_log_entry
**
volatile
reqs_queue
;
static
int
volatile
ind_req
=
0
;
static
int
volatile
nr_reqs
=
0
;
static
struct
mpiuse_log_entry
**
volatile
recvs_queue
;
static
int
volatile
nr_recvs
=
0
;
static
int
volatile
ind_recv
=
0
;
static
int
volatile
todo_recv
=
0
;
static
struct
mpiuse_log_entry
**
volatile
sends_queue
;
static
int
volatile
nr_sends
=
0
;
static
int
volatile
ind_send
=
0
;
/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static
double
log_clocks_cpufreq
=
2194844448
.
0
;
/**
* @brief fill a data area with a pattern that can be checked for changes.
*
* @param size size of data in bytes.
* @param data the data to fill.
*/
static
void
datacheck_fill
(
size_t
size
,
void
*
data
)
{
unsigned
char
*
p
=
(
unsigned
char
*
)
data
;
for
(
size_t
i
=
0
;
i
<
size
;
i
++
)
{
p
[
i
]
=
170
;
/* 10101010 in bits. */
}
}
/**
* @brief test a filled data area for our pattern.
*
* @param size size of data in bytes.
* @param data the data to fill.
*
* @result 1 on success, 0 otherwise.
*/
static
int
datacheck_test
(
size_t
size
,
void
*
data
)
{
unsigned
char
*
p
=
(
unsigned
char
*
)
data
;
for
(
size_t
i
=
0
;
i
<
size
;
i
++
)
{
if
(
p
[
i
]
!=
170
)
return
0
;
}
return
1
;
}
/**
* @brief Injection thread, initiates MPI_Irecv requests.
*/
static
void
*
inject_thread
(
void
*
arg
)
{
if
(
verbose
)
message
(
"%d: injection thread starts"
,
*
((
int
*
)
arg
));
ticks
starttics
=
getticks
();
while
(
ind_req
<
nr_reqs
)
{
struct
mpiuse_log_entry
*
log
=
reqs_queue
[
ind_req
];
/* Initialise new log elements. */
log
->
injtic
=
getticks
();
log
->
done
=
0
;
log
->
nr_tests
=
0
;
log
->
tsum
=
0
.
0
;
log
->
tmax
=
0
;
log
->
tmin
=
INT_MAX
;
log
->
endtic
=
0
;
/* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
int
err
=
0
;
if
(
log
->
type
==
task_type_recv
)
{
/* Start ready to receive. */
log
->
data
=
calloc
(
log
->
size
,
1
);
//message("recv frm %d, tag %d, size %zd", log->otherrank, log->tag, log->size);
err
=
MPI_Irecv
(
log
->
data
,
log
->
size
,
MPI_BYTE
,
log
->
otherrank
,
log
->
tag
,
subtypeMPI_comms
[
log
->
subtype
],
&
log
->
req
);
/* Add a new recv request. */
int
ind
=
atomic_inc
(
&
nr_recvs
);
recvs_queue
[
ind
]
=
log
;
atomic_inc
(
&
todo_recv
);
}
else
{
log
->
data
=
calloc
(
log
->
size
,
1
);
/* Fill data with pattern. */
if
(
datacheck
)
datacheck_fill
(
log
->
size
,
log
->
data
);
/* And add to queue for sending. */
int
ind
=
atomic_inc
(
&
nr_sends
);
sends_queue
[
ind
]
=
log
;
}
if
(
err
!=
MPI_SUCCESS
)
error
(
"Failed to activate send or recv"
);
ind_req
++
;
}
/* All done, thread exiting. */
if
(
verbose
)
{
message
(
"%d injections completed recvs = %d, sends = %d"
,
ind_req
,
nr_recvs
,
nr_sends
);
}
message
(
"took %.3f %s."
,
clocks_from_ticks
(
getticks
()
-
starttics
),
clocks_getunit
());
return
NULL
;
}
/**
* @brief send thread sends synchronous MPI_Send. Cannot start until
* injections are complete, so different to optimal behaviour.
*/
static
void
*
send_thread
(
void
*
arg
)
{
if
(
verbose
)
message
(
"%d: send thread starts"
,
*
((
int
*
)
arg
));
ticks
starttics
=
getticks
();
for
(
int
k
=
0
;
k
<
nr_sends
;
k
++
)
{
struct
mpiuse_log_entry
*
log
=
sends_queue
[
k
];
log
->
injtic
=
getticks
();
//message("sending to %d, tag %d, size %zd", log->otherrank, log->tag, log->size);
MPI_Send
(
log
->
data
,
log
->
size
,
MPI_BYTE
,
log
->
otherrank
,
log
->
tag
,
subtypeMPI_comms
[
log
->
subtype
]);
log
->
done
=
1
;
log
->
endtic
=
getticks
();
free
(
log
->
data
);
}
/* All done, thread exiting. */
message
(
"took %.3f %s."
,
clocks_from_ticks
(
getticks
()
-
starttics
),
clocks_getunit
());
return
NULL
;
}
/**
* @brief main loop to run over a queue of MPI requests and test for when they
* complete. Returns the total amount of time spent in calls to MPI_Test and
* the number of times it was called.
*
* @param logs the list of logs pointing to requests.
* @param nr_logs pointer to the variable containing the current number of
* logs.
* @param todos pointer to the variable containing the number of requests that
* are still active.
* @param sum the total number of ticks spent in calls to MPI_Test.
* @param ncalls the total number of calls to MPI_Test.
* @param mint the minimum ticks an MPI_Test call took.
* @param maxt the maximum ticks an MPI_Test call took.
*/
static
void
queue_runner
(
struct
mpiuse_log_entry
**
logs
,
int
volatile
*
nr_logs
,
int
volatile
*
todos
,
double
*
sum
,
int
*
ncalls
,
ticks
*
mint
,
ticks
*
maxt
)
{
/* Global MPI_Test statistics. */
int
lncalls
=
0
;
double
lsum
=
0
.
0
;
ticks
lmint
=
INT_MAX
;
ticks
lmaxt
=
0
;
/* We loop while new requests are being injected and we still have requests
* to complete. */
while
(
*
todos
>
0
)
{
int
nlogs
=
*
nr_logs
;
for
(
int
k
=
0
;
k
<
nlogs
;
k
++
)
{
struct
mpiuse_log_entry
*
log
=
logs
[
k
];
if
(
log
!=
NULL
&&
!
log
->
done
)
{
ticks
tics
=
getticks
();
int
res
;
MPI_Status
stat
;
int
err
=
MPI_Test
(
&
log
->
req
,
&
res
,
&
stat
);
if
(
err
!=
MPI_SUCCESS
)
{
error
(
"MPI_Test call failed"
);
}
/* Increment etc. of statistics about time in MPI_Test. */
ticks
dt
=
getticks
()
-
tics
;
log
->
tsum
+=
(
double
)
dt
;
lsum
+=
(
double
)
dt
;
log
->
nr_tests
++
;
lncalls
++
;
if
(
dt
<
log
->
tmin
)
log
->
tmin
=
dt
;
if
(
dt
>
log
->
tmax
)
log
->
tmax
=
dt
;
if
(
dt
<
lmint
)
lmint
=
dt
;
if
(
dt
>
lmaxt
)
lmaxt
=
dt
;
if
(
res
)
{
/* Check data sent data is unchanged and received data is as
* expected. */
if
(
datacheck
&&
!
datacheck_test
(
log
->
size
,
log
->
data
))
{
error
(
"Data mismatch on completion"
);
}
/* Done, clean up. */
log
->
done
=
1
;
log
->
endtic
=
getticks
();
free
(
log
->
data
);
atomic_dec
(
todos
);
}
}
}
}
/* All done. */
*
sum
=
lsum
;
*
ncalls
=
lncalls
;
*
mint
=
lmint
;
*
maxt
=
lmaxt
;
return
;
}
/**
* @brief Recv thread, checks if MPI_Irecv requests have completed.
*/
static
void
*
recv_thread
(
void
*
arg
)
{
if
(
verbose
)
message
(
"%d: recv thread starts"
,
*
((
int
*
)
arg
));
ticks
starttics
=
getticks
();
int
ncalls
;
double
sum
;
ticks
mint
;
ticks
maxt
;
queue_runner
(
recvs_queue
,
&
nr_recvs
,
&
todo_recv
,
&
sum
,
&
ncalls
,
&
mint
,
&
maxt
);
message
(
"%d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, max time "
"%.3f (%s)"
,
ncalls
,
clocks_from_ticks
(
sum
),
clocks_from_ticks
(
sum
/
ncalls
),
clocks_from_ticks
(
mint
),
clocks_from_ticks
(
maxt
),
clocks_getunit
());
if
(
verbose
)
message
(
"took %.3f %s."
,
clocks_from_ticks
(
getticks
()
-
starttics
),
clocks_getunit
());
/* Thread exits. */
return
NULL
;
}
/**
* @brief Comparison function for logged times.
*/
static
int
cmp_logs
(
const
void
*
p1
,
const
void
*
p2
)
{
struct
mpiuse_log_entry
*
l1
=
*
(
struct
mpiuse_log_entry
**
)
p1
;
struct
mpiuse_log_entry
*
l2
=
*
(
struct
mpiuse_log_entry
**
)
p2
;
/* Large unsigned values, so take care. */
if
(
l1
->
tic
>
l2
->
tic
)
return
1
;
if
(
l1
->
tic
<
l2
->
tic
)
return
-
1
;
return
0
;
}
/**
* @brief Pick out the relevant logging data for our rank, i.e. all
* activations of sends and recvs. We ignore the original completions.
* The final list is sorted into increasing time of activation.
*/
static
void
pick_logs
(
void
)
{
size_t
nlogs
=
mpiuse_nr_logs
();
/* Duplicate of logs. */
reqs_queue
=
(
struct
mpiuse_log_entry
**
)
calloc
(
nlogs
,
sizeof
(
struct
mpiuse_log_entry
*
));
nr_reqs
=
0
;
sends_queue
=
(
struct
mpiuse_log_entry
**
)
calloc
(
nlogs
,
sizeof
(
struct
mpiuse_log_entry
*
));
nr_sends
=
0
;
recvs_queue
=
(
struct
mpiuse_log_entry
**
)
calloc
(
nlogs
,
sizeof
(
struct
mpiuse_log_entry
*
));
nr_recvs
=
0
;
for
(
size_t
k
=
0
;
k
<
nlogs
;
k
++
)
{
struct
mpiuse_log_entry
*
log
=
mpiuse_get_log
(
k
);
if
(
log
->
rank
==
myrank
&&
log
->
activation
)
{
if
(
log
->
type
==
task_type_send
||
log
->
type
==
task_type_recv
)
{
/* And keep this log. */
log
->
data
=
NULL
;
reqs_queue
[
nr_reqs
]
=
log
;
nr_reqs
++
;
/* Scale size. */
log
->
size
*=
messagescale
;
}
else
{
error
(
"task type '%d' is not a known send or recv task"
,
log
->
type
);
}
}
}
/* Sort into increasing time. */
qsort
(
reqs_queue
,
nr_reqs
,
sizeof
(
struct
mpiuse_log_entry
*
),
cmp_logs
);
/* Check. */
for
(
int
k
=
0
;
k
<
nr_reqs
-
1
;
k
++
)
{
if
(
reqs_queue
[
k
]
->
tic
>
reqs_queue
[
k
+
1
]
->
tic
)
message
(
"reqs_queue: %lld > %lld"
,
reqs_queue
[
k
]
->
tic
,
reqs_queue
[
k
+
1
]
->
tic
);
}
}
/**
* @brief usage help.
*/
static
void
usage
(
char
*
argv
[])
{
fprintf
(
stderr
,
"Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat
\n
"
,
argv
[
0
]);
fprintf
(
stderr
,
" options: -v verbose, -f fast injections
\n
"
);
fflush
(
stderr
);
}
/**
* @brief main function.
*/
int
main
(
int
argc
,
char
*
argv
[])
{
/* Initiate MPI. */
int
prov
=
0
;
int
res
=
MPI_Init_thread
(
&
argc
,
&
argv
,
MPI_THREAD_MULTIPLE
,
&
prov
);
if
(
res
!=
MPI_SUCCESS
)
error
(
"Call to MPI_Init_thread failed with error %i."
,
res
);
int
nr_nodes
=
0
;
res
=
MPI_Comm_size
(
MPI_COMM_WORLD
,
&
nr_nodes
);
if
(
res
!=
MPI_SUCCESS
)
error
(
"MPI_Comm_size failed with error %i."
,
res
);
res
=
MPI_Comm_rank
(
MPI_COMM_WORLD
,
&
myrank
);
if
(
res
!=
MPI_SUCCESS
)
error
(
"Call to MPI_Comm_rank failed with error %i."
,
res
);
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int
opt
;
while
((
opt
=
getopt
(
argc
,
argv
,
"vds:"
))
!=
-
1
)
{
switch
(
opt
)
{
case
'd'
:
datacheck
=
1
;
break
;
case
's'
:
messagescale
=
atof
(
optarg
);
break
;
case
'v'
:
verbose
=
1
;
break
;
default:
if
(
myrank
==
0
)
usage
(
argv
);
return
1
;
}
}
if
(
optind
>=
argc
-
1
)
{
if
(
myrank
==
0
)
usage
(
argv
);
return
1
;
}
char
*
infile
=
argv
[
optind
];
char
*
logfile
=
argv
[
optind
+
1
];
/* Now we read the SWIFT MPI logger output that defines the communcations
* we will undertake and the time differences between injections into the
* queues. Note this has all ranks for a single steps, SWIFT outputs one MPI
* log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore
(
infile
);
int
nranks
=
mpiuse_nr_ranks
();
/* This should match the expected size. */
if
(
nr_nodes
!=
nranks
)
error
(
"The number of MPI ranks %d does not match the expected value %d"
,
nranks
,
nr_nodes
);
/* Create communicators for each subtype of the tasks. */
for
(
int
i
=
0
;
i
<
task_subtype_count
;
i
++
)
{
MPI_Comm_dup
(
MPI_COMM_WORLD
,
&
subtypeMPI_comms
[
i
]);
}
/* Each rank requires its own queue, so extract them. */
pick_logs
();
/* Time to start time. Try to make it synchronous across the ranks. */
clocks_set_cpufreq
(
0
);
long
long
freq
=
clocks_get_cpufreq
();
MPI_Barrier
(
MPI_COMM_WORLD
);
clocks_set_cpufreq
(
freq
);
if
(
myrank
==
0
)
{
message
(
"Start of MPI tests"
);
message
(
"=================="
);
if
(
verbose
)
{
if
(
datacheck
)
message
(
"checking data pattern on send and recv completion"
);
}
}
/* Make three threads, one for injecting recv tasks. One to do the sends and
* the other to poll for the recv completions. */
pthread_t
injectthread
;
if
(
pthread_create
(
&
injectthread
,
NULL
,
&
inject_thread
,
&
myrank
)
!=
0
)
error
(
"Failed to create injection thread."
);
// XXX need to wait for this phase to stop.
pthread_join
(
injectthread
,
NULL
);
fflush
(
stdout
);
MPI_Barrier
(
MPI_COMM_WORLD
);
pthread_t
sendthread
;
if
(
pthread_create
(
&
sendthread
,
NULL
,
&
send_thread
,
&
myrank
)
!=
0
)
error
(
"Failed to create send thread."
);
pthread_t
recvthread
;
if
(
pthread_create
(
&
recvthread
,
NULL
,
&
recv_thread
,
&
myrank
)
!=
0
)
error
(
"Failed to create recv thread."
);
/* Wait until all threads have exited and all MPI requests have completed. */
pthread_join
(
sendthread
,
NULL
);
pthread_join
(
recvthread
,
NULL
);
/* Dump the updated MPI logs. */
MPI_Barrier
(
MPI_COMM_WORLD
);
fflush
(
stdout
);
if
(
myrank
==
0
)
message
(
"Dumping updated log"
);
mpiuse_dump_logs
(
nranks
,
logfile
);
/* Shutdown MPI. */
res
=
MPI_Finalize
();
if
(
res
!=
MPI_SUCCESS
)
error
(
"call to MPI_Finalize failed with error %i."
,
res
);
if
(
myrank
==
0
)
message
(
"Bye"
);
return
0
;
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment