SWIFT / SWIFTsim · Commit 10127bbf

Authored 4 months ago by Stuart McAlpine

Modify proxy_tags_exchange to reduce number of MPI calls

Parent: 413bdbfa
Merge request: !2121 (proxy tags exchange)
Showing 1 changed file: src/proxy.c (+215 additions, −85 deletions)
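This commit replaces the per-cell exchange of cell tags with one aggregated message per proxy (i.e. per neighbouring rank), so the number of outstanding MPI requests drops from one per cell pair to at most two per proxy. As a minimal sketch of the send-side aggregation idea, with hypothetical names (`payloads`, `sizes`, `ncells`, `dest`) standing in for the SWIFT structures, not the actual SWIFT API:

```c
#include <mpi.h>
#include <stdlib.h>
#include <string.h>

/* Sketch: pack ncells variable-sized int payloads into one buffer and post a
 * single MPI_Isend, instead of ncells separate sends. The returned buffer
 * must stay alive until the request completes. */
static int *send_aggregated(int *const *payloads, const int *sizes, int ncells,
                            int dest, MPI_Comm comm, MPI_Request *req) {
  int total = 0;
  for (int j = 0; j < ncells; j++) total += sizes[j];
  int *buf = (int *)malloc(total * sizeof(int));
  for (int j = 0, off = 0; j < ncells; j++) {
    memcpy(buf + off, payloads[j], sizes[j] * sizeof(int));
    off += sizes[j];
  }
  /* In the commit's scheme, the receiver's rank doubles as the message tag. */
  MPI_Isend(buf, total, MPI_INT, dest, /* tag = */ dest, comm, req);
  return buf;
}
```

The same running-offset walk reappears throughout the diff below, first over `s->nr_cells` for the main outgoing buffer and then per proxy for the temporary send buffers.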
```diff
@@ -107,138 +107,268 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,
 #ifdef WITH_MPI
 
-  /* ticks tic2 = getticks(); */
+  int rank;
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
-  /* Run through the cells and get the size of the tags that will be sent off.
-   */
+  ticks tic2 = getticks();
+
+  /* Calculate Outgoing Sizes/Offsets */
   int count_out = 0;
   int *offset_out =
       (int *)swift_malloc("tags_offsets_out", s->nr_cells * sizeof(int));
-  if (offset_out == NULL) error("Error allocating memory for tag offsets");
+  if (offset_out == NULL) error("Error allocating memory for tag offsets out");
 
   for (int k = 0; k < s->nr_cells; k++) {
     offset_out[k] = count_out;
     if (s->cells_top[k].mpi.sendto) {
      count_out += s->cells_top[k].mpi.pcell_size;
     }
   }
 
-  /* Run through the proxies and get the count of incoming tags. */
+  /* Calculate Incoming Sizes/Offsets */
   int count_in = 0;
   int *offset_in =
       (int *)swift_malloc("tags_offsets_in", s->nr_cells * sizeof(int));
-  if (offset_in == NULL) error("Error allocating memory for tag offsets");
+  if (offset_in == NULL) error("Error allocating memory for tag offsets in");
 
   for (int k = 0; k < num_proxies; k++) {
-    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-      offset_in[proxies[k].cells_in[j] - s->cells_top] = count_in;
-      count_in += proxies[k].cells_in[j]->mpi.pcell_size;
-    }
+    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+      // Calculate local cell index relative to the start of the cell array
+      const int cid = proxies[k].cells_in[j] - s->cells_top;
+      offset_in[cid] = count_in;  // Store offset based on overall count
+      count_in += proxies[k].cells_in[j]->mpi.pcell_size;
+    }
   }
```
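Both offset loops index the per-cell arrays by pointer subtraction: `proxies[k].cells_in[j]` points into the contiguous `s->cells_top` array, so the difference is that cell's index. A minimal illustration of the idiom, using a stripped-down hypothetical stand-in for SWIFT's `struct cell`:

```c
#include <stddef.h>

struct cell_sketch { int pcell_size; };  /* stand-in for SWIFT's struct cell */

/* Recover a cell's index from a pointer into the top-level cell array.
 * Valid only if c actually points into cells_top[0 .. nr_cells). */
static ptrdiff_t cell_index(const struct cell_sketch *c,
                            const struct cell_sketch *cells_top) {
  return c - cells_top;
}
```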
```diff
-  /* Allocate the tags. */
+  /* Allocate the Main Tag Buffers */
   int *tags_in = NULL;
   int *tags_out = NULL;
 
   if (swift_memalign("tags_in", (void **)&tags_in, SWIFT_CACHE_ALIGNMENT,
                      sizeof(int) * count_in) != 0 ||
       swift_memalign("tags_out", (void **)&tags_out, SWIFT_CACHE_ALIGNMENT,
                      sizeof(int) * count_out) != 0)
-    error("Failed to allocate tags buffers.");
+    error("Failed to allocate main tags buffers.");
 
+  /* Pack the Local Tags into tags_out */
   struct tag_mapper_data extra_data;
   extra_data.tags_out = tags_out;
   extra_data.offset_out = offset_out;
   extra_data.space_cells = s->cells_top;
 
-  /* Pack the local tags. */
+  // Using threadpool_map for parallel packing into the main tags_out buffer
   threadpool_map(&s->e->threadpool, proxy_tags_exchange_pack_mapper,
                  s->cells_top, s->nr_cells, sizeof(struct cell),
                  threadpool_auto_chunk_size, &extra_data);
 
-  /* if (s->e->verbose) */
-  /*   message("Cell pack tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  if (s->e->verbose)
+    message("Rank %d: Setup & Main Pack took %.3f %s.", rank,
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
```
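Packing is parallelised with SWIFT's `threadpool_map`, which applies a mapper function over chunks of the `cells_top` array. The real `proxy_tags_exchange_pack_mapper` is defined elsewhere in src/proxy.c and is not shown in this hunk; the following is only a hedged sketch of what a compatible mapper could look like, assuming the `tag_mapper_data` fields set above and a `cell_pack_tags()` counterpart to the `cell_unpack_tags()` call that appears later in this diff:

```c
/* Sketch only; the actual mapper lives elsewhere in src/proxy.c and SWIFT's
 * headers are assumed for struct cell and struct tag_mapper_data. */
void pack_mapper_sketch(void *map_data, int num_elements, void *extra_data) {
  struct cell *cells = (struct cell *)map_data;
  struct tag_mapper_data *data = (struct tag_mapper_data *)extra_data;
  for (int i = 0; i < num_elements; i++) {
    if (!cells[i].mpi.sendto) continue;  /* only cells flagged for sending */
    const ptrdiff_t cid = &cells[i] - data->space_cells;
    /* Write this cell's tags at its precomputed offset in tags_out. */
    cell_pack_tags(&cells[i], &data->tags_out[data->offset_out[cid]]);
  }
}
```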
```diff
-  /* tic2 = getticks(); */
+  tic2 = getticks();
 
-  /* Allocate the incoming and outgoing request handles. */
-  int num_reqs_out = 0;
-  int num_reqs_in = 0;
+  /* Prepare for Per-Proxy Communication */
+  MPI_Request *proxy_reqs_in = NULL;
+  MPI_Request *proxy_reqs_out = NULL;
+  void **temp_send_buffers = NULL;  // Array of pointers to temp send buffers
+  void **temp_recv_buffers = NULL;  // Array of pointers to temp recv buffers
+  int *send_sizes = NULL;    // Size of data to send to each proxy
+  int *recv_sizes = NULL;    // Size of data to receive from each proxy
+  int num_active_sends = 0;  // Count actual sends initiated
+  int num_active_recvs = 0;  // Count actual receives initiated
+
+  // Allocate helper arrays based on the number of proxies
+  if (num_proxies > 0) {
+    // Use calloc to initialize pointers to NULL and sizes to 0
+    proxy_reqs_in = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
+    proxy_reqs_out = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
+    temp_send_buffers = (void **)calloc(num_proxies, sizeof(void *));
+    temp_recv_buffers = (void **)calloc(num_proxies, sizeof(void *));
+    send_sizes = (int *)calloc(num_proxies, sizeof(int));
+    recv_sizes = (int *)calloc(num_proxies, sizeof(int));
+    if (!proxy_reqs_in || !proxy_reqs_out || !temp_send_buffers ||
+        !temp_recv_buffers || !send_sizes || !recv_sizes) {
+      error("Rank %d: Failed to allocate memory for proxy comm structures.",
+            rank);
+    }
+  }
+
+  /* Initiate Non-blocking Sends and Receives (Per Proxy) */
   for (int k = 0; k < num_proxies; k++) {
-    num_reqs_in += proxies[k].nr_cells_in;
-    num_reqs_out += proxies[k].nr_cells_out;
-  }
-  MPI_Request *reqs_in = NULL;
-  int *cids_in = NULL;
-  if ((reqs_in = (MPI_Request *)malloc(sizeof(MPI_Request) *
-                                       (num_reqs_in + num_reqs_out))) == NULL ||
-      (cids_in = (int *)malloc(sizeof(int) * (num_reqs_in + num_reqs_out))) ==
-          NULL)
-    error("Failed to allocate MPI_Request arrays.");
-  MPI_Request *reqs_out = &reqs_in[num_reqs_in];
-  int *cids_out = &cids_in[num_reqs_in];
-
-  /* Emit the sends and recvs. */
-  for (int send_rid = 0, recv_rid = 0, k = 0; k < num_proxies; k++) {
-    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-      const int cid = proxies[k].cells_in[j] - s->cells_top;
-      cids_in[recv_rid] = cid;
-      int err = MPI_Irecv(&tags_in[offset_in[cid]],
-                          proxies[k].cells_in[j]->mpi.pcell_size, MPI_INT,
-                          proxies[k].nodeID, cid, MPI_COMM_WORLD,
-                          &reqs_in[recv_rid]);
-      if (err != MPI_SUCCESS) mpi_error(err, "Failed to irecv tags.");
-      recv_rid += 1;
-    }
-    for (int j = 0; j < proxies[k].nr_cells_out; j++) {
-      const int cid = proxies[k].cells_out[j] - s->cells_top;
-      cids_out[send_rid] = cid;
-      int err = MPI_Isend(&tags_out[offset_out[cid]],
-                          proxies[k].cells_out[j]->mpi.pcell_size, MPI_INT,
-                          proxies[k].nodeID, cid, MPI_COMM_WORLD,
-                          &reqs_out[send_rid]);
-      if (err != MPI_SUCCESS) mpi_error(err, "Failed to isend tags.");
-      send_rid += 1;
-    }
-  }
+    int partner_rank = proxies[k].nodeID;
+
+    // --- Handle Sends to Proxy k ---
+    if (proxies[k].nr_cells_out > 0) {
+      // Calculate total size
+      for (int j = 0; j < proxies[k].nr_cells_out; j++) {
+        send_sizes[k] += proxies[k].cells_out[j]->mpi.pcell_size;
+      }
+      if (send_sizes[k] > 0) {  // Proceed only if there's data
+        temp_send_buffers[k] = malloc(send_sizes[k] * sizeof(int));
+        if (!temp_send_buffers[k])
+          error("Rank %d: Failed to allocate temp send buffer for proxy %d.",
+                rank, partner_rank);
+
+        // Pack data from main tags_out into the temporary buffer
+        int current_offset = 0;
+        for (int j = 0; j < proxies[k].nr_cells_out; j++) {
+          const int cid = proxies[k].cells_out[j] - s->cells_top;
+          const int size = proxies[k].cells_out[j]->mpi.pcell_size;
+          if (size > 0) {
+            memcpy((char *)temp_send_buffers[k] + current_offset * sizeof(int),
+                   &tags_out[offset_out[cid]], size * sizeof(int));
+            current_offset += size;
+          }
+        }
+
+        // Issue single MPI_Isend using temp buffer
+        // Use the RECEIVER's rank ID as the tag
+        const int tag = partner_rank;
+        int err = MPI_Isend(temp_send_buffers[k], send_sizes[k], MPI_INT,
+                            partner_rank, tag, MPI_COMM_WORLD,
+                            &proxy_reqs_out[num_active_sends]);  // Use counter as index
+        if (err != MPI_SUCCESS)
+          mpi_error(err, "Failed to Isend aggregate tags.");
+        num_active_sends++;  // Increment count of active sends
+      }
+    }  // End if nr_cells_out > 0
+
+    // --- Handle Receives from Proxy k ---
+    if (proxies[k].nr_cells_in > 0) {
+      // Calculate total size
+      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+        recv_sizes[k] += proxies[k].cells_in[j]->mpi.pcell_size;
+      }
+      if (recv_sizes[k] > 0) {  // Proceed only if data is expected
+        temp_recv_buffers[k] = malloc(recv_sizes[k] * sizeof(int));
+        if (!temp_recv_buffers[k])
+          error("Rank %d: Failed to allocate temp recv buffer for proxy %d.",
+                rank, partner_rank);
+
+        // Issue single MPI_Irecv into temp buffer
+        // Expect sender to use OUR rank ID as the tag
+        const int tag = rank;
+        int err = MPI_Irecv(temp_recv_buffers[k], recv_sizes[k], MPI_INT,
+                            partner_rank, tag, MPI_COMM_WORLD,
+                            &proxy_reqs_in[num_active_recvs]);  // Use counter as index
+        if (err != MPI_SUCCESS)
+          mpi_error(err, "Failed to Irecv aggregate tags.");
+        num_active_recvs++;  // Increment count of active receives
+      }
+    }  // End if nr_cells_in > 0
+  }  // End loop over proxies
```
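Because there is now at most one message per ordered pair of ranks, a per-cell tag is no longer needed to disambiguate messages: the sender tags with the receiver's rank, the receiver expects its own rank, and MPI's non-overtaking guarantee between a fixed sender/receiver pair on one communicator makes the single match unambiguous. A minimal sketch of the matched pair, with hypothetical ranks `me` and `peer` and hypothetical buffers:

```c
#include <mpi.h>

/* Sketch: post the matched aggregate send/recv pair between this rank (me)
 * and a neighbouring rank (peer). */
static void post_exchange(int me, int peer, int *sendbuf, int nsend,
                          int *recvbuf, int nrecv, MPI_Request *sreq,
                          MPI_Request *rreq) {
  /* Tag outgoing data with the receiver's rank... */
  MPI_Isend(sendbuf, nsend, MPI_INT, peer, /* tag = */ peer, MPI_COMM_WORLD,
            sreq);
  /* ...and expect our own rank as the tag on incoming data. */
  MPI_Irecv(recvbuf, nrecv, MPI_INT, peer, /* tag = */ me, MPI_COMM_WORLD,
            rreq);
}
```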
```diff
-  /* if (s->e->verbose) */
-  /*   message("Emitting Send/Recv for tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  if (s->e->verbose)
+    message("Posted %d aggregate Sends / %d aggregate Recvs.",
+            num_active_sends, num_active_recvs);
 
-  /* tic2 = getticks(); */
+  /* Wait for ALL Receives to Complete */
+  if (num_active_recvs > 0) {
+    if (MPI_Waitall(num_active_recvs, proxy_reqs_in, MPI_STATUSES_IGNORE) !=
+        MPI_SUCCESS)
+      error("Rank %d: MPI_Waitall on aggregate receives failed.", rank);
+  }
 
-  /* Wait for all the sends to have completed. */
-  if (MPI_Waitall(num_reqs_in, reqs_in, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-    error("MPI_Waitall on sends failed.");
+  if (s->e->verbose)
+    message("Aggregate MPI_Waitall (Recvs) took %.3f %s.",
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
-  /* if (s->e->verbose) */
-  /*   message("WaitAll on tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  /* Unpack Received Data (Serially) */
+  tic2 = getticks();
+  int current_recv_req_idx = 0;  // Track index corresponding to proxy_reqs_in
 
-  /* tic2 = getticks(); */
+  // Create the extra_data struct needed for cell_unpack_tags
+  // Note: We only need tags_in, offset_in, and space_cells now for unpacking
+  struct tag_mapper_data unpack_extra_data;
+  unpack_extra_data.tags_in = tags_in;      // Pointer to the main receive buffer
+  unpack_extra_data.offset_in = offset_in;  // Offsets within tags_in
+  unpack_extra_data.space_cells = s->cells_top;  // Pointer to the beginning of cell array
 
-  /* Unpack the tags we received */
-  extra_data.tags_in = tags_in;
-  extra_data.offset_in = offset_in;
-  extra_data.space_cells = s->cells_top;
-  threadpool_map(&s->e->threadpool, proxy_tags_exchange_unpack_mapper, cids_in,
-                 num_reqs_in, sizeof(int), threadpool_auto_chunk_size,
-                 &extra_data);
+  for (int k = 0; k < num_proxies; k++) {
+    // Only unpack if we expected data and allocated a buffer for this proxy
+    if (proxies[k].nr_cells_in > 0 && recv_sizes[k] > 0) {
+      int current_offset_in_temp_buffer = 0;  // Track position within temp buffer
+
+      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+        const struct cell *recv_cell_info = proxies[k].cells_in[j];
+        const int cid = recv_cell_info - s->cells_top;  // Get the cell index
+        const int size = recv_cell_info->mpi.pcell_size;  // Size of data for this cell
+
+        if (size > 0) {
+          // --- Copy data from temp buffer to final tags_in location ---
+          // Ensure offset_in[cid] is valid and within bounds of tags_in
+          if (offset_in[cid] + size > count_in) {
+            error("Rank %d: Unpack error - offset calculation mismatch for "
+                  "cell %d from proxy %d.",
+                  rank, cid, proxies[k].nodeID);
+          }
+          memcpy(&tags_in[offset_in[cid]],  // Destination in main buffer
+                 (char *)temp_recv_buffers[k] +
+                     current_offset_in_temp_buffer * sizeof(int),  // Source in temp buffer
+                 size * sizeof(int));  // Size to copy
+
+          // --- Call the cell-specific unpack function ---
+          // This uses the data *now residing* in tags_in[offset_in[cid]]
+          // to update the actual cell structure at space_cells[cid].
+          cell_unpack_tags(&tags_in[offset_in[cid]],  // Pointer to data in main buffer
+                           &unpack_extra_data.space_cells[cid]);  // Pointer to target cell struct
+
+          // Update offset for the *next* cell's data within the temp buffer
+          current_offset_in_temp_buffer += size;
+        }
+      }  // End loop over cells for this proxy
+
+      // Sanity check: Did we process the expected amount from the temp buffer?
+      if (current_offset_in_temp_buffer != recv_sizes[k]) {
+        error("Rank %d: Unpack size mismatch for proxy %d. Expected %d, "
+              "processed %d.",
+              rank, proxies[k].nodeID, recv_sizes[k],
+              current_offset_in_temp_buffer);
+      }
+
+      free(temp_recv_buffers[k]);  // Free the temp buffer now it's fully processed
+      temp_recv_buffers[k] = NULL;
+      current_recv_req_idx++;  // Increment processed receive counter
+    }  // End if(data expected from proxy)
+  }  // End loop over proxies
+
+  // Sanity check: Did we process the expected number of receives?
+  if (current_recv_req_idx != num_active_recvs) {
+    error("Rank %d: Processed %d receives during unpack, but expected %d.",
+          rank, current_recv_req_idx, num_active_recvs);
+  }
```
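The serial unpack is the mirror image of the send-side packing: walk the aggregate receive buffer with a running offset and scatter each cell's slice back to its final slot. A minimal sketch of the pattern, with hypothetical `sizes` and `slots` arrays standing in for the per-cell sizes and destinations:

```c
#include <string.h>

/* Sketch: scatter an aggregated receive buffer back to per-cell slots. */
static void scatter_aggregate(const int *aggbuf, int ncells, const int *sizes,
                              int *const *slots) {
  for (int j = 0, off = 0; j < ncells; j++) {
    memcpy(slots[j], aggbuf + off, sizes[j] * sizeof(int));
    off += sizes[j];
  }
}
```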
```diff
-  /* if (s->e->verbose) */
-  /*   message("Cell unpack tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  if (s->e->verbose)
+    message("Unpacking aggregate data (serial) took %.3f %s.",
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
-  /* Wait for all the sends to have completed. */
-  if (MPI_Waitall(num_reqs_out, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-    error("MPI_Waitall on sends failed.");
+  /* Wait for ALL Sends to Complete */
+  // (Often done after unpack, allows computation/unpack to overlap with sends
+  // finishing)
+  tic2 = getticks();
+  if (num_active_sends > 0) {
+    if (MPI_Waitall(num_active_sends, proxy_reqs_out, MPI_STATUSES_IGNORE) !=
+        MPI_SUCCESS)
+      error("Rank %d: MPI_Waitall on aggregate sends failed.", rank);
+  }
 
-  /* Clean up. */
+  if (s->e->verbose)
+    message("Rank %d: Aggregate MPI_Waitall (Sends) took %.3f %s.", rank,
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
+
+  /* Cleanup Temporary Communication Structures */
+  if (num_proxies > 0) {
+    for (int k = 0; k < num_proxies; k++) {
+      // Free any remaining send buffers (recv buffers freed during unpack)
+      if (temp_send_buffers[k] != NULL) {
+        free(temp_send_buffers[k]);
+      }
+      // Check if recv buffer was missed (shouldn't happen with current logic)
+      if (temp_recv_buffers[k] != NULL) {
+        warning("Rank %d: Temp recv buffer for proxy %d was not freed during "
+                "unpack?",
+                rank, k);
+        free(temp_recv_buffers[k]);
+      }
+    }
+    // Free the helper arrays themselves
+    free(proxy_reqs_in);
+    free(proxy_reqs_out);
+    free(temp_send_buffers);
+    free(temp_recv_buffers);
+    free(send_sizes);
+    free(recv_sizes);
+  }
 
+  /* Final Clean up - Main Buffers and Offsets */
   swift_free("tags_in", tags_in);
   swift_free("tags_out", tags_out);
   swift_free("tags_offsets_in", offset_in);
   swift_free("tags_offsets_out", offset_out);
-  free(reqs_in);
-  free(cids_in);
 
 #else
   error("SWIFT was not compiled with MPI support.");
```
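Net effect on request counts: the old code posted one request per cell per direction (`num_reqs_in + num_reqs_out` in total), while the new code posts at most one per proxy per direction (`num_active_sends + num_active_recvs`, bounded by `2 * num_proxies`). An illustrative count using the `struct proxy` fields seen in the diff (SWIFT's proxy.h is assumed; the new-scheme count is an upper bound, since sends and receives with zero total payload are skipped):

```c
/* Illustrative only: requests posted by the old vs. new scheme. */
static void count_requests(const struct proxy *proxies, int num_proxies,
                           int *old_reqs, int *new_reqs) {
  *old_reqs = 0;
  *new_reqs = 0;
  for (int k = 0; k < num_proxies; k++) {
    *old_reqs += proxies[k].nr_cells_in + proxies[k].nr_cells_out;
    *new_reqs += (proxies[k].nr_cells_in > 0) + (proxies[k].nr_cells_out > 0);
  }
}
```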