diff --git a/src/proxy.c b/src/proxy.c
index 77703e248d57ebabc132054598dc41bd2b75a710..001b8684d2a02d1d0b2be58d6d99daff0ee507fd 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -107,138 +107,268 @@ void proxy_tags_exchange(struct proxy *proxies, int num_proxies,
 
 #ifdef WITH_MPI
 
-  /* ticks tic2 = getticks(); */
+  int rank;
+  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
 
-  /* Run through the cells and get the size of the tags that will be sent off.
-   */
+  ticks tic2 = getticks();
+
+  /* Calculate Outgoing Sizes/Offsets */
   int count_out = 0;
   int *offset_out =
       (int *)swift_malloc("tags_offsets_out", s->nr_cells * sizeof(int));
-  if (offset_out == NULL) error("Error allocating memory for tag offsets");
+  if (offset_out == NULL) error("Error allocating memory for tag offsets out");
 
   for (int k = 0; k < s->nr_cells; k++) {
-    offset_out[k] = count_out;
-    if (s->cells_top[k].mpi.sendto) {
-      count_out += s->cells_top[k].mpi.pcell_size;
-    }
+    offset_out[k] = count_out;
+    if (s->cells_top[k].mpi.sendto) {
+      count_out += s->cells_top[k].mpi.pcell_size;
+    }
   }
 
-  /* Run through the proxies and get the count of incoming tags. */
+  /* Calculate Incoming Sizes/Offsets */
   int count_in = 0;
   int *offset_in =
       (int *)swift_malloc("tags_offsets_in", s->nr_cells * sizeof(int));
-  if (offset_in == NULL) error("Error allocating memory for tag offsets");
+  if (offset_in == NULL) error("Error allocating memory for tag offsets in");
 
   for (int k = 0; k < num_proxies; k++) {
-    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-      offset_in[proxies[k].cells_in[j] - s->cells_top] = count_in;
-      count_in += proxies[k].cells_in[j]->mpi.pcell_size;
-    }
+    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+      // Calculate local cell index relative to the start of the cell array
+      const int cid = proxies[k].cells_in[j] - s->cells_top;
+      offset_in[cid] = count_in;  // Store offset based on overall count
+      count_in += proxies[k].cells_in[j]->mpi.pcell_size;
+    }
   }
 
-  /* Allocate the tags. */
+  /* Allocate the Main Tag Buffers */
   int *tags_in = NULL;
   int *tags_out = NULL;
   if (swift_memalign("tags_in", (void **)&tags_in, SWIFT_CACHE_ALIGNMENT,
                      sizeof(int) * count_in) != 0 ||
       swift_memalign("tags_out", (void **)&tags_out, SWIFT_CACHE_ALIGNMENT,
                      sizeof(int) * count_out) != 0)
-    error("Failed to allocate tags buffers.");
+    error("Failed to allocate main tags buffers.");
 
+  /* Pack the Local Tags into tags_out */
   struct tag_mapper_data extra_data;
   extra_data.tags_out = tags_out;
   extra_data.offset_out = offset_out;
   extra_data.space_cells = s->cells_top;
 
-  /* Pack the local tags. */
+  // Using threadpool_map for parallel packing into the main tags_out buffer
   threadpool_map(&s->e->threadpool, proxy_tags_exchange_pack_mapper,
                  s->cells_top, s->nr_cells, sizeof(struct cell),
                  threadpool_auto_chunk_size, &extra_data);
 
-  /* if (s->e->verbose) */
-  /*   message("Cell pack tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  if (s->e->verbose)
+    message("Rank %d: Setup & Main Pack took %.3f %s.", rank,
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
-  /* tic2 = getticks(); */
+  tic2 = getticks();
 
-  /* Allocate the incoming and outgoing request handles. */
-  int num_reqs_out = 0;
-  int num_reqs_in = 0;
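+  /* Strategy: all tags for a given proxy are aggregated into a single
+   * message per direction (rather than one Isend/Irecv per cell, as in the
+   * code being removed here), so the number of outstanding MPI requests
+   * scales with the number of proxies rather than with the number of cells
+   * exchanged. */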
+  /* Prepare for Per-Proxy Communication */
+  MPI_Request *proxy_reqs_in = NULL;
+  MPI_Request *proxy_reqs_out = NULL;
+  void **temp_send_buffers = NULL;  // Array of pointers to temp send buffers
+  void **temp_recv_buffers = NULL;  // Array of pointers to temp recv buffers
+  int *send_sizes = NULL;           // Size of data to send to each proxy
+  int *recv_sizes = NULL;           // Size of data to receive from each proxy
+  int num_active_sends = 0;         // Count actual sends initiated
+  int num_active_recvs = 0;         // Count actual receives initiated
+
+  // Allocate helper arrays based on the number of proxies
+  if (num_proxies > 0) {
+    // Use calloc to initialize pointers to NULL and sizes to 0
+    proxy_reqs_in = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
+    proxy_reqs_out = (MPI_Request *)calloc(num_proxies, sizeof(MPI_Request));
+    temp_send_buffers = (void **)calloc(num_proxies, sizeof(void *));
+    temp_recv_buffers = (void **)calloc(num_proxies, sizeof(void *));
+    send_sizes = (int *)calloc(num_proxies, sizeof(int));
+    recv_sizes = (int *)calloc(num_proxies, sizeof(int));
+
+    if (!proxy_reqs_in || !proxy_reqs_out || !temp_send_buffers ||
+        !temp_recv_buffers || !send_sizes || !recv_sizes) {
+      error("Rank %d: Failed to allocate memory for proxy comm structures.",
+            rank);
+    }
+  }
+
+  /* Initiate Non-blocking Sends and Receives (Per Proxy) */
   for (int k = 0; k < num_proxies; k++) {
-    num_reqs_in += proxies[k].nr_cells_in;
-    num_reqs_out += proxies[k].nr_cells_out;
-  }
-  MPI_Request *reqs_in = NULL;
-  int *cids_in = NULL;
-  if ((reqs_in = (MPI_Request *)malloc(sizeof(MPI_Request) *
-                                       (num_reqs_in + num_reqs_out))) == NULL ||
-      (cids_in = (int *)malloc(sizeof(int) * (num_reqs_in + num_reqs_out))) ==
-          NULL)
-    error("Failed to allocate MPI_Request arrays.");
-  MPI_Request *reqs_out = &reqs_in[num_reqs_in];
-  int *cids_out = &cids_in[num_reqs_in];
-
-  /* Emit the sends and recvs. */
-  for (int send_rid = 0, recv_rid = 0, k = 0; k < num_proxies; k++) {
-    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-      const int cid = proxies[k].cells_in[j] - s->cells_top;
-      cids_in[recv_rid] = cid;
-      int err = MPI_Irecv(
-          &tags_in[offset_in[cid]], proxies[k].cells_in[j]->mpi.pcell_size,
-          MPI_INT, proxies[k].nodeID, cid, MPI_COMM_WORLD, &reqs_in[recv_rid]);
-      if (err != MPI_SUCCESS) mpi_error(err, "Failed to irecv tags.");
-      recv_rid += 1;
-    }
-    for (int j = 0; j < proxies[k].nr_cells_out; j++) {
-      const int cid = proxies[k].cells_out[j] - s->cells_top;
-      cids_out[send_rid] = cid;
-      int err = MPI_Isend(
-          &tags_out[offset_out[cid]], proxies[k].cells_out[j]->mpi.pcell_size,
-          MPI_INT, proxies[k].nodeID, cid, MPI_COMM_WORLD, &reqs_out[send_rid]);
-      if (err != MPI_SUCCESS) mpi_error(err, "Failed to isend tags.");
-      send_rid += 1;
-    }
-  }
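+    /* Note on tags: each pair of ranks exchanges at most one aggregate
+     * message per direction per call, so a tag derived from the rank is
+     * unambiguous here; the old per-cell scheme needed the cell index as
+     * the tag to tell the individual messages apart. */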
+    int partner_rank = proxies[k].nodeID;
+
+    // --- Handle Sends to Proxy k ---
+    if (proxies[k].nr_cells_out > 0) {
+      // Calculate total size
+      for (int j = 0; j < proxies[k].nr_cells_out; j++) {
+        send_sizes[k] += proxies[k].cells_out[j]->mpi.pcell_size;
+      }
+
+      if (send_sizes[k] > 0) {  // Proceed only if there's data
+        temp_send_buffers[k] = malloc(send_sizes[k] * sizeof(int));
+        if (!temp_send_buffers[k])
+          error("Rank %d: Failed to allocate temp send buffer for proxy %d.",
+                rank, partner_rank);
+
+        // Pack data from main tags_out into the temporary buffer
+        int current_offset = 0;
+        for (int j = 0; j < proxies[k].nr_cells_out; j++) {
+          const int cid = proxies[k].cells_out[j] - s->cells_top;
+          const int size = proxies[k].cells_out[j]->mpi.pcell_size;
+          if (size > 0) {
+            memcpy((char *)temp_send_buffers[k] + current_offset * sizeof(int),
+                   &tags_out[offset_out[cid]], size * sizeof(int));
+            current_offset += size;
+          }
+        }
+
+        // Issue a single MPI_Isend using the temp buffer.
+        // Use the RECEIVER's rank ID as the tag.
+        const int tag = partner_rank;
+        int err = MPI_Isend(temp_send_buffers[k], send_sizes[k], MPI_INT,
+                            partner_rank, tag, MPI_COMM_WORLD,
+                            &proxy_reqs_out[num_active_sends]);  // Counter as index
+        if (err != MPI_SUCCESS)
+          mpi_error(err, "Failed to Isend aggregate tags.");
+        num_active_sends++;  // Increment count of active sends
+      }
+    }  // End if nr_cells_out > 0
+
+    // --- Handle Receives from Proxy k ---
+    if (proxies[k].nr_cells_in > 0) {
+      // Calculate total size
+      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+        recv_sizes[k] += proxies[k].cells_in[j]->mpi.pcell_size;
+      }
+
+      if (recv_sizes[k] > 0) {  // Proceed only if data is expected
+        temp_recv_buffers[k] = malloc(recv_sizes[k] * sizeof(int));
+        if (!temp_recv_buffers[k])
+          error("Rank %d: Failed to allocate temp recv buffer for proxy %d.",
+                rank, partner_rank);
+
+        // Issue a single MPI_Irecv into the temp buffer.
+        // Expect the sender to use OUR rank ID as the tag.
+        const int tag = rank;
+        int err = MPI_Irecv(temp_recv_buffers[k], recv_sizes[k], MPI_INT,
+                            partner_rank, tag, MPI_COMM_WORLD,
+                            &proxy_reqs_in[num_active_recvs]);  // Counter as index
+        if (err != MPI_SUCCESS)
+          mpi_error(err, "Failed to Irecv aggregate tags.");
+        num_active_recvs++;  // Increment count of active receives
+      }
+    }  // End if nr_cells_in > 0
+  }  // End loop over proxies
 
-  /* if (s->e->verbose) */
-  /*   message("Emitting Send/Recv for tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  if (s->e->verbose)
+    message("Posted %d aggregate Sends / %d aggregate Recvs.",
+            num_active_sends, num_active_recvs);
 
-  /* tic2 = getticks(); */
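+  /* A single blocking Waitall followed by a serial unpack keeps the logic
+   * simple; if this wait ever dominates, an MPI_Waitany/MPI_Waitsome loop
+   * could overlap unpacking with receives that are still in flight. */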
+  /* Wait for ALL Receives to Complete */
+  if (num_active_recvs > 0) {
+    if (MPI_Waitall(num_active_recvs, proxy_reqs_in, MPI_STATUSES_IGNORE) !=
+        MPI_SUCCESS)
+      error("Rank %d: MPI_Waitall on aggregate receives failed.", rank);
+  }
 
-  /* Wait for all the sends to have completed. */
-  if (MPI_Waitall(num_reqs_in, reqs_in, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-    error("MPI_Waitall on sends failed.");
+  if (s->e->verbose)
+    message("Aggregate MPI_Waitall (Recvs) took %.3f %s.",
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
-  /* if (s->e->verbose) */
-  /*   message("WaitAll on tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  /* Unpack Received Data (Serially) */
+  tic2 = getticks();
+  int current_recv_req_idx = 0;  // Track index corresponding to proxy_reqs_in
 
-  /* tic2 = getticks(); */
+  // Create the extra_data struct needed for cell_unpack_tags
+  // Note: We only need tags_in, offset_in, and space_cells now for unpacking
+  struct tag_mapper_data unpack_extra_data;
+  unpack_extra_data.tags_in = tags_in;      // Pointer to the main receive buffer
+  unpack_extra_data.offset_in = offset_in;  // Offsets within tags_in
+  unpack_extra_data.space_cells = s->cells_top;  // Beginning of the cell array
 
-  /* Unpack the tags we received */
-  extra_data.tags_in = tags_in;
-  extra_data.offset_in = offset_in;
-  extra_data.space_cells = s->cells_top;
-  threadpool_map(&s->e->threadpool, proxy_tags_exchange_unpack_mapper, cids_in,
-                 num_reqs_in, sizeof(int), threadpool_auto_chunk_size,
-                 &extra_data);
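+  /* The serial unpack below relies on the sender having packed its cells_out
+   * list in the same order as our cells_in list for the same proxy (the two
+   * lists are expected to be constructed symmetrically); the per-cell
+   * pcell_size values then recover each cell's slice of the aggregate
+   * buffer. */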
+  for (int k = 0; k < num_proxies; k++) {
+    // Only unpack if we expected data and allocated a buffer for this proxy
+    if (proxies[k].nr_cells_in > 0 && recv_sizes[k] > 0) {
+      int current_offset_in_temp_buffer = 0;  // Position within the temp buffer
+
+      for (int j = 0; j < proxies[k].nr_cells_in; j++) {
+        const struct cell *recv_cell_info = proxies[k].cells_in[j];
+        const int cid = recv_cell_info - s->cells_top;    // The cell index
+        const int size = recv_cell_info->mpi.pcell_size;  // Data size for this cell
+
+        if (size > 0) {
+          // --- Copy data from the temp buffer to its final tags_in location ---
+          // Ensure offset_in[cid] is valid and within the bounds of tags_in
+          if (offset_in[cid] + size > count_in) {
+            error("Rank %d: Unpack error - offset calculation mismatch for "
+                  "cell %d from proxy %d.",
+                  rank, cid, proxies[k].nodeID);
+          }
+          memcpy(&tags_in[offset_in[cid]],  // Destination in main buffer
+                 (char *)temp_recv_buffers[k] +
+                     current_offset_in_temp_buffer * sizeof(int),
+                 size * sizeof(int));
+
+          // --- Call the cell-specific unpack function ---
+          // This uses the data *now residing* at tags_in[offset_in[cid]] to
+          // update the actual cell structure at space_cells[cid].
+          cell_unpack_tags(&tags_in[offset_in[cid]],
+                           &unpack_extra_data.space_cells[cid]);
+
+          // Update the offset for the *next* cell's data in the temp buffer
+          current_offset_in_temp_buffer += size;
+        }
+      }  // End loop over cells for this proxy
+
+      // Sanity check: Did we process the expected amount from the temp buffer?
+      if (current_offset_in_temp_buffer != recv_sizes[k]) {
+        error("Rank %d: Unpack size mismatch for proxy %d. Expected %d, "
+              "processed %d.",
+              rank, proxies[k].nodeID, recv_sizes[k],
+              current_offset_in_temp_buffer);
+      }
+
+      free(temp_recv_buffers[k]);  // Free the temp buffer now that it's fully processed
+      temp_recv_buffers[k] = NULL;
+      current_recv_req_idx++;  // Increment the processed-receive counter
+    }  // End if (data expected from this proxy)
+  }  // End loop over proxies
+
+  // Sanity check: Did we process the expected number of receives?
+  if (current_recv_req_idx != num_active_recvs) {
+    error("Rank %d: Processed %d receives during unpack, but expected %d.",
+          rank, current_recv_req_idx, num_active_recvs);
+  }
 
-  /* if (s->e->verbose) */
-  /*   message("Cell unpack tags took %.3f %s.", */
-  /*           clocks_from_ticks(getticks() - tic2), clocks_getunit()); */
+  if (s->e->verbose)
+    message("Unpacking aggregate data (serial) took %.3f %s.",
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
-  /* Wait for all the sends to have completed. */
-  if (MPI_Waitall(num_reqs_out, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-    error("MPI_Waitall on sends failed.");
+  /* Wait for ALL Sends to Complete */
+  // (Done after the unpack, so unpacking can overlap with sends finishing)
+  tic2 = getticks();
+  if (num_active_sends > 0) {
+    if (MPI_Waitall(num_active_sends, proxy_reqs_out, MPI_STATUSES_IGNORE) !=
+        MPI_SUCCESS)
+      error("Rank %d: MPI_Waitall on aggregate sends failed.", rank);
+  }
 
-  /* Clean up. */
+  if (s->e->verbose)
+    message("Rank %d: Aggregate MPI_Waitall (Sends) took %.3f %s.", rank,
+            clocks_from_ticks(getticks() - tic2), clocks_getunit());
+
+  /* Cleanup Temporary Communication Structures */
+  if (num_proxies > 0) {
+    for (int k = 0; k < num_proxies; k++) {
+      // Free any remaining send buffers (recv buffers were freed during unpack)
+      if (temp_send_buffers[k] != NULL) {
+        free(temp_send_buffers[k]);
+      }
+      // Check if a recv buffer was missed (shouldn't happen with the current logic)
+      if (temp_recv_buffers[k] != NULL) {
+        warning("Rank %d: Temp recv buffer for proxy %d was not freed during "
+                "unpack?",
+                rank, k);
+        free(temp_recv_buffers[k]);
+      }
+    }
+    // Free the helper arrays themselves
+    free(proxy_reqs_in);
+    free(proxy_reqs_out);
+    free(temp_send_buffers);
+    free(temp_recv_buffers);
+    free(send_sizes);
+    free(recv_sizes);
+  }
+
+  /* Final Clean up - Main Buffers and Offsets */
   swift_free("tags_in", tags_in);
   swift_free("tags_out", tags_out);
   swift_free("tags_offsets_in", offset_in);
   swift_free("tags_offsets_out", offset_out);
-  free(reqs_in);
-  free(cids_in);
 
 #else
   error("SWIFT was not compiled with MPI support.");