Commit f027bf22 authored by Matthieu Schaller

Applied formatting script

parent 10127bbf
1 merge request: !2121 proxy tags exchange
@@ -119,10 +119,10 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,

  if (offset_out == NULL) error("Error allocating memory for tag offsets out");
  for (int k = 0; k < s->nr_cells; k++) {
    offset_out[k] = count_out;
    if (s->cells_top[k].mpi.sendto) {
      count_out += s->cells_top[k].mpi.pcell_size;
    }
  }

  /* Calculate Incoming Sizes/Offsets */
@@ -132,12 +132,12 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,

  if (offset_in == NULL) error("Error allocating memory for tag offsets in");
  for (int k = 0; k < num_proxies; k++) {
    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
      // Calculate local cell index relative to the start of the cell array
      const int cid = proxies[k].cells_in[j] - s->cells_top;
      offset_in[cid] = count_in;  // Store offset based on overall count
      count_in += proxies[k].cells_in[j]->mpi.pcell_size;
    }
  }

  /* Allocate the Main Tag Buffers */
@@ -147,7 +147,7 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,

                     sizeof(int) * count_in) != 0 ||
      swift_memalign("tags_out", (void **)&tags_out, SWIFT_CACHE_ALIGNMENT,
                     sizeof(int) * count_out) != 0)
    error("Failed to allocate main tags buffers.");

  /* Pack the Local Tags into tags_out */
  struct tag_mapper_data extra_data;
@@ -161,207 +161,237 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,

                 threadpool_auto_chunk_size, &extra_data);

  if (s->e->verbose)
    message("Rank %d: Setup & Main Pack took %.3f %s.", rank,
            clocks_from_ticks(getticks() - tic2), clocks_getunit());

  tic2 = getticks();

  /* Prepare for Per-Proxy Communication */
  MPI_Request *proxy_reqs_in = NULL;
  MPI_Request *proxy_reqs_out = NULL;
  void **temp_send_buffers = NULL;  // Array of pointers to temp send buffers
  void **temp_recv_buffers = NULL;  // Array of pointers to temp recv buffers
  int *send_sizes = NULL;           // Size of data to send to each proxy
  int *recv_sizes = NULL;           // Size of data to receive from each proxy
  int num_active_sends = 0;         // Count actual sends initiated
  int num_active_recvs = 0;         // Count actual receives initiated

  // Allocate helper arrays based on the number of proxies
  if (num_proxies > 0) {
    // Use calloc to initialize pointers to NULL and sizes to 0
    proxy_reqs_in = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
    proxy_reqs_out = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
    temp_send_buffers = (void **)calloc(num_proxies, sizeof(void *));
    temp_recv_buffers = (void **)calloc(num_proxies, sizeof(void *));
    send_sizes = (int *)calloc(num_proxies, sizeof(int));
    recv_sizes = (int *)calloc(num_proxies, sizeof(int));
    if (!proxy_reqs_in || !proxy_reqs_out || !temp_send_buffers ||
        !temp_recv_buffers || !send_sizes || !recv_sizes) {
      error("Rank %d: Failed to allocate memory for proxy comm structures.",
            rank);
    }
  }
  /* Initiate Non-blocking Sends and Receives (Per Proxy) */
  for (int k = 0; k < num_proxies; k++) {
    int partner_rank = proxies[k].nodeID;

    // --- Handle Sends to Proxy k ---
    if (proxies[k].nr_cells_out > 0) {
      // Calculate total size
      for (int j = 0; j < proxies[k].nr_cells_out; j++) {
        send_sizes[k] += proxies[k].cells_out[j]->mpi.pcell_size;
      }

      if (send_sizes[k] > 0) {  // Proceed only if there's data
        temp_send_buffers[k] = malloc(send_sizes[k] * sizeof(int));
        if (!temp_send_buffers[k])
          error("Rank %d: Failed to allocate temp send buffer for proxy %d.",
                rank, partner_rank);

        // Pack data from main tags_out into the temporary buffer
        int current_offset = 0;
        for (int j = 0; j < proxies[k].nr_cells_out; j++) {
          const int cid = proxies[k].cells_out[j] - s->cells_top;
          const int size = proxies[k].cells_out[j]->mpi.pcell_size;
          if (size > 0) {
            memcpy((char *)temp_send_buffers[k] + current_offset * sizeof(int),
                   &tags_out[offset_out[cid]], size * sizeof(int));
            current_offset += size;
          }
        }

        // Issue single MPI_Isend using temp buffer
        // Use the RECEIVER's rank ID as the tag
        const int tag = partner_rank;
        int err = MPI_Isend(
            temp_send_buffers[k], send_sizes[k], MPI_INT, partner_rank, tag,
            MPI_COMM_WORLD,
            &proxy_reqs_out[num_active_sends]);  // Use counter as index
        if (err != MPI_SUCCESS)
          mpi_error(err, "Failed to Isend aggregate tags.");
        num_active_sends++;  // Increment count of active sends
      }
    }  // End if nr_cells_out > 0
    // --- Handle Receives from Proxy k ---
    if (proxies[k].nr_cells_in > 0) {
      // Calculate total size
      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
        recv_sizes[k] += proxies[k].cells_in[j]->mpi.pcell_size;
      }

      if (recv_sizes[k] > 0) {  // Proceed only if data is expected
        temp_recv_buffers[k] = malloc(recv_sizes[k] * sizeof(int));
        if (!temp_recv_buffers[k])
          error("Rank %d: Failed to allocate temp recv buffer for proxy %d.",
                rank, partner_rank);

        // Issue single MPI_Irecv into temp buffer
        // Expect sender to use OUR rank ID as the tag
        const int tag = rank;
        int err = MPI_Irecv(
            temp_recv_buffers[k], recv_sizes[k], MPI_INT, partner_rank, tag,
            MPI_COMM_WORLD,
            &proxy_reqs_in[num_active_recvs]);  // Use counter as index
        if (err != MPI_SUCCESS)
          mpi_error(err, "Failed to Irecv aggregate tags.");
        num_active_recvs++;  // Increment count of active receives
      }
    }  // End if nr_cells_in > 0
  }  // End loop over proxies
  if (s->e->verbose)
    message("Posted %d aggregate Sends / %d aggregate Recvs.", num_active_sends,
            num_active_recvs);

  /* Wait for ALL Receives to Complete */
  if (num_active_recvs > 0) {
    if (MPI_Waitall(num_active_recvs, proxy_reqs_in, MPI_STATUSES_IGNORE) !=
        MPI_SUCCESS)
      error("Rank %d: MPI_Waitall on aggregate receives failed.", rank);
  }

  if (s->e->verbose)
    message("Aggregate MPI_Waitall (Recvs) took %.3f %s.",
            clocks_from_ticks(getticks() - tic2), clocks_getunit());
  /* Unpack Received Data (Serially) */
  tic2 = getticks();
  int current_recv_req_idx = 0;  // Track index corresponding to proxy_reqs_in

  // Create the extra_data struct needed for cell_unpack_tags
  // Note: We only need tags_in, offset_in, and space_cells now for unpacking
  struct tag_mapper_data unpack_extra_data;
  unpack_extra_data.tags_in = tags_in;  // Pointer to the main receive buffer
  unpack_extra_data.offset_in = offset_in;  // Offsets within tags_in
  unpack_extra_data.space_cells =
      s->cells_top;  // Pointer to the beginning of cell array
  for (int k = 0; k < num_proxies; k++) {
    // Only unpack if we expected data and allocated a buffer for this proxy
    if (proxies[k].nr_cells_in > 0 && recv_sizes[k] > 0) {
      int current_offset_in_temp_buffer =
          0;  // Track position within temp buffer
      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
        const struct cell *recv_cell_info = proxies[k].cells_in[j];
        const int cid = recv_cell_info - s->cells_top;  // Get the cell index
        const int size =
            recv_cell_info->mpi.pcell_size;  // Size of data for this cell
        if (size > 0) {
          // --- Copy data from temp buffer to final tags_in location ---
          // Ensure offset_in[cid] is valid and within bounds of tags_in
          if (offset_in[cid] + size > count_in) {
            error(
                "Rank %d: Unpack error - offset calculation mismatch for cell "
                "%d from proxy %d.",
                rank, cid, proxies[k].nodeID);
          }
          memcpy(&tags_in[offset_in[cid]],  // Destination in main buffer
                 (char *)temp_recv_buffers[k] +
                     current_offset_in_temp_buffer *
                         sizeof(int),  // Source in temp buffer
                 size * sizeof(int));  // Size to copy

          // --- Call the cell-specific unpack function ---
          // This uses the data *now residing* in tags_in[offset_in[cid]]
          // to update the actual cell structure at space_cells[cid].
          cell_unpack_tags(
              &tags_in[offset_in[cid]],  // Pointer to data in main buffer
              &unpack_extra_data
                   .space_cells[cid]);  // Pointer to target cell struct

          // Update offset for the *next* cell's data within the temp buffer
          current_offset_in_temp_buffer += size;
        }
      }  // End loop over cells for this proxy

      // Sanity check: Did we process the expected amount from the temp buffer?
      if (current_offset_in_temp_buffer != recv_sizes[k]) {
        error(
            "Rank %d: Unpack size mismatch for proxy %d. Expected %d, "
            "processed %d.",
            rank, proxies[k].nodeID, recv_sizes[k],
            current_offset_in_temp_buffer);
      }

      free(temp_recv_buffers[k]);  // Free the temp buffer now it's fully
                                   // processed
      temp_recv_buffers[k] = NULL;
      current_recv_req_idx++;  // Increment processed receive counter
    }  // End if(data expected from proxy)
  }  // End loop over proxies
  // Sanity check: Did we process the expected number of receives?
  if (current_recv_req_idx != num_active_recvs) {
    error("Rank %d: Processed %d receives during unpack, but expected %d.",
          rank, current_recv_req_idx, num_active_recvs);
  }

  if (s->e->verbose)
    message("Unpacking aggregate data (serial) took %.3f %s.",
            clocks_from_ticks(getticks() - tic2), clocks_getunit());
  /* Wait for ALL Sends to Complete */
  // (Often done after unpack, allows computation/unpack to overlap with sends
  // finishing)
  tic2 = getticks();
  if (num_active_sends > 0) {
    if (MPI_Waitall(num_active_sends, proxy_reqs_out, MPI_STATUSES_IGNORE) !=
        MPI_SUCCESS)
      error("Rank %d: MPI_Waitall on aggregate sends failed.", rank);
  }

  if (s->e->verbose)
    message("Rank %d: Aggregate MPI_Waitall (Sends) took %.3f %s.", rank,
            clocks_from_ticks(getticks() - tic2), clocks_getunit());
  /* Cleanup Temporary Communication Structures */
  if (num_proxies > 0) {
    for (int k = 0; k < num_proxies; k++) {
      // Free any remaining send buffers (recv buffers freed during unpack)
      if (temp_send_buffers[k] != NULL) {
        free(temp_send_buffers[k]);
      }
      // Check if recv buffer was missed (shouldn't happen with current logic)
      if (temp_recv_buffers[k] != NULL) {
        warning(
            "Rank %d: Temp recv buffer for proxy %d was not freed during "
            "unpack?",
            rank, k);
        free(temp_recv_buffers[k]);
      }
    }
    // Free the helper arrays themselves
    free(proxy_reqs_in);
    free(proxy_reqs_out);
    free(temp_send_buffers);
    free(temp_recv_buffers);
    free(send_sizes);
    free(recv_sizes);
  }

  /* Final Clean up - Main Buffers and Offsets */