diff --git a/src/proxy.c b/src/proxy.c
index 001b8684d2a02d1d0b2be58d6d99daff0ee507fd..6286ec45cd64586108391ea6a3b90a53539779e0 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -119,10 +119,10 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,
   if (offset_out == NULL)
     error("Error allocating memory for tag offsets out");
   for (int k = 0; k < s->nr_cells; k++) {
-        offset_out[k] = count_out;
-        if (s->cells_top[k].mpi.sendto) {
-            count_out += s->cells_top[k].mpi.pcell_size;
-        }
+    offset_out[k] = count_out;
+    if (s->cells_top[k].mpi.sendto) {
+      count_out += s->cells_top[k].mpi.pcell_size;
+    }
   }
 
   /* Calculate Incoming Sizes/Offsets */
@@ -132,12 +132,12 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,
   if (offset_in == NULL)
     error("Error allocating memory for tag offsets in");
   for (int k = 0; k < num_proxies; k++) {
-        for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-            // Calculate local cell index relative to the start of the cell array
-            const int cid = proxies[k].cells_in[j] - s->cells_top;
-            offset_in[cid] = count_in; // Store offset based on overall count
-            count_in += proxies[k].cells_in[j]->mpi.pcell_size;
-        }
+    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+      // Calculate local cell index relative to the start of the cell array
+      const int cid = proxies[k].cells_in[j] - s->cells_top;
+      offset_in[cid] = count_in;  // Store offset based on overall count
+      count_in += proxies[k].cells_in[j]->mpi.pcell_size;
+    }
   }
 
   /* Allocate the Main Tag Buffers */
@@ -147,7 +147,7 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,
           sizeof(int) * count_in) != 0 ||
       swift_memalign("tags_out", (void **)&tags_out, SWIFT_CACHE_ALIGNMENT,
                      sizeof(int) * count_out) != 0)
-      error("Failed to allocate main tags buffers.");
+    error("Failed to allocate main tags buffers.");
 
   /* Pack the Local Tags into tags_out */
   struct tag_mapper_data extra_data;
@@ -161,207 +161,237 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,
                  threadpool_auto_chunk_size, &extra_data);
 
   if (s->e->verbose)
-      message("Rank %d: Setup & Main Pack took %.3f %s.", rank,
-              clocks_from_ticks(getticks() - tic2), clocks_getunit());
+    message("Rank %d: Setup & Main Pack took %.3f %s.", rank,
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
   tic2 = getticks();
 
   /* Prepare for Per-Proxy Communication */
   MPI_Request *proxy_reqs_in = NULL;
   MPI_Request *proxy_reqs_out = NULL;
-    void **temp_send_buffers = NULL; // Array of pointers to temp send buffers
-    void **temp_recv_buffers = NULL; // Array of pointers to temp recv buffers
-    int *send_sizes = NULL; // Size of data to send to each proxy
-    int *recv_sizes = NULL; // Size of data to receive from each proxy
-    int num_active_sends = 0; // Count actual sends initiated
-    int num_active_recvs = 0; // Count actual receives initiated
+  void **temp_send_buffers = NULL;  // Array of pointers to temp send buffers
+  void **temp_recv_buffers = NULL;  // Array of pointers to temp recv buffers
+  int *send_sizes = NULL;           // Size of data to send to each proxy
+  int *recv_sizes = NULL;           // Size of data to receive from each proxy
+  int num_active_sends = 0;         // Count actual sends initiated
+  int num_active_recvs = 0;         // Count actual receives initiated
 
   // Allocate helper arrays based on the number of proxies
   if (num_proxies > 0) {
-        // Use calloc to initialize pointers to NULL and sizes to 0
-        proxy_reqs_in = (MPI_Request*)calloc(num_proxies, sizeof(MPI_Request));
-        proxy_reqs_out = (MPI_Request*)calloc(num_proxies, sizeof(MPI_Request));
-        temp_send_buffers = (void**)calloc(num_proxies, sizeof(void*));
-        temp_recv_buffers = (void**)calloc(num_proxies, sizeof(void*));
-        send_sizes = (int*)calloc(num_proxies, sizeof(int));
-        recv_sizes = (int*)calloc(num_proxies, sizeof(int));
-
-        if (!proxy_reqs_in || !proxy_reqs_out || !temp_send_buffers ||
-            !temp_recv_buffers || !send_sizes || !recv_sizes) {
-            error("Rank %d: Failed to allocate memory for proxy comm structures.", rank);
-        }
+    // Use calloc to initialize pointers to NULL and sizes to 0
+    proxy_reqs_in = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
+    proxy_reqs_out = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
+    temp_send_buffers = (void **)calloc(num_proxies, sizeof(void *));
+    temp_recv_buffers = (void **)calloc(num_proxies, sizeof(void *));
+    send_sizes = (int *)calloc(num_proxies, sizeof(int));
+    recv_sizes = (int *)calloc(num_proxies, sizeof(int));
+
+    if (!proxy_reqs_in || !proxy_reqs_out || !temp_send_buffers ||
+        !temp_recv_buffers || !send_sizes || !recv_sizes) {
+      error("Rank %d: Failed to allocate memory for proxy comm structures.",
+            rank);
+    }
   }
 
   /* Initiate Non-blocking Sends and Receives (Per Proxy) */
   for (int k = 0; k < num_proxies; k++) {
-        int partner_rank = proxies[k].nodeID;
+    int partner_rank = proxies[k].nodeID;
 
-        // --- Handle Sends to Proxy k ---
-        if (proxies[k].nr_cells_out > 0) {
-            // Calculate total size
-            for (int j = 0; j < proxies[k].nr_cells_out; j++) {
-                send_sizes[k] += proxies[k].cells_out[j]->mpi.pcell_size;
-            }
+    // --- Handle Sends to Proxy k ---
+    if (proxies[k].nr_cells_out > 0) {
+      // Calculate total size
+      for (int j = 0; j < proxies[k].nr_cells_out; j++) {
+        send_sizes[k] += proxies[k].cells_out[j]->mpi.pcell_size;
+      }
 
-            if (send_sizes[k] > 0) { // Proceed only if there's data
-                temp_send_buffers[k] = malloc(send_sizes[k] * sizeof(int));
-                if (!temp_send_buffers[k]) error("Rank %d: Failed to allocate temp send buffer for proxy %d.", rank, partner_rank);
-
-                // Pack data from main tags_out into the temporary buffer
-                int current_offset = 0;
-                for (int j = 0; j < proxies[k].nr_cells_out; j++) {
-                    const int cid = proxies[k].cells_out[j] - s->cells_top;
-                    const int size = proxies[k].cells_out[j]->mpi.pcell_size;
-                    if (size > 0) {
-                        memcpy((char*)temp_send_buffers[k] + current_offset * sizeof(int),
-                               &tags_out[offset_out[cid]],
-                               size * sizeof(int));
-                        current_offset += size;
-                    }
-                }
-
-                // Issue single MPI_Isend using temp buffer
-                // Use the RECEIVER's rank ID as the tag
-                const int tag = partner_rank;
-                int err = MPI_Isend(temp_send_buffers[k], send_sizes[k], MPI_INT,
-                                    partner_rank, tag, MPI_COMM_WORLD,
-                                    &proxy_reqs_out[num_active_sends]); // Use counter as index
-                if (err != MPI_SUCCESS) mpi_error(err, "Failed to Isend aggregate tags.");
-                num_active_sends++; // Increment count of active sends
-            }
-        } // End if nr_cells_out > 0
+      if (send_sizes[k] > 0) {  // Proceed only if there's data
+        temp_send_buffers[k] = malloc(send_sizes[k] * sizeof(int));
+        if (!temp_send_buffers[k])
+          error("Rank %d: Failed to allocate temp send buffer for proxy %d.",
+                rank, partner_rank);
+
+        // Pack data from main tags_out into the temporary buffer
+        int current_offset = 0;
+        for (int j = 0; j < proxies[k].nr_cells_out; j++) {
+          const int cid = proxies[k].cells_out[j] - s->cells_top;
+          const int size = proxies[k].cells_out[j]->mpi.pcell_size;
+          if (size > 0) {
+            memcpy((char *)temp_send_buffers[k] + current_offset * sizeof(int),
+                   &tags_out[offset_out[cid]], size * sizeof(int));
+            current_offset += size;
+          }
+        }
+
+        // Issue single MPI_Isend using temp buffer
+        // Use the RECEIVER's rank ID as the tag
+        const int tag = partner_rank;
+        int err = MPI_Isend(
+            temp_send_buffers[k], send_sizes[k], MPI_INT, partner_rank, tag,
+            MPI_COMM_WORLD,
+            &proxy_reqs_out[num_active_sends]);  // Use counter as index
+        if (err != MPI_SUCCESS)
+          mpi_error(err, "Failed to Isend aggregate tags.");
+        num_active_sends++;  // Increment count of active sends
+      }
+    }  // End if nr_cells_out > 0
 
-        // --- Handle Receives from Proxy k ---
-        if (proxies[k].nr_cells_in > 0) {
-            // Calculate total size
-            for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-                recv_sizes[k] += proxies[k].cells_in[j]->mpi.pcell_size;
-            }
+    // --- Handle Receives from Proxy k ---
+    if (proxies[k].nr_cells_in > 0) {
+      // Calculate total size
+      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+        recv_sizes[k] += proxies[k].cells_in[j]->mpi.pcell_size;
+      }
 
-            if (recv_sizes[k] > 0) { // Proceed only if data is expected
-                temp_recv_buffers[k] = malloc(recv_sizes[k] * sizeof(int));
-                if (!temp_recv_buffers[k]) error("Rank %d: Failed to allocate temp recv buffer for proxy %d.", rank, partner_rank);
-
-                // Issue single MPI_Irecv into temp buffer
-                // Expect sender to use OUR rank ID as the tag
-                const int tag = rank;
-                int err = MPI_Irecv(temp_recv_buffers[k], recv_sizes[k], MPI_INT,
-                                    partner_rank, tag, MPI_COMM_WORLD,
-                                    &proxy_reqs_in[num_active_recvs]); // Use counter as index
-                if (err != MPI_SUCCESS) mpi_error(err, "Failed to Irecv aggregate tags.");
-                num_active_recvs++; // Increment count of active receives
-            }
-        } // End if nr_cells_in > 0
-    } // End loop over proxies
+      if (recv_sizes[k] > 0) {  // Proceed only if data is expected
+        temp_recv_buffers[k] = malloc(recv_sizes[k] * sizeof(int));
+        if (!temp_recv_buffers[k])
+          error("Rank %d: Failed to allocate temp recv buffer for proxy %d.",
+                rank, partner_rank);
+
+        // Issue single MPI_Irecv into temp buffer
+        // Expect sender to use OUR rank ID as the tag
+        const int tag = rank;
+        int err = MPI_Irecv(
+            temp_recv_buffers[k], recv_sizes[k], MPI_INT, partner_rank, tag,
+            MPI_COMM_WORLD,
+            &proxy_reqs_in[num_active_recvs]);  // Use counter as index
+        if (err != MPI_SUCCESS)
+          mpi_error(err, "Failed to Irecv aggregate tags.");
+        num_active_recvs++;  // Increment count of active receives
+      }
+    }  // End if nr_cells_in > 0
+  }    // End loop over proxies
 
   if (s->e->verbose)
-      message("Posted %d aggregate Sends / %d aggregate Recvs.", num_active_sends, num_active_recvs);
+    message("Posted %d aggregate Sends / %d aggregate Recvs.", num_active_sends,
+            num_active_recvs);
 
   /* Wait for ALL Receives to Complete */
   if (num_active_recvs > 0) {
-      if (MPI_Waitall(num_active_recvs, proxy_reqs_in, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-          error("Rank %d: MPI_Waitall on aggregate receives failed.", rank);
+    if (MPI_Waitall(num_active_recvs, proxy_reqs_in, MPI_STATUSES_IGNORE) !=
+        MPI_SUCCESS)
+      error("Rank %d: MPI_Waitall on aggregate receives failed.", rank);
   }
 
   if (s->e->verbose)
-      message("Aggregate MPI_Waitall (Recvs) took %.3f %s.",
-              clocks_from_ticks(getticks() - tic2), clocks_getunit());
+    message("Aggregate MPI_Waitall (Recvs) took %.3f %s.",
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
   /* Unpack Received Data (Serially) */
   tic2 = getticks();
-    int current_recv_req_idx = 0; // Track index corresponding to proxy_reqs_in
+  int current_recv_req_idx = 0;  // Track index corresponding to proxy_reqs_in
 
   // Create the extra_data struct needed for cell_unpack_tags
   // Note: We only need tags_in, offset_in, and space_cells now for unpacking
   struct tag_mapper_data unpack_extra_data;
-    unpack_extra_data.tags_in = tags_in; // Pointer to the main receive buffer
-    unpack_extra_data.offset_in = offset_in; // Offsets within tags_in
-    unpack_extra_data.space_cells = s->cells_top;// Pointer to the beginning of cell array
+  unpack_extra_data.tags_in = tags_in;      // Pointer to the main receive buffer
+  unpack_extra_data.offset_in = offset_in;  // Offsets within tags_in
+  unpack_extra_data.space_cells =
+      s->cells_top;  // Pointer to the beginning of cell array
 
   for (int k = 0; k < num_proxies; k++) {
-        // Only unpack if we expected data and allocated a buffer for this proxy
-        if (proxies[k].nr_cells_in > 0 && recv_sizes[k] > 0) {
-            int current_offset_in_temp_buffer = 0; // Track position within temp buffer
-
-            for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-                const struct cell *recv_cell_info = proxies[k].cells_in[j];
-                const int cid = recv_cell_info - s->cells_top; // Get the cell index
-                const int size = recv_cell_info->mpi.pcell_size; // Size of data for this cell
-
-                if (size > 0) {
-                    // --- Copy data from temp buffer to final tags_in location ---
-                    // Ensure offset_in[cid] is valid and within bounds of tags_in
-                    if (offset_in[cid] + size > count_in) {
-                        error("Rank %d: Unpack error - offset calculation mismatch for cell %d from proxy %d.", rank, cid, proxies[k].nodeID);
-                    }
-                    memcpy(&tags_in[offset_in[cid]], // Destination in main buffer
-                           (char*)temp_recv_buffers[k] + current_offset_in_temp_buffer * sizeof(int), // Source in temp buffer
-                           size * sizeof(int)); // Size to copy
-
-                    // --- Call the cell-specific unpack function ---
-                    // This uses the data *now residing* in tags_in[offset_in[cid]]
-                    // to update the actual cell structure at space_cells[cid].
-                    cell_unpack_tags(&tags_in[offset_in[cid]], // Pointer to data in main buffer
-                                     &unpack_extra_data.space_cells[cid]); // Pointer to target cell struct
-
-                    // Update offset for the *next* cell's data within the temp buffer
-                    current_offset_in_temp_buffer += size;
-                }
-            } // End loop over cells for this proxy
-
-            // Sanity check: Did we process the expected amount from the temp buffer?
-            if (current_offset_in_temp_buffer != recv_sizes[k]) {
-                error("Rank %d: Unpack size mismatch for proxy %d. Expected %d, processed %d.", rank, proxies[k].nodeID, recv_sizes[k], current_offset_in_temp_buffer);
-            }
-
-            free(temp_recv_buffers[k]); // Free the temp buffer now it's fully processed
-            temp_recv_buffers[k] = NULL;
-            current_recv_req_idx++; // Increment processed receive counter
-        } // End if(data expected from proxy)
-    } // End loop over proxies
+    // Only unpack if we expected data and allocated a buffer for this proxy
+    if (proxies[k].nr_cells_in > 0 && recv_sizes[k] > 0) {
+      int current_offset_in_temp_buffer =
+          0;  // Track position within temp buffer
+
+      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+        const struct cell *recv_cell_info = proxies[k].cells_in[j];
+        const int cid = recv_cell_info - s->cells_top;  // Get the cell index
+        const int size =
+            recv_cell_info->mpi.pcell_size;  // Size of data for this cell
+
+        if (size > 0) {
+          // --- Copy data from temp buffer to final tags_in location ---
+          // Ensure offset_in[cid] is valid and within bounds of tags_in
+          if (offset_in[cid] + size > count_in) {
+            error(
+                "Rank %d: Unpack error - offset calculation mismatch for cell "
+                "%d from proxy %d.",
+                rank, cid, proxies[k].nodeID);
+          }
+          memcpy(&tags_in[offset_in[cid]],  // Destination in main buffer
+                 (char *)temp_recv_buffers[k] +
+                     current_offset_in_temp_buffer *
+                         sizeof(int),  // Source in temp buffer
+                 size * sizeof(int));  // Size to copy
+
+          // --- Call the cell-specific unpack function ---
+          // This uses the data *now residing* in tags_in[offset_in[cid]]
+          // to update the actual cell structure at space_cells[cid].
+          cell_unpack_tags(
+              &tags_in[offset_in[cid]],  // Pointer to data in main buffer
+              &unpack_extra_data
+                   .space_cells[cid]);  // Pointer to target cell struct
+
+          // Update offset for the *next* cell's data within the temp buffer
+          current_offset_in_temp_buffer += size;
+        }
+      }  // End loop over cells for this proxy
+
+      // Sanity check: Did we process the expected amount from the temp buffer?
+      if (current_offset_in_temp_buffer != recv_sizes[k]) {
+        error(
+            "Rank %d: Unpack size mismatch for proxy %d. Expected %d, "
+            "processed %d.",
+            rank, proxies[k].nodeID, recv_sizes[k],
+            current_offset_in_temp_buffer);
+      }
+
+      free(temp_recv_buffers[k]);  // Free the temp buffer now it's fully
+                                   // processed
+      temp_recv_buffers[k] = NULL;
+      current_recv_req_idx++;  // Increment processed receive counter
+    }  // End if(data expected from proxy)
+  }    // End loop over proxies
 
   // Sanity check: Did we process the expected number of receives?
   if (current_recv_req_idx != num_active_recvs) {
-      error("Rank %d: Processed %d receives during unpack, but expected %d.", rank, current_recv_req_idx, num_active_recvs);
+    error("Rank %d: Processed %d receives during unpack, but expected %d.",
+          rank, current_recv_req_idx, num_active_recvs);
   }
 
   if (s->e->verbose)
-      message("Unpacking aggregate data (serial) took %.3f %s.",
-              clocks_from_ticks(getticks() - tic2), clocks_getunit());
+    message("Unpacking aggregate data (serial) took %.3f %s.",
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
   /* Wait for ALL Sends to Complete */
-    // (Often done after unpack, allows computation/unpack to overlap with sends finishing)
+  // (Often done after unpack, allows computation/unpack to overlap with sends
+  // finishing)
   tic2 = getticks();
   if (num_active_sends > 0) {
-      if (MPI_Waitall(num_active_sends, proxy_reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-          error("Rank %d: MPI_Waitall on aggregate sends failed.", rank);
+    if (MPI_Waitall(num_active_sends, proxy_reqs_out, MPI_STATUSES_IGNORE) !=
+        MPI_SUCCESS)
+      error("Rank %d: MPI_Waitall on aggregate sends failed.", rank);
   }
 
   if (s->e->verbose)
-      message("Rank %d: Aggregate MPI_Waitall (Sends) took %.3f %s.", rank,
-              clocks_from_ticks(getticks() - tic2), clocks_getunit());
-
+    message("Rank %d: Aggregate MPI_Waitall (Sends) took %.3f %s.", rank,
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
   /* Cleanup Temporary Communication Structures */
   if (num_proxies > 0) {
-        for (int k = 0; k < num_proxies; k++) {
-            // Free any remaining send buffers (recv buffers freed during unpack)
-            if (temp_send_buffers[k] != NULL) {
-                free(temp_send_buffers[k]);
-            }
-            // Check if recv buffer was missed (shouldn't happen with current logic)
-            if (temp_recv_buffers[k] != NULL) {
-                warning("Rank %d: Temp recv buffer for proxy %d was not freed during unpack?", rank, k);
-                free(temp_recv_buffers[k]);
-            }
-        }
-        // Free the helper arrays themselves
-        free(proxy_reqs_in);
-        free(proxy_reqs_out);
-        free(temp_send_buffers);
-        free(temp_recv_buffers);
-        free(send_sizes);
-        free(recv_sizes);
+    for (int k = 0; k < num_proxies; k++) {
+      // Free any remaining send buffers (recv buffers freed during unpack)
+      if (temp_send_buffers[k] != NULL) {
+        free(temp_send_buffers[k]);
+      }
+      // Check if recv buffer was missed (shouldn't happen with current logic)
+      if (temp_recv_buffers[k] != NULL) {
+        warning(
+            "Rank %d: Temp recv buffer for proxy %d was not freed during "
+            "unpack?",
+            rank, k);
+        free(temp_recv_buffers[k]);
+      }
+    }
+    // Free the helper arrays themselves
+    free(proxy_reqs_in);
+    free(proxy_reqs_out);
+    free(temp_send_buffers);
+    free(temp_recv_buffers);
+    free(send_sizes);
+    free(recv_sizes);
   }
 
   /* Final Clean up - Main Buffers and Offsets */
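
For readers skimming the patch, the communication pattern it introduces reduces to the standalone sketch below: one aggregated Irecv/Isend pair per neighbour rank instead of one message per cell, with receives completed (and unpacked) before the sends are drained. This is an illustrative sketch only, not SWIFT code: the names exchange_aggregate, neigh_rank, n_send, n_recv, send_buf, and recv_buf are hypothetical, and error handling is reduced to MPI_Abort.

#include <mpi.h>
#include <stdlib.h>

/* Exchange one aggregated int buffer with each of n_neigh neighbour ranks.
 * Tags follow the patch's convention: every message is tagged with the
 * *receiver's* rank, so the receive side posts tag == my_rank. */
static void exchange_aggregate(int my_rank, int n_neigh, const int *neigh_rank,
                               const int *n_send, int *const *send_buf,
                               const int *n_recv, int *const *recv_buf) {
  MPI_Request *req_in = malloc(n_neigh * sizeof(MPI_Request));
  MPI_Request *req_out = malloc(n_neigh * sizeof(MPI_Request));
  if (req_in == NULL || req_out == NULL) MPI_Abort(MPI_COMM_WORLD, 1);

  /* Post all receives first so matching sends can complete eagerly. */
  for (int k = 0; k < n_neigh; k++)
    MPI_Irecv(recv_buf[k], n_recv[k], MPI_INT, neigh_rank[k],
              /* tag = our rank */ my_rank, MPI_COMM_WORLD, &req_in[k]);

  /* One aggregated send per neighbour, tagged with the receiver's rank. */
  for (int k = 0; k < n_neigh; k++)
    MPI_Isend(send_buf[k], n_send[k], MPI_INT, neigh_rank[k],
              /* tag = receiver's rank */ neigh_rank[k], MPI_COMM_WORLD,
              &req_out[k]);

  /* Wait on receives first: unpacking can proceed while sends drain. */
  MPI_Waitall(n_neigh, req_in, MPI_STATUSES_IGNORE);
  /* ... unpack recv_buf[k] into per-cell storage here ... */
  MPI_Waitall(n_neigh, req_out, MPI_STATUSES_IGNORE);

  free(req_in);
  free(req_out);
}

Unlike the patch, this sketch posts a message for every neighbour, including empty ones; the patched code skips zero-size buffers and counts only the requests it actually posts (num_active_sends / num_active_recvs), which is why it waits on those counters rather than on num_proxies.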