Skip to content
Snippets Groups Projects
mpiuse.c 9.87 KiB
/* This file is part of SWIFT.
 * Copyright (c) 2019 Peter W. Draper (p.w.draper@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/**
 *  @file mpiuse.c
 *  @brief file of routines to report about MPI tasks used in SWIFT.
 */
/* Config parameters. */
#include "../config.h"

#if defined(SWIFT_MPIUSE_REPORTS) && defined(WITH_MPI)

/* Standard includes. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Local defines. */
#include "mpiuse.h"

/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "engine.h"
#include "error.h"
#include "memuse_rnodes.h"

/* The initial size and increment of the log entries buffer, counted in
 * entries (struct mpiuse_log_entry), not bytes. */
#define MPIUSE_INITLOG 1000000

/* A megabyte (2^20 bytes) for conversions, as a floating point value. */
#define MEGABYTE 1048576.0

/* Also recorded in logger. Defined elsewhere; engine_rank is reported in the
 * dump output and messages, engine_current_step is stored with every entry. */
extern int engine_rank;
extern int engine_current_step;

/* Entry for logger of MPI send and recv requests in a step. One entry is
 * written for the activation of a request and another for its handoff. */
struct mpiuse_log_entry {

  /* Type and subtype of MPI task. Used as indices into taskID_names and
   * subtaskID_names when reporting. */
  int type;
  int subtype;

  /* Step of action. */
  int step;

  /* Whether an activation, send or recv, or if handoff completed. Not the
   * same as delivered, need to match across ranks to see that. Non-zero
   * is treated as an activation when the log is dumped. */
  int activation;

  /* Memory of the request. For a handoff entry this is overwritten when the
   * log is dumped with the negated size of the matching activation. */
  size_t size;

  /* Pointer to the request associated with the call. The dump uses its
   * bytes as the radix tree key that pairs an activation with its later
   * handoff, so it needs to be unique among the requests in flight. */
  union {
    void *ptr;
    uint8_t vptr[sizeof(uintptr_t)]; /* For rnode keys. */
  };

  /* Relative time of this action. */
  ticks dtic;

  /* Whether request is still active, i.e. successful test not seen. */
  int active;

  /* Rank of otherside of communication. */
  int otherrank;

  /* The tag. */
  int tag;
};

/* The log of activations and handoffs. All volatile as accessed from threads
 * that use the value to synchronise. */

/* The buffer of entries. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;

/* Capacity of the buffer in entries; grown in MPIUSE_INITLOG steps. */
static volatile size_t mpiuse_log_size = 0;

/* Number of slots handed out to loggers so far. */
static volatile size_t mpiuse_log_count = 0;

/* Index of the first entry that mpiuse_log_dump() will consider. */
static volatile size_t mpiuse_old_count = 0;

/* Number of entries that have been completely written; the reallocation
 * waits on this before copying the buffer. */
static volatile size_t mpiuse_log_done = 0;

/**
 * @brief reallocate the entries log if space is needed.
 */
static void mpiuse_log_reallocate(size_t ind) {

  if (ind == 0) {

    /* Need to perform initialization. Be generous. */
    if ((mpiuse_log = (struct mpiuse_log_entry *)malloc(
             sizeof(struct mpiuse_log_entry) * MPIUSE_INITLOG)) == NULL)
      error("Failed to allocate MPI use log.");

    /* Last action. */
    mpiuse_log_size = MPIUSE_INITLOG;

  } else {
    struct mpiuse_log_entry *new_log;
    if ((new_log = (struct mpiuse_log_entry *)malloc(
             sizeof(struct mpiuse_log_entry) *
             (mpiuse_log_size + MPIUSE_INITLOG))) == NULL)
      error("Failed to re-allocate MPI use log.");

    /* Wait for all writes to the old buffer to complete. */
    while (mpiuse_log_done < mpiuse_log_size)
      ;

    /* Copy to new buffer. */
    memcpy(new_log, mpiuse_log,
           sizeof(struct mpiuse_log_entry) * mpiuse_log_size);
    free(mpiuse_log);
    mpiuse_log = new_log;

    /* Last action, releases waiting threads. */
    atomic_add(&mpiuse_log_size, MPIUSE_INITLOG);
  }
}

/**
 * @brief Log an MPI request or handoff.
 *
 * Claims the next slot in the log, extending the log when that slot is the
 * first one beyond the current capacity, and fills in the entry. The entry
 * is marked active; matching of activations and handoffs is only done when
 * the log is dumped.
 *
 * @param type the task type (send or recv).
 * @param subtype the task subtype.
 * @param ptr pointer to the MPI request.
 * @param activation non-zero when the request is activated (MPI_Isend or
 *        MPI_Irecv), zero for a handoff, i.e. a successful MPI_Test.
 * @param size the size in bytes of memory to be transferred or received.
 *             0 for a deactivation.
 * @param otherrank other rank associated with the transfer.
 * @param tag the MPI tag.
 */
void mpiuse_log_allocation(int type, int subtype, void *ptr, int activation,
                           size_t size, int otherrank, int tag) {

  size_t ind = atomic_inc(&mpiuse_log_count);

  /* If we are at the current size we need more space. */
  if (ind == mpiuse_log_size) mpiuse_log_reallocate(ind);

  /* Other threads wait for space. */
  while (ind > mpiuse_log_size)
    ;

  /* Record the log. The type and subtype must be stored as well, the dump
   * uses them to look up the task names and would otherwise index the name
   * tables with uninitialised memory. */
  mpiuse_log[ind].step = engine_current_step;
  mpiuse_log[ind].type = type;
  mpiuse_log[ind].subtype = subtype;
  mpiuse_log[ind].activation = activation;
  mpiuse_log[ind].size = size;
  mpiuse_log[ind].ptr = ptr;
  mpiuse_log[ind].otherrank = otherrank;
  mpiuse_log[ind].tag = tag;
  mpiuse_log[ind].dtic = getticks() - clocks_start_ticks;
  mpiuse_log[ind].active = 1;

  /* Signal that this entry is complete, the reallocation waits on this. */
  atomic_inc(&mpiuse_log_done);
}

/**
 * @brief dump the log to a file and reset, if anything to dump.
 *
 * @param filename name of file for log dump.
 */
void mpiuse_log_dump(const char *filename) {

  /* Skip if nothing logged this step. */
  if (mpiuse_log_count == mpiuse_old_count) return;

  // ticks tic = getticks();

  /* Create the radix tree root node. */
  struct memuse_rnode *memuse_rnode_root =
      (struct memuse_rnode *)calloc(1, sizeof(struct memuse_rnode));

  /* Stop any new logs from being processed while we are dumping. */
  size_t log_count = mpiuse_log_count;
  size_t old_count = mpiuse_old_count;

  /* Open the output file. */
  FILE *fd;
  if ((fd = fopen(filename, "w")) == NULL) {
    message("Failed to create MPI use log file '%s', logs not dumped.",
            filename);
    return;
  }

  /* Write a header. */
  fprintf(fd,
          "# dtic step rank otherrank type subtype activation tag size sum\n");

  size_t mpiuse_current = 0;
  for (size_t k = old_count; k < log_count; k++) {

    /* Check if this address has already been recorded. */
    struct memuse_rnode *child = memuse_rnode_find_child(
        memuse_rnode_root, 0, mpiuse_log[k].vptr, sizeof(uintptr_t));

    if (child != NULL && child->ptr != NULL) {

      /* Should be the handoff. Check that. */
      if (mpiuse_log[k].activation) {

      /* Used twice, this is an error, but just complain as not fatal. */
#if SWIFT_DEBUG_CHECKS
        message(
            "Used the same MPI request address twice "
            "(%s/%s: %d->%d: %zd/%d)",
            taskID_names[mpiuse_log[k].type],
            subtaskID_names[mpiuse_log[k].subtype], engine_rank,
            mpiuse_log[k].otherrank, mpiuse_log[k].size,
            mpiuse_log[k].tag);
#endif
        continue;
      }

      /* Free, update the missing fields, size of request is removed. */
      struct mpiuse_log_entry *oldlog =
          (struct mpiuse_log_entry *)child->ptr;
      mpiuse_log[k].size = -oldlog->size;
      mpiuse_log[k].otherrank = oldlog->otherrank;
      mpiuse_log[k].tag = oldlog->tag;

      /* And deactivate this key. */
      child->ptr = NULL;

      /* And mark this as handed off. */
      mpiuse_log[k].active = 0;
      oldlog->active = 0;

    } else if (child == NULL && mpiuse_log[k].activation) {

      /* Not found, so new send/recv which we store the log against the
       * address. */
      memuse_rnode_insert_child(memuse_rnode_root, 0, mpiuse_log[k].vptr,
                                sizeof(uintptr_t), &mpiuse_log[k]);

    } else if (child == NULL && !mpiuse_log[k].activation) {

    /* Unmatched handoff, not OK, but not fatal. */
#if SWIFT_DEBUG_CHECKS
      if (mpiuse_log[k].ptr != NULL) {
        message("Unmatched MPI_Test found: (%s/%s: %d->%d: %zd/%d)",
                taskID_names[mpiuse_log[k].type],
                subtaskID_names[mpiuse_log[k].subtype],
                engine_rank, mpiuse_log[k].otherrank,
                mpiuse_log[k].size, mpiuse_log[k].tag);
      }
#endif
      continue;
    } else if (mpiuse_log[k].activation) {

      /* Must be previously released request with the same address, so we
       * store. */
      memuse_rnode_insert_child(memuse_rnode_root, 0, mpiuse_log[k].vptr,
                                sizeof(uintptr_t), &mpiuse_log[k]);

    } else {
      /* Should not happen ... */
      message("Weird MPI log record found: (%s/%s: %d->%d: %zd/%d)",
              taskID_names[mpiuse_log[k].type],
              subtaskID_names[mpiuse_log[k].subtype],
              engine_rank, mpiuse_log[k].otherrank,
              mpiuse_log[k].size, mpiuse_log[k].tag);
      continue;
    }

    /* Sum of memory in flight. */
    mpiuse_current += mpiuse_log[k].size;

    /* And output. */
    fprintf(fd, "%lld %d %d %d %s %s %d %d %zd %zd\n", mpiuse_log[k].dtic,
            mpiuse_log[k].step, engine_rank,
            mpiuse_log[k].otherrank, taskID_names[mpiuse_log[k].type],
            subtaskID_names[mpiuse_log[k].subtype],
            mpiuse_log[k].activation, mpiuse_log[k].tag,
            mpiuse_log[k].size, mpiuse_current);
  }

#ifdef MEMUSE_RNODE_DUMP
    /* Debug dump of tree. */
    // memuse_rnode_dump(0, memuse_rnode_root, 0);
#endif

  /* Now check any still active logs, these are errors all should match. */
  if (mpiuse_current != 0) {
    message("Some MPI requests have not been completed");
    for (size_t k = old_count; k < log_count; k++) {
      if (mpiuse_log[k].active)
        message("%s/%s: %d->%d: %zd/%d)", taskID_names[mpiuse_log[k].type],
                subtaskID_names[mpiuse_log[k].subtype],
                engine_rank, mpiuse_log[k].otherrank,
                mpiuse_log[k].size, mpiuse_log[k].tag);
    }
  }

  /* Finished with the rnodes. */
  memuse_rnode_cleanup(memuse_rnode_root);

  /* Close the file. */
  fflush(fd);
  fclose(fd);

  // message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
  //        clocks_getunit());
}

#endif /* defined(SWIFT_MPIUSE_REPORTS) && defined(WITH_MPI) */