-
Peter W. Draper authoredPeter W. Draper authored
mpiuse.c 9.87 KiB
/* This file is part of SWIFT.
* Copyright (c) 2019 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/**
* @file mpiuse.c
* @brief file of routines to report about MPI tasks used in SWIFT.
*/
/* Config parameters. */
#include "../config.h"
#if defined(SWIFT_MPIUSE_REPORTS) && defined(WITH_MPI)
/* Standard includes. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
/* Local defines. */
#include "mpiuse.h"
/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "engine.h"
#include "error.h"
#include "memuse_rnodes.h"
/* The initial size and increment of the log entries buffer. */
#define MPIUSE_INITLOG 1000000
/* A megabyte for conversions. */
#define MEGABYTE 1048576.0
/* Also recorded in logger. */
extern int engine_rank;
extern int engine_current_step;
/* Entry for logger of MPI send and recv requests in a step. */
struct mpiuse_log_entry {
/* Type and subtype of MPI task. */
int type;
int subtype;
/* Step of action. */
int step;
/* Whether an activation, send or recv, or if handoff completed. Not the
* same as delivered, need to match across ranks to see that. */
int activation;
/* Memory of the request. */
size_t size;
/* Pointer to the request associated with the call. Needs to be
* unique and match to the successful */
union {
void *ptr;
uint8_t vptr[sizeof(uintptr_t)]; /* For rnode keys. */
};
/* Relative time of this action. */
ticks dtic;
/* Whether request is still active, i.e. successful test not seen. */
int active;
/* Rank of otherside of communication. */
int otherrank;
/* The tag. */
int tag;
};
/* The log of activations and handoffs. All volatile as accessed from threads
* that use the value to synchronise. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;
static volatile size_t mpiuse_log_size = 0;
static volatile size_t mpiuse_log_count = 0;
static volatile size_t mpiuse_old_count = 0;
static volatile size_t mpiuse_log_done = 0;
/**
* @brief reallocate the entries log if space is needed.
*/
static void mpiuse_log_reallocate(size_t ind) {
if (ind == 0) {
/* Need to perform initialization. Be generous. */
if ((mpiuse_log = (struct mpiuse_log_entry *)malloc(
sizeof(struct mpiuse_log_entry) * MPIUSE_INITLOG)) == NULL)
error("Failed to allocate MPI use log.");
/* Last action. */
mpiuse_log_size = MPIUSE_INITLOG;
} else {
struct mpiuse_log_entry *new_log;
if ((new_log = (struct mpiuse_log_entry *)malloc(
sizeof(struct mpiuse_log_entry) *
(mpiuse_log_size + MPIUSE_INITLOG))) == NULL)
error("Failed to re-allocate MPI use log.");
/* Wait for all writes to the old buffer to complete. */
while (mpiuse_log_done < mpiuse_log_size)
;
/* Copy to new buffer. */
memcpy(new_log, mpiuse_log,
sizeof(struct mpiuse_log_entry) * mpiuse_log_size);
free(mpiuse_log);
mpiuse_log = new_log;
/* Last action, releases waiting threads. */
atomic_add(&mpiuse_log_size, MPIUSE_INITLOG);
}
}
/**
* @brief Log an MPI request or handoff.
*
* @param type the task type (send or recv).
* @param subtype the task subtype.
* @param ptr pointer to the MPI request.
* @param activation if not is a successful MPI_Test, not MPI_Isend or
* MPI_Irecv.
* @param size the size in bytes of memory to be transfered or received.
* 0 for a deactivation.
* @param otherrank other rank associated with the transfer.
* @param tag the MPI tag.
*/
void mpiuse_log_allocation(int type, int subtype, void *ptr, int activation,
size_t size, int otherrank, int tag) {
size_t ind = atomic_inc(&mpiuse_log_count);
/* If we are at the current size we need more space. */
if (ind == mpiuse_log_size) mpiuse_log_reallocate(ind);
/* Other threads wait for space. */
while (ind > mpiuse_log_size)
;
/* Record the log. */
mpiuse_log[ind].step = engine_current_step;
mpiuse_log[ind].activation = activation;
mpiuse_log[ind].size = size;
mpiuse_log[ind].ptr = ptr;
mpiuse_log[ind].otherrank = otherrank;
mpiuse_log[ind].tag = tag;
mpiuse_log[ind].dtic = getticks() - clocks_start_ticks;
mpiuse_log[ind].active = 1;
atomic_inc(&mpiuse_log_done);
}
/**
* @brief dump the log to a file and reset, if anything to dump.
*
* @param filename name of file for log dump.
*/
void mpiuse_log_dump(const char *filename) {
/* Skip if nothing logged this step. */
if (mpiuse_log_count == mpiuse_old_count) return;
// ticks tic = getticks();
/* Create the radix tree root node. */
struct memuse_rnode *memuse_rnode_root =
(struct memuse_rnode *)calloc(1, sizeof(struct memuse_rnode));
/* Stop any new logs from being processed while we are dumping. */
size_t log_count = mpiuse_log_count;
size_t old_count = mpiuse_old_count;
/* Open the output file. */
FILE *fd;
if ((fd = fopen(filename, "w")) == NULL) {
message("Failed to create MPI use log file '%s', logs not dumped.",
filename);
return;
}
/* Write a header. */
fprintf(fd,
"# dtic step rank otherrank type subtype activation tag size sum\n");
size_t mpiuse_current = 0;
for (size_t k = old_count; k < log_count; k++) {
/* Check if this address has already been recorded. */
struct memuse_rnode *child = memuse_rnode_find_child(
memuse_rnode_root, 0, mpiuse_log[k].vptr, sizeof(uintptr_t));
if (child != NULL && child->ptr != NULL) {
/* Should be the handoff. Check that. */
if (mpiuse_log[k].activation) {
/* Used twice, this is an error, but just complain as not fatal. */
#if SWIFT_DEBUG_CHECKS
message(
"Used the same MPI request address twice "
"(%s/%s: %d->%d: %zd/%d)",
taskID_names[mpiuse_log[k].type],
subtaskID_names[mpiuse_log[k].subtype], engine_rank,
mpiuse_log[k].otherrank, mpiuse_log[k].size,
mpiuse_log[k].tag);
#endif
continue;
}
/* Free, update the missing fields, size of request is removed. */
struct mpiuse_log_entry *oldlog =
(struct mpiuse_log_entry *)child->ptr;
mpiuse_log[k].size = -oldlog->size;
mpiuse_log[k].otherrank = oldlog->otherrank;
mpiuse_log[k].tag = oldlog->tag;
/* And deactivate this key. */
child->ptr = NULL;
/* And mark this as handed off. */
mpiuse_log[k].active = 0;
oldlog->active = 0;
} else if (child == NULL && mpiuse_log[k].activation) {
/* Not found, so new send/recv which we store the log against the
* address. */
memuse_rnode_insert_child(memuse_rnode_root, 0, mpiuse_log[k].vptr,
sizeof(uintptr_t), &mpiuse_log[k]);
} else if (child == NULL && !mpiuse_log[k].activation) {
/* Unmatched handoff, not OK, but not fatal. */
#if SWIFT_DEBUG_CHECKS
if (mpiuse_log[k].ptr != NULL) {
message("Unmatched MPI_Test found: (%s/%s: %d->%d: %zd/%d)",
taskID_names[mpiuse_log[k].type],
subtaskID_names[mpiuse_log[k].subtype],
engine_rank, mpiuse_log[k].otherrank,
mpiuse_log[k].size, mpiuse_log[k].tag);
}
#endif
continue;
} else if (mpiuse_log[k].activation) {
/* Must be previously released request with the same address, so we
* store. */
memuse_rnode_insert_child(memuse_rnode_root, 0, mpiuse_log[k].vptr,
sizeof(uintptr_t), &mpiuse_log[k]);
} else {
/* Should not happen ... */
message("Weird MPI log record found: (%s/%s: %d->%d: %zd/%d)",
taskID_names[mpiuse_log[k].type],
subtaskID_names[mpiuse_log[k].subtype],
engine_rank, mpiuse_log[k].otherrank,
mpiuse_log[k].size, mpiuse_log[k].tag);
continue;
}
/* Sum of memory in flight. */
mpiuse_current += mpiuse_log[k].size;
/* And output. */
fprintf(fd, "%lld %d %d %d %s %s %d %d %zd %zd\n", mpiuse_log[k].dtic,
mpiuse_log[k].step, engine_rank,
mpiuse_log[k].otherrank, taskID_names[mpiuse_log[k].type],
subtaskID_names[mpiuse_log[k].subtype],
mpiuse_log[k].activation, mpiuse_log[k].tag,
mpiuse_log[k].size, mpiuse_current);
}
#ifdef MEMUSE_RNODE_DUMP
/* Debug dump of tree. */
// memuse_rnode_dump(0, memuse_rnode_root, 0);
#endif
/* Now check any still active logs, these are errors all should match. */
if (mpiuse_current != 0) {
message("Some MPI requests have not been completed");
for (size_t k = old_count; k < log_count; k++) {
if (mpiuse_log[k].active)
message("%s/%s: %d->%d: %zd/%d)", taskID_names[mpiuse_log[k].type],
subtaskID_names[mpiuse_log[k].subtype],
engine_rank, mpiuse_log[k].otherrank,
mpiuse_log[k].size, mpiuse_log[k].tag);
}
}
/* Finished with the rnodes. */
memuse_rnode_cleanup(memuse_rnode_root);
/* Close the file. */
fflush(fd);
fclose(fd);
// message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
// clocks_getunit());
}
#endif /* defined(SWIFT_MPIUSE_REPORTS) && defined(WITH_MPI) */