Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
swiftmpistepsim
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Container registry
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SWIFT
swiftmpistepsim
Commits
2e8cbf16
Commit
2e8cbf16
authored
Jul 7, 2020
by
Peter W. Draper
Browse files
Options
Downloads
Patches
Plain Diff
Start to add one-sided RDMA version
parent
988a1b06
Branches
Branches containing commit
No related tags found
1 merge request
!8
Draft: RDMA version with wrapped infinity calls
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
Makefile
+6
-2
6 additions, 2 deletions
Makefile
swiftmpirdmaonestepsim.c
+612
-0
612 additions, 0 deletions
swiftmpirdmaonestepsim.c
with
618 additions
and
2 deletions
Makefile
+
6
−
2
View file @
2e8cbf16
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
CFLAGS
=
-g
-O
0
-Wall
-Iinfinity
/include
CFLAGS
=
-g
-O
3
-Wall
-Iinfinity
/include
#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
all
:
swiftmpistepsim swiftmpirdmastepsim
all
:
swiftmpistepsim swiftmpirdmastepsim
swiftmpirdmaonestepsim
swiftmpistepsim
:
swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicxx
$(
CFLAGS
)
-o
swiftmpistepsim swiftmpistepsim.c mpiuse.c clocks.c
...
...
@@ -12,8 +12,12 @@ swiftmpistepsim: swiftmpistepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h c
swiftmpirdmastepsim
:
swiftmpirdmastepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicxx
$(
CFLAGS
)
-o
swiftmpirdmastepsim swiftmpirdmastepsim.c mpiuse.c clocks.c
-Linfinity
-linfinity
-libverbs
swiftmpirdmaonestepsim
:
swiftmpirdmaonestepsim.c mpiuse.c mpiuse.h atomic.h cycle.h clocks.h clocks.c error.h
mpicxx
$(
CFLAGS
)
-o
swiftmpirdmaonestepsim swiftmpirdmaonestepsim.c mpiuse.c clocks.c
-Linfinity
-linfinity
-libverbs
clean
:
rm
-f
swiftmpistepsim
rm
-f
swiftmpirdmastepsim
rm
-f
swiftmpirdmaonestepsim
This diff is collapsed.
Click to expand it.
swiftmpirdmaonestepsim.c
0 → 100644
+
612
−
0
View file @
2e8cbf16
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
// Pure RDMA version, we use MPI for process control and synchronization.
#include
<arpa/inet.h>
#include
<limits.h>
#include
<mpi.h>
#include
<netdb.h>
#include
<pthread.h>
#include
<stdio.h>
#include
<stdlib.h>
#include
<string.h>
#include
<unistd.h>
#include
<infinity/core/Context.h>
#include
<infinity/memory/Buffer.h>
#include
<infinity/memory/RegionToken.h>
#include
<infinity/queues/QueuePair.h>
#include
<infinity/queues/QueuePairFactory.h>
#include
<infinity/requests/RequestToken.h>
#include
"atomic.h"
#include
"clocks.h"
#include
"error.h"
#include
"mpiuse.h"
/* Rank of this process. Starts at -1 and is filled in by MPI_Comm_rank() in
 * main(). Has external linkage so other translation units can see it. */
int myrank = -1;

/* Total number of ranks, filled in by MPI_Comm_size() in main(). */
static int nr_ranks;

/* First port number, a rank-related port is BASE_PORT + rank. */
static int BASE_PORT = 27771;

/* Type of a block of memory. MESSAGE_SIZE needs to be a multiple of this as
 * we need to align in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);

/* Size of a message header in blocks: the rank, subtype, size and tag. */
static const size_t HEADER_SIZE = 4;

/* Size of a message board in blocks, we have one of these per rank per
 * communicator (i.e. per window). Set in main() once the largest message
 * is known. */
static size_t MESSAGE_SIZE = 0;

/* Non-zero for verbose output (-v). */
static int verbose = 0;

/* Non-zero to set a data pattern and check we get this back (-d), slow... */
static int datacheck = 0;

/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;

/* Global communicators for each of the subtypes. */
#define task_subtype_count 22  // Just some upper limit on subtype.

/* The local send queue, the number of entries in it and a send counter. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;

/* Local receive queues separated by rank, with the number of entries in each.
 * XXX fixed at 16 ranks, needs to be dynamic. */
static int volatile nr_recvs[16] = {0};
static struct mpiuse_log_entry **volatile recvs_queue[16];

/* Per-rank flag that is cleared by the matching recv_thread() once its port
 * is bound. Note this initializer only sets element 0 to 1, the remaining
 * elements start at 0; main() sets the elements in use to 1 before starting
 * the threads. XXX needs to be dynamic... */
static int volatile starting[16] = {1};
/**
* @brief Find an IP address for the given hostname.
*
* @param hostname the hostname
*
* @result the IP address, note copy away to keep.
*/
static
char
*
toipaddr
(
char
*
hostname
)
{
struct
hostent
*
hostent
=
gethostbyname
(
hostname
);
if
(
hostent
==
NULL
)
{
error
(
"Failed to convert hostname '%s' to an IP address"
,
hostname
);
}
struct
in_addr
**
addr_list
=
(
struct
in_addr
**
)
hostent
->
h_addr_list
;
return
inet_ntoa
(
*
addr_list
[
0
]);
}
/**
 * @brief Number of whole blocks needed to hold a byte count, rounding up.
 *
 * @param nr_bytes the number of bytes.
 *
 * @result the number of blocks of BYTESINBLOCK bytes needed.
 */
static int toblocks(BLOCKTYPE nr_bytes) {
  /* Adding one less than a block before dividing rounds any partial block up
   * to a whole one. */
  const BLOCKTYPE padded = nr_bytes + (BYTESINBLOCK - 1);
  return padded / BYTESINBLOCK;
}
/**
 * @brief Convert a block count into a number of bytes.
 *
 * The product is formed in BLOCKTYPE so that large block counts do not
 * overflow the range of an int before the result is widened.
 *
 * @param nr_blocks the number of blocks.
 *
 * @result the number of bytes, nr_blocks * BYTESINBLOCK.
 */
static BLOCKTYPE tobytes(int nr_blocks) {
  return (BLOCKTYPE)nr_blocks * (BLOCKTYPE)BYTESINBLOCK;
}
/**
 * @brief Fill a data area with a given value.
 *
 * @param size number of BLOCKTYPE elements to write.
 * @param data the data area to fill, must have room for size elements.
 * @param value the value stored in every element.
 */
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
  BLOCKTYPE *cursor = data;
  for (BLOCKTYPE left = size; left > 0; left--) {
    *cursor++ = value;
  }
}
/**
 * @brief Test that a filled data area holds the expected value throughout.
 *
 * The first element that differs is reported using message() and the scan
 * stops there.
 *
 * @param size number of BLOCKTYPE elements to check.
 * @param data the data to check.
 * @param value the value expected in every element.
 *
 * @result 1 on success, 0 otherwise.
 */
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
  for (size_t i = 0; i < size; i++) {
    if (data[i] != value) {
      /* All of these arguments are unsigned size_t values, so %zu. */
      message("see %zu expected %zu @ %zu (%zu to go)", data[i], value, i,
              size);
      return 0;
    }
  }
  return 1;
}
/* Argument block handed to a send thread: where to connect and which rank
 * that is. */
struct stuff {
  /* IP address of the remote server as text. */
  char server_ip[32];

  /* Rank of the remote server. */
  int rank;
};
/**
* @brief Send thread, sends RDMA messages to another rank.
*
* Messages are all considered in order.
*/
static
void
*
send_thread
(
void
*
arg
)
{
ticks
starttics
=
getticks
();
// Get the destination IP and rank.
struct
stuff
*
stuff
=
(
struct
stuff
*
)
arg
;
char
*
server_ip
=
stuff
->
server_ip
;
int
rank
=
stuff
->
rank
;
// Need a factory to create QP.
infinity
::
core
::
Context
*
context
=
new
infinity
::
core
::
Context
();
infinity
::
queues
::
QueuePairFactory
*
qpFactory
=
new
infinity
::
queues
::
QueuePairFactory
(
context
);
// Get QP to the other rank. Note we cannot do this until the related
// server is up and running, so make sure that is true..
//message("%d waiting for connection to remote server %s %d on %d", myrank,
// server_ip, rank, BASE_PORT + myrank);
auto
*
qp
=
qpFactory
->
connectToRemoteHost
(
server_ip
,
BASE_PORT
+
myrank
);
//message("%d connected to remote server %s %d on %d", myrank, server_ip, rank,
// BASE_PORT + myrank);
for
(
int
k
=
0
;
k
<
nr_send
;
k
++
)
{
struct
mpiuse_log_entry
*
log
=
send_queue
[
k
];
// Only send messages to the expected rank.
if
(
log
->
otherrank
!=
rank
)
continue
;
/* Data has the actual data and room for the header. */
BLOCKTYPE
datasize
=
toblocks
(
log
->
size
)
+
HEADER_SIZE
;
BLOCKTYPE
*
dataptr
=
(
BLOCKTYPE
*
)
calloc
(
datasize
,
BYTESINBLOCK
);
log
->
data
=
dataptr
;
log
->
injtic
=
getticks
();
/* Fill data with pattern. */
if
(
datacheck
)
datacheck_fill
(
toblocks
(
log
->
size
),
&
dataptr
[
HEADER_SIZE
],
log
->
tag
);
/* First element has our rank, other elements replicate what we need to
* define an MPI message. */
dataptr
[
0
]
=
myrank
;
dataptr
[
1
]
=
log
->
subtype
;
dataptr
[
2
]
=
log
->
size
;
dataptr
[
3
]
=
log
->
tag
;
/* Need to assign to a buffer to register memory. */
auto
*
sendBuffer
=
new
infinity
::
memory
::
Buffer
(
context
,
dataptr
,
tobytes
(
datasize
));
// And send
infinity
::
requests
::
RequestToken
requestToken
(
context
);
qp
->
send
(
sendBuffer
,
&
requestToken
);
requestToken
.
waitUntilCompleted
();
log
->
endtic
=
getticks
();
delete
sendBuffer
;
// XXX Can we reuse ?
}
message
(
"took %.3f %s."
,
clocks_from_ticks
(
getticks
()
-
starttics
),
clocks_getunit
());
delete
qp
;
delete
qpFactory
;
delete
context
;
return
NULL
;
}
/**
* @brief recv thread, listens for remote sends from another rank.
*/
static
void
*
recv_thread
(
void
*
arg
)
{
int
rank
=
*
(
int
*
)
arg
;
ticks
starttics
=
getticks
();
// Each receive port needs a factory to create QPs.
auto
*
context
=
new
infinity
::
core
::
Context
();
auto
*
qpFactory
=
new
infinity
::
queues
::
QueuePairFactory
(
context
);
// Create buffer to receive messages. Only size for one, or not...
auto
*
receiveBuffer
=
new
infinity
::
memory
::
Buffer
(
context
,
16
*
tobytes
(
MESSAGE_SIZE
));
context
->
postReceiveBuffer
(
receiveBuffer
);
// Port binding.
//message("%d binding to %d on port %d", myrank, rank, BASE_PORT + rank);
fflush
(
stdout
);
qpFactory
->
bindToPort
(
BASE_PORT
+
rank
);
//message("Blocking for first message on %d", BASE_PORT + rank);
starting
[
rank
]
=
0
;
// really need to do this in acceptIncomingConnection().
auto
qp
=
qpFactory
->
acceptIncomingConnection
();
// We block here for first message.
//message("Accepting incoming connections on %d", BASE_PORT + rank);
/* No. of receives to process and associated queue. */
int
todo_recv
=
nr_recvs
[
rank
];
struct
mpiuse_log_entry
**
recv_queue
=
recvs_queue
[
rank
];
/* We loop while new requests are being send and we still have messages
* to receive. */
infinity
::
core
::
receive_element_t
receiveElement
;
while
(
todo_recv
>
0
)
{
while
(
!
context
->
receive
(
&
receiveElement
))
;
// Unpack the header.
BLOCKTYPE
*
dataptr
=
(
BLOCKTYPE
*
)
receiveElement
.
buffer
->
getData
();
int
rank
=
dataptr
[
0
];
int
subtype
=
dataptr
[
1
];
size_t
size
=
dataptr
[
2
];
int
tag
=
dataptr
[
3
];
/* Now find the associated log. XXX speed this up, local queue. */
int
found
=
0
;
for
(
int
k
=
0
;
k
<
nr_recvs
[
rank
];
k
++
)
{
struct
mpiuse_log_entry
*
log
=
recv_queue
[
k
];
if
(
log
!=
NULL
&&
!
log
->
done
)
{
/* On the first attempt we start listening for this receive. */
if
(
log
->
injtic
==
0
)
log
->
injtic
=
getticks
();
if
(
log
->
otherrank
==
rank
&&
log
->
subtype
==
subtype
&&
log
->
size
==
size
&&
log
->
tag
==
tag
)
{
found
=
1
;
if
(
verbose
)
message
(
"receive message subtype %d from %d on %d"
,
log
->
subtype
,
rank
,
myrank
);
/* Check data sent data is unchanged and received data is as
* expected. */
if
(
datacheck
&&
!
datacheck_test
(
toblocks
(
log
->
size
),
&
dataptr
[
HEADER_SIZE
],
log
->
tag
))
{
message
(
"Data mismatch on completion"
);
}
/* Done, clean up. */
log
->
done
=
1
;
// free(log->data); // XXX should really offload the data to be fair.
log
->
endtic
=
getticks
();
todo_recv
--
;
}
}
}
if
(
!
found
)
{
error
(
"No matching receive on connections to %d (%d of %d todo:"
" rank = %d otherrank = %d subtype = %d size = %zd tag = %d)"
,
BASE_PORT
+
rank
,
todo_recv
,
nr_recvs
[
rank
],
myrank
,
rank
,
subtype
,
size
,
tag
);
}
// Ready for next use of buffer?
context
->
postReceiveBuffer
(
receiveElement
.
buffer
);
}
message
(
"took %.3f %s."
,
clocks_from_ticks
(
getticks
()
-
starttics
),
clocks_getunit
());
delete
receiveBuffer
;
delete
qp
;
delete
qpFactory
;
/* Thread exits. */
return
NULL
;
}
/**
* @brief Comparison function for tags.
*/
static
int
cmp_logs
(
const
void
*
p1
,
const
void
*
p2
)
{
struct
mpiuse_log_entry
*
l1
=
*
(
struct
mpiuse_log_entry
**
)
p1
;
struct
mpiuse_log_entry
*
l2
=
*
(
struct
mpiuse_log_entry
**
)
p2
;
if
(
l1
->
tic
>
l2
->
tic
)
return
1
;
if
(
l1
->
tic
<
l2
->
tic
)
return
-
1
;
return
0
;
}
/**
* @brief Pick out the relevant logging data for our rank.
*/
static
size_t
pick_logs
()
{
size_t
nlogs
=
mpiuse_nr_logs
();
size_t
maxsize
=
0
;
/* Queues of send and receive logs. */
send_queue
=
(
struct
mpiuse_log_entry
**
)
calloc
(
nlogs
,
sizeof
(
struct
mpiuse_log_entry
*
));
nr_send
=
0
;
struct
mpiuse_log_entry
**
recv_queue
=
(
struct
mpiuse_log_entry
**
)
calloc
(
nlogs
,
sizeof
(
struct
mpiuse_log_entry
*
));
int
nr_recv
=
0
;
for
(
size_t
k
=
0
;
k
<
nlogs
;
k
++
)
{
struct
mpiuse_log_entry
*
log
=
mpiuse_get_log
(
k
);
if
(
log
->
activation
)
{
if
(
log
->
rank
==
myrank
)
{
log
->
done
=
0
;
log
->
injtic
=
0
;
log
->
endtic
=
0
;
log
->
data
=
NULL
;
if
(
log
->
type
==
task_type_send
)
{
send_queue
[
nr_send
]
=
log
;
nr_send
++
;
}
else
if
(
log
->
type
==
task_type_recv
)
{
recv_queue
[
nr_recv
]
=
log
;
nr_recv
++
;
}
else
{
error
(
"task type '%d' is not a known send or recv task"
,
log
->
type
);
}
}
/* Across all ranks. */
if
(
log
->
size
>
maxsize
)
maxsize
=
log
->
size
;
}
}
/* Sort into increasing tic. */
qsort
(
recv_queue
,
nr_recv
,
sizeof
(
struct
mpiuse_log_entry
*
),
cmp_logs
);
qsort
(
send_queue
,
nr_send
,
sizeof
(
struct
mpiuse_log_entry
*
),
cmp_logs
);
/* Now we need to count the numbers of messages to send per rank
* and create sub-queues for these.*/
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
nr_recvs
[
k
]
=
0
;
for
(
int
k
=
0
;
k
<
nr_recv
;
k
++
)
{
struct
mpiuse_log_entry
*
log
=
recv_queue
[
k
];
nr_recvs
[
log
->
otherrank
]
++
;
}
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
{
if
(
nr_recvs
[
k
]
>
0
)
{
recvs_queue
[
k
]
=
(
struct
mpiuse_log_entry
**
)
calloc
(
nr_recvs
[
k
],
sizeof
(
struct
mpiuse_log_entry
*
));
int
i
=
0
;
for
(
int
j
=
0
;
j
<
nr_recv
;
j
++
)
{
struct
mpiuse_log_entry
*
log
=
recv_queue
[
j
];
if
(
log
->
otherrank
==
k
)
{
recvs_queue
[
k
][
i
]
=
recv_queue
[
j
];
i
++
;
}
}
}
else
{
recvs_queue
[
k
]
=
NULL
;
}
}
free
(
recv_queue
);
if
(
verbose
)
{
message
(
"maxsize = %zd, nr_send = %d, nr_recv = %d"
,
maxsize
,
nr_send
,
nr_recv
);
}
return
maxsize
;
}
/**
 * @brief usage help.
 *
 * Writes a description of the command line to stderr. The options listed
 * are those accepted by the getopt() call in main().
 *
 * @param argv the command-line arguments, only argv[0] is used.
 */
static void usage(char *argv[]) {
  fprintf(stderr, "Usage: %s [-vd] SWIFT_mpiuse-log-file.dat logfile.dat\n",
          argv[0]);
  fprintf(stderr, " options: -v verbose\n");
  fprintf(stderr, "          -d set a data pattern and check it on receive\n");
  fflush(stderr);
}
/**
* @brief main function.
*/
int
main
(
int
argc
,
char
*
argv
[])
{
/* Initiate MPI. */
int
prov
=
0
;
int
res
=
MPI_Init_thread
(
&
argc
,
&
argv
,
MPI_THREAD_MULTIPLE
,
&
prov
);
if
(
res
!=
MPI_SUCCESS
)
error
(
"Call to MPI_Init_thread failed with error %i."
,
res
);
res
=
MPI_Comm_size
(
MPI_COMM_WORLD
,
&
nr_ranks
);
if
(
res
!=
MPI_SUCCESS
)
error
(
"MPI_Comm_size failed with error %i."
,
res
);
res
=
MPI_Comm_rank
(
MPI_COMM_WORLD
,
&
myrank
);
if
(
res
!=
MPI_SUCCESS
)
error
(
"Call to MPI_Comm_rank failed with error %i."
,
res
);
/* Handle the command-line, we expect a mpiuse data file to read and various
* options. */
int
opt
;
while
((
opt
=
getopt
(
argc
,
argv
,
"vd"
))
!=
-
1
)
{
switch
(
opt
)
{
case
'd'
:
datacheck
=
1
;
break
;
case
'v'
:
verbose
=
1
;
break
;
default:
if
(
myrank
==
0
)
usage
(
argv
);
return
1
;
}
}
if
(
optind
>=
argc
-
1
)
{
if
(
myrank
==
0
)
usage
(
argv
);
return
1
;
}
char
*
infile
=
argv
[
optind
];
char
*
logfile
=
argv
[
optind
+
1
];
/* Now we read the SWIFT MPI logger output that defines the communcations
* we will undertake and the time differences between injections into the
* queues. Note this has all ranks for a single steps, SWIFT outputs one MPI
* log per rank per step, so you need to combine all ranks from a step. */
mpiuse_log_restore
(
infile
);
int
nranks
=
mpiuse_nr_ranks
();
/* This should match the expected size. */
if
(
nr_ranks
!=
nranks
)
error
(
"The number of MPI ranks %d does not match the expected value %d"
,
nranks
,
nr_ranks
);
/* Extract the send and recv messages for our rank. */
size_t
maxsize
=
pick_logs
();
/* Size of a message board. Needs to align on size_t. */
MESSAGE_SIZE
=
toblocks
(
maxsize
)
+
HEADER_SIZE
;
/* Now for the RDMA setup. We need the IP addresses of all the ranks. */
/* Each rank can find its name and IP. */
char
name
[
MPI_MAX_PROCESSOR_NAME
];
int
namelen
=
0
;
MPI_Get_processor_name
(
name
,
&
namelen
);
char
ip
[
MPI_MAX_PROCESSOR_NAME
];
strncpy
(
ip
,
toipaddr
(
name
),
MPI_MAX_PROCESSOR_NAME
);
/* And distribute, so we all know everyone's IPs. */
char
*
server_ips
=
(
char
*
)
malloc
(
sizeof
(
char
)
*
nr_ranks
*
MPI_MAX_PROCESSOR_NAME
);
MPI_Allgather
(
ip
,
MPI_MAX_PROCESSOR_NAME
,
MPI_BYTE
,
server_ips
,
MPI_MAX_PROCESSOR_NAME
,
MPI_BYTE
,
MPI_COMM_WORLD
);
if
(
myrank
==
0
)
{
message
(
"RDMA servers will listen on:"
);
for
(
int
j
=
0
;
j
<
nr_ranks
;
j
++
)
{
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
{
if
(
k
!=
j
)
{
message
(
" %d: %s on port %d"
,
j
,
&
server_ips
[
j
*
MPI_MAX_PROCESSOR_NAME
],
BASE_PORT
+
k
);
}
}
}
}
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
starting
[
k
]
=
1
;
/* Time to start time. Try to make it synchronous across the ranks. */
MPI_Barrier
(
MPI_COMM_WORLD
);
clocks_set_cpufreq
(
0
);
if
(
myrank
==
0
)
{
message
(
"Start of MPI tests"
);
message
(
"=================="
);
if
(
verbose
)
{
if
(
datacheck
)
message
(
"checking data pattern on send and recv completion"
);
}
}
/* Make a thread per rank, each one has a QP that connects between this rank
* and that rank. We need to start all the server threads first. */
pthread_t
recvthread
[
nr_ranks
];
int
*
ranks
=
(
int
*
)
malloc
(
nr_ranks
*
sizeof
(
int
));
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
{
if
(
k
!=
myrank
)
{
ranks
[
k
]
=
k
;
if
(
pthread_create
(
&
recvthread
[
k
],
NULL
,
&
recv_thread
,
&
ranks
[
k
])
!=
0
)
error
(
"Failed to create recv thread."
);
}
}
// Wait on all the local servers to start.
int
ready
=
0
;
while
(
ready
!=
nr_ranks
-
1
)
{
ready
=
0
;
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
{
if
(
k
!=
myrank
)
{
if
(
!
starting
[
k
])
ready
++
;
}
}
}
message
(
"All servers are started"
);
// And make sure all remotes are also ready.
MPI_Barrier
(
MPI_COMM_WORLD
);
// Vital...
message
(
"All synchronized"
);
/* Now we have a thread per rank to send the messages. */
pthread_t
sendthread
[
nr_ranks
];
struct
stuff
*
stuff
=
(
struct
stuff
*
)
malloc
(
nr_ranks
*
sizeof
(
struct
stuff
));
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
{
if
(
k
!=
myrank
)
{
strcpy
(
stuff
[
k
].
server_ip
,
&
server_ips
[
k
*
MPI_MAX_PROCESSOR_NAME
]);
stuff
[
k
].
rank
=
k
;
if
(
pthread_create
(
&
sendthread
[
k
],
NULL
,
&
send_thread
,
&
stuff
[
k
])
!=
0
)
error
(
"Failed to create send thread."
);
}
}
/* Wait until all threads have exited and all message exchanges have
* completed. */
for
(
int
k
=
0
;
k
<
nr_ranks
;
k
++
)
{
if
(
k
!=
myrank
)
{
pthread_join
(
sendthread
[
k
],
NULL
);
pthread_join
(
recvthread
[
k
],
NULL
);
}
}
MPI_Barrier
(
MPI_COMM_WORLD
);
/* Dump the updated MPI logs. */
fflush
(
stdout
);
if
(
myrank
==
0
)
message
(
"Dumping updated log"
);
mpiuse_dump_logs
(
nranks
,
logfile
);
/* Shutdown MPI. */
res
=
MPI_Finalize
();
if
(
res
!=
MPI_SUCCESS
)
error
(
"call to MPI_Finalize failed with error %i."
,
res
);
/* Free resources. */
free
(
server_ips
);
free
(
stuff
);
free
(
ranks
);
if
(
myrank
==
0
)
message
(
"Bye"
);
return
0
;
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
sign in
to comment