diff --git a/src/proxy.c b/src/proxy.c
index f279eaaee6120588e65a5429b50ac04bcf54bad2..325ed78644b07a497374e40bfc8518edcb018593 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -269,105 +269,6 @@ void proxy_cells_exchange_second(struct proxy *p) {
 #endif
 }
 
-#ifdef WITH_MPI
-
-void proxy_cells_count_mapper(void *map_data, int num_elements,
-                              void *extra_data) {
-  struct cell *cells = (struct cell *)map_data;
-
-  for (int k = 0; k < num_elements; k++) {
-    if (cells[k].mpi.sendto) cells[k].mpi.pcell_size = cell_getsize(&cells[k]);
-  }
-}
-
-struct pack_mapper_data {
-  struct space *s;
-  int *offset;
-  struct pcell *pcells;
-  int with_gravity;
-};
-
-void proxy_cells_pack_mapper(void *map_data, int num_elements,
-                             void *extra_data) {
-  struct cell *cells = (struct cell *)map_data;
-  struct pack_mapper_data *data = (struct pack_mapper_data *)extra_data;
-
-  for (int k = 0; k < num_elements; k++) {
-    if (cells[k].mpi.sendto) {
-      ptrdiff_t ind = &cells[k] - data->s->cells_top;
-      cells[k].mpi.pcell = &data->pcells[data->offset[ind]];
-      cell_pack(&cells[k], cells[k].mpi.pcell, data->with_gravity);
-    }
-  }
-}
-
-void proxy_cells_exchange_first_mapper(void *map_data, int num_elements,
-                                       void *extra_data) {
-  struct proxy *proxies = (struct proxy *)map_data;
-
-  for (int k = 0; k < num_elements; k++) {
-    proxy_cells_exchange_first(&proxies[k]);
-  }
-}
-
-struct wait_and_unpack_mapper_data {
-  struct space *s;
-  int num_proxies;
-  MPI_Request *reqs_in;
-  struct proxy *proxies;
-  int with_gravity;
-  swift_lock_type lock;
-};
-
-void proxy_cells_wait_and_unpack_mapper(void *unused_map_data, int num_elements,
-                                        void *extra_data) {
-  struct wait_and_unpack_mapper_data *data =
-      (struct wait_and_unpack_mapper_data *)extra_data;
-
-  for (int k = 0; k < num_elements; k++) {
-    int pid = MPI_UNDEFINED;
-    MPI_Status status;
-    int res;
-
-    /* We need a lock to prevent concurrent calls to MPI_Waitany on
-       the same array of requests since this is not supported in the MPI
-       standard (v3.1). This is not really a problem since the threads
-       would block inside MPI_Waitany anyway. */
-    lock_lock(&data->lock);
-    if ((res = MPI_Waitany(data->num_proxies, data->reqs_in, &pid,
-                           &status)) != MPI_SUCCESS ||
-        pid == MPI_UNDEFINED)
-      mpi_error(res, "MPI_Waitany failed.");
-    if (lock_unlock(&data->lock) != 0) {
-      error("Failed to release lock.");
-    }
-
-    // message( "cell data from proxy %i has arrived." , pid );
-    for (int count = 0, j = 0; j < data->proxies[pid].nr_cells_in; j++)
-      count += cell_unpack(&data->proxies[pid].pcells_in[count],
-                           data->proxies[pid].cells_in[j], data->s,
-                           data->with_gravity);
-  }
-}
-
-void proxy_cells_unpack_mapper(void *map_data, int num_elements,
-                               void *extra_data) {
-
-  struct space *s = (struct space *)extra_data;
-  struct proxy *proxies = (struct proxy *)map_data;
-
-  for (int k = 0; k < num_elements; ++k) {
-
-    int count = 0;
-    for (int j = 0; j < proxies[k].nr_cells_in; j++) {
-      count += cell_unpack(&proxies[k].pcells_in[count], proxies[k].cells_in[j],
-                           s, s->gravity);
-    }
-  }
-}
-
-#endif  // WITH_MPI
-
 /**
  * @brief Exchange the cell structures with all proxies.
  *
@@ -393,14 +294,13 @@ void proxy_cells_exchange(struct proxy *proxies, int num_proxies,
 
   /* Run through the cells and get the size of the ones that will be sent off.
    */
-  threadpool_map(&s->e->threadpool, proxy_cells_count_mapper, s->cells_top,
-                 s->nr_cells, sizeof(struct cell), /*chunk=*/0,
-                 /*extra_data=*/NULL);
   int count_out = 0;
   int offset[s->nr_cells];
   for (int k = 0; k < s->nr_cells; k++) {
     offset[k] = count_out;
-    if (s->cells_top[k].mpi.sendto) count_out += s->cells_top[k].mpi.pcell_size;
+    if (s->cells_top[k].mpi.sendto)
+      count_out +=
+          (s->cells_top[k].mpi.pcell_size = cell_getsize(&s->cells_top[k]));
   }
 
   if (s->e->verbose)
@@ -416,19 +316,19 @@ void proxy_cells_exchange(struct proxy *proxies, int num_proxies,
   tic2 = getticks();
 
   /* Pack the cells. */
-  struct pack_mapper_data data = {s, offset, pcells, with_gravity};
-  threadpool_map(&s->e->threadpool, proxy_cells_pack_mapper, s->cells_top,
-                 s->nr_cells, sizeof(struct cell), /*chunk=*/0, &data);
+  for (int k = 0; k < s->nr_cells; k++)
+    if (s->cells_top[k].mpi.sendto) {
+      cell_pack(&s->cells_top[k], &pcells[offset[k]], with_gravity);
+      s->cells_top[k].mpi.pcell = &pcells[offset[k]];
+    }
 
   if (s->e->verbose)
     message("Packing cells took %.3f %s.",
             clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
   /* Launch the first part of the exchange. */
-  threadpool_map(&s->e->threadpool, proxy_cells_exchange_first_mapper, proxies,
-                 num_proxies, sizeof(struct proxy), /*chunk=*/0,
-                 /*extra_data=*/NULL);
   for (int k = 0; k < num_proxies; k++) {
+    proxy_cells_exchange_first(&proxies[k]);
     reqs_in[k] = proxies[k].req_cells_count_in;
     reqs_out[k] = proxies[k].req_cells_count_out;
   }
@@ -456,46 +356,23 @@ void proxy_cells_exchange(struct proxy *proxies, int num_proxies,
 
   tic2 = getticks();
 
-  if (MPI_Waitall(num_proxies, reqs_in, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
-    error("MPI_Waitall on recvs failed.");
-
-  if (s->e->verbose)
-    message("MPI_Waitall on recvs took %.3f %s.",
-            clocks_from_ticks(getticks() - tic2), clocks_getunit());
-
-  int *counters = malloc(s->e->nr_nodes * sizeof(int));
-  bzero(counters, s->e->nr_nodes * sizeof(int));
-
-  tic2 = getticks();
-
-  message("num proxies: %d", num_proxies);
-
-  threadpool_map(&s->e->threadpool, proxy_cells_unpack_mapper, proxies,
-                 num_proxies, sizeof(struct proxy), 0, s);
+  /* Wait for each pcell array to come in from the proxies. */
+  for (int k = 0; k < num_proxies; k++) {
+    int pid = MPI_UNDEFINED;
+    MPI_Status status;
+    if (MPI_Waitany(num_proxies, reqs_in, &pid, &status) != MPI_SUCCESS ||
+        pid == MPI_UNDEFINED)
+      error("MPI_Waitany failed.");
+    // message( "cell data from proxy %i has arrived." , pid );
+    for (int count = 0, j = 0; j < proxies[pid].nr_cells_in; j++)
+      count += cell_unpack(&proxies[pid].pcells_in[count],
+                           proxies[pid].cells_in[j], s, with_gravity);
+  }
 
   if (s->e->verbose)
     message("Un-packing cells took %.3f %s.",
             clocks_from_ticks(getticks() - tic2), clocks_getunit());
 
-  for (int i = 0; i < num_proxies; ++i) {
-    counters[i] += proxies[i].nr_cells_in;
-  }
-
-  for (int i = 0; i < s->e->nr_nodes; ++i) {
-    if (i == s->e->nodeID) {
-      printf("Rank %d: |", i);
-      int total = 0;
-      for (int j = 0; j < s->e->nr_nodes; ++j) {
-        total += counters[j];
-        printf(" %d", counters[j]);
-      }
-      printf("| %d |\n", total);
-    }
-    fflush(stdout);
-    MPI_Barrier(MPI_COMM_WORLD);
-  }
-  free(counters);
-
   /* Wait for all the sends to have finished too. */
   if (MPI_Waitall(num_proxies, reqs_out, MPI_STATUSES_IGNORE) != MPI_SUCCESS)
     error("MPI_Waitall on sends failed.");
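With the threadpool mappers removed, the unpack path relies entirely on the
MPI_Waitany completion-order pattern restored by the last hunk. Below is a
minimal, standalone sketch of that pattern, separate from the patch itself;
the tag value and the single-int payload are illustrative stand-ins and not
anything from proxy.c:

/* Sketch: post one non-blocking receive per peer, send to every peer, then
 * drain the receives in completion order with MPI_Waitany, mirroring the
 * reqs_in handling in proxy_cells_exchange(). */
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define TAG 42 /* illustrative tag, not a proxy.c value */

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int num_peers = size - 1; /* every rank but ourselves */
  MPI_Request *reqs_in = malloc(num_peers * sizeof(MPI_Request));
  int *payload = malloc(num_peers * sizeof(int));

  /* Post all receives up front, one per peer, like the proxies' reqs_in. */
  for (int k = 0, peer = 0; peer < size; peer++) {
    if (peer == rank) continue;
    MPI_Irecv(&payload[k], 1, MPI_INT, peer, TAG, MPI_COMM_WORLD, &reqs_in[k]);
    k++;
  }

  /* Send our rank number to every peer. */
  for (int peer = 0; peer < size; peer++)
    if (peer != rank) MPI_Send(&rank, 1, MPI_INT, peer, TAG, MPI_COMM_WORLD);

  /* Handle each message as it arrives: MPI_Waitany returns the index of
   * whichever request completed, so early arrivals are processed while
   * slower peers are still in flight. */
  for (int k = 0; k < num_peers; k++) {
    int pid = MPI_UNDEFINED;
    MPI_Status status;
    if (MPI_Waitany(num_peers, reqs_in, &pid, &status) != MPI_SUCCESS ||
        pid == MPI_UNDEFINED)
      MPI_Abort(MPI_COMM_WORLD, 1);
    printf("rank %d: message from rank %d arrived (request index %d)\n", rank,
           payload[pid], pid);
  }

  free(payload);
  free(reqs_in);
  MPI_Finalize();
  return 0;
}

Built with mpicc and run under mpirun, each rank prints its peers' messages
in arrival order. The trade-off made by the patch is the same one the sketch
shows: unpacking of early arrivals overlaps with waiting on slower proxies,
but the unpacking itself is now serial on each rank, whereas the removed
mappers had tried to spread it over the threadpool.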