Skip to content
Snippets Groups Projects
mpiuse.c 11.50 KiB
/* This file is part of SWIFT.
 * Copyright (c) 2019 Peter W. Draper (p.w.draper@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/**
 *  @file mpiuse.c
 *  @brief file of routines to report about MPI tasks used in SWIFT.
 */

/* Standard includes. */
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Local defines. */
#include "mpiuse.h"

/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "cycle.h"
#include "error.h"

/* Our rank. Only declared here, the definition lives in another translation
 * unit. */
extern int myrank;

/* The initial size and increment of the log entries buffer, counted in
 * entries (not bytes). */
#define MPIUSE_INITLOG 1000000

/* The log of activations and handoffs. All volatile as accessed from threads
 * that use the value to synchronise.
 *
 *  mpiuse_log        the buffer of entries.
 *  mpiuse_log_size   capacity of the buffer, in entries.
 *  mpiuse_log_count  number of entry slots handed out so far.
 *  mpiuse_log_done   number of entries whose fields have been written.
 *  mpiuse_max_rank   largest rank logged so far. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;
static volatile size_t mpiuse_log_size = 0;
static volatile size_t mpiuse_log_count = 0;
static volatile size_t mpiuse_log_done = 0;
static volatile int mpiuse_max_rank = 0;

/**
 * @brief reallocate the entries log if space is needed.
 *
 * Called from mpiuse_log_allocation() by the thread whose slot index equals
 * the current capacity. The capacity is always updated as the very last
 * action, because other threads spin on mpiuse_log_size until their slot
 * exists.
 *
 * Note that growing the log frees the old buffer, so entry pointers handed
 * out earlier by mpiuse_get_log() no longer point into the live log.
 *
 * @param ind the slot index that triggered the call. 0 performs the initial
 *            allocation, any other value grows the buffer by MPIUSE_INITLOG
 *            entries.
 */
static void mpiuse_log_reallocate(size_t ind) {

  if (ind == 0) {

    /* Need to perform initialization. Be generous. */
    if ((mpiuse_log = (struct mpiuse_log_entry *)malloc(
             sizeof(struct mpiuse_log_entry) * MPIUSE_INITLOG)) == NULL)
      error("Failed to allocate MPI use log.");

    /* Last action. Publishing the capacity lets threads waiting for space
     * in mpiuse_log_allocation() proceed. */
    mpiuse_log_size = MPIUSE_INITLOG;

  } else {
    /* Allocate the larger buffer first, the old one stays in use until all
     * pending writes to it have finished. */
    struct mpiuse_log_entry *new_log;
    if ((new_log = (struct mpiuse_log_entry *)malloc(
             sizeof(struct mpiuse_log_entry) *
             (mpiuse_log_size + MPIUSE_INITLOG))) == NULL)
      error("Failed to re-allocate MPI use log.");

    /* Wait for all writes to the old buffer to complete. Writers increment
     * mpiuse_log_done once their entry is filled in, so this busy-waits
     * until every slot of the old buffer has been written. */
    while (mpiuse_log_done < mpiuse_log_size)
      ;

    /* Copy to new buffer. Only the old capacity is copied, the additional
     * MPIUSE_INITLOG entries are left uninitialised. */
    memcpy(new_log, mpiuse_log,
           sizeof(struct mpiuse_log_entry) * mpiuse_log_size);
    free(mpiuse_log);
    mpiuse_log = new_log;

    /* Last action, releases waiting threads. Must come after the buffer
     * pointer has been switched so released threads write to the new log. */
    atomic_add(&mpiuse_log_size, MPIUSE_INITLOG);
  }
}

/**
 * @brief Log an MPI request or handoff.
 *
 * Claims the next free slot in the log, growing the log when the slot lies
 * just beyond the current capacity, and fills in the entry. Fields that are
 * not described by the arguments (for instance the injection and end ticks
 * and the test statistics reported by mpiuse_dump_logs()) are zeroed, so
 * that entries which are never processed do not expose uninitialised memory.
 *
 * @param rank the rank
 * @param step the step
 * @param tic the ticks at time of log, will be relative.
 * @param type the task type (send or recv).
 * @param subtype the task subtype.
 * @param activation non-zero when the entry records an activation, that is
 *        an MPI_Isend or MPI_Irecv; zero when it records a successful
 *        MPI_Test.
 * @param size the size in bytes of memory to be transfered or received.
 *             0 for a deactivation.
 * @param otherrank other rank associated with the transfer.
 * @param tag the MPI tag.
 */
void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
                           int subtype, int activation, size_t size,
                           int otherrank, int tag) {

  /* Claim our slot, each caller gets a different index. */
  size_t ind = atomic_inc(&mpiuse_log_count);

  /* If we are at the current size we need more space. */
  if (ind == mpiuse_log_size) mpiuse_log_reallocate(ind);

  /* Other threads wait for space. */
  while (ind > mpiuse_log_size)
    ;

  /* The buffer comes from malloc() so start from a clean entry, not all
   * fields are set below. */
  memset(&mpiuse_log[ind], 0, sizeof(mpiuse_log[ind]));

  /* Record the log. */
  mpiuse_log[ind].activation = activation;
  mpiuse_log[ind].data = NULL;
  mpiuse_log[ind].otherrank = otherrank;
  mpiuse_log[ind].rank = rank;
  mpiuse_log[ind].req = MPI_REQUEST_NULL;
  mpiuse_log[ind].size = size;
  mpiuse_log[ind].step = step;
  mpiuse_log[ind].subtype = subtype;
  mpiuse_log[ind].tag = tag;
  mpiuse_log[ind].tic = tic;
  mpiuse_log[ind].type = type;

  /* Keep number of ranks for convenience. */
  if (rank > mpiuse_max_rank) mpiuse_max_rank = rank;

  /* Signal that this entry is complete, a pending reallocation waits for
   * this count before it copies the buffer. */
  atomic_inc(&mpiuse_log_done);
}

/**
 * @brief restore the log from a dump.
 *
 * Each line that does not start with '#' is expected to hold 14 whitespace
 * separated fields: three tick values, the step, rank and other rank, the
 * type name and its integer code, the subtype name and its integer code,
 * the activation flag, the tag, the size and a running sum. Only the first
 * tick value, the step, the ranks, the integer codes, the activation flag,
 * the tag and the size are passed on to mpiuse_log_allocation().
 *
 * Lines that cannot be parsed completely are skipped. If the file cannot be
 * opened a message is reported and nothing is logged.
 *
 * @param filename name of file with the previous dump in.
 */
void mpiuse_log_restore(const char *filename) {

  /* Open the input file. */
  FILE *fd = fopen(filename, "r");
  if (fd == NULL) {
    message("Failed to open the MPI use log file '%s'.", filename);
    return;
  }

  /* Read until the end of the file is reached or a read error occurs,
   * fgets() returns NULL in both cases so we cannot loop forever. */
  char line[132];
  size_t lineno = 0;
  while (fgets(line, sizeof(line), fd) != NULL) {
    lineno++;

    /* Skip comments. */
    if (line[0] == '#') continue;

    size_t stic, etic, dtic, size, sum;
    int step, rank, otherrank, itype, isubtype, activation, tag;
    char type[32], subtype[32];

    /* The string fields are limited to 31 characters plus the terminator
     * so they cannot overrun their buffers. */
    const int nread = sscanf(
        line, "%zu %zu %zu %d %d %d %31s %d %31s %d %d %d %zu %zu", &stic,
        &etic, &dtic, &step, &rank, &otherrank, type, &itype, subtype,
        &isubtype, &activation, &tag, &size, &sum);

    /* Never log partially parsed values. Blank lines (EOF from sscanf) are
     * skipped quietly, anything else is worth a report. */
    if (nread != 14) {
      if (nread != EOF)
        message("Skipping malformed line %zu of the MPI use log file '%s'.",
                lineno, filename);
      continue;
    }

    mpiuse_log_allocation(rank, step, stic, itype, isubtype, activation, size,
                          otherrank, tag);
  }
  fclose(fd);
}

/**
 * @brief dump the logs for all ranks to a file.
 *
 * @param nranks the number of ranks.
 * @param dumpfile the file to write
 */
void mpiuse_dump_logs(int nranks, const char *dumpfile) {

  /* Make sure output file is empty, only on one rank. */
  FILE *fd;
  if (myrank == 0) {
    fd = fopen(dumpfile, "w");

    /* Header. */
    fprintf(fd,
            "# logticin logtic injtic endtic dtic step rank otherrank itype "
            " isubtype tag size nr_tests tsum tmin tmax\n");
    fclose(fd);
  }
  MPI_Barrier(MPI_COMM_WORLD);

  /* Loop over all ranks, one by one, getting each rank to append their
   * logs. */
  for (int k = 0; k < nranks; k++) {

    /* Rank 0 decides the index of the writing node, this happens
     * one-by-one. */
    int kk = k;
    MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (kk == myrank) {

      /* Open file and position at end. */
      fd = fopen(dumpfile, "a");

      /* And append our logs. Note log->tic is not necessarily from this
       * machine, so the conversion to ms may be suspect. We also rebase a
       * version to match the expected injection times for this new run. */
      size_t nlogs = mpiuse_log_count;
      ticks basetics = 0;
      for (size_t k = 0; k < nlogs; k++) {
        struct mpiuse_log_entry *log = &mpiuse_log[k];
        if (log->rank == myrank && log->endtic > 0) {
          if (basetics == 0) basetics = log->tic;
          fprintf(fd,
                  "%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f "
                  "%.6f\n",
                  log->tic, clocks_from_ticks(log->tic - basetics),
                  clocks_from_ticks(log->injtic - clocks_start_ticks),
                  clocks_from_ticks(log->endtic - clocks_start_ticks),
                  clocks_from_ticks(log->endtic - log->injtic), log->step,
                  log->rank, log->otherrank, log->type, log->subtype, log->tag,
                  log->size, log->nr_tests, clocks_from_ticks(log->tsum),
                  clocks_from_ticks(log->tmin), clocks_from_ticks(log->tmax));
        }
      }
      fclose(fd);
    }

    /* Need to stay in step. */
    MPI_Barrier(MPI_COMM_WORLD);
  }
}

/**
 * @brief return the number of log entries.
 *
 * This is the number of slots handed out so far, converted to int.
 *
 * @result the number of log entries.
 */
int mpiuse_nr_logs(void) {
  const size_t nr_entries = mpiuse_log_count;
  return (int)nr_entries;
}

/**
 * @brief return the number of ranks in log.
 *
 * Ranks count from zero, so this is the largest rank logged plus one.
 *
 * @result the number of ranks we've seen.
 */
int mpiuse_nr_ranks(void) {
  const int highest_rank = mpiuse_max_rank;
  return highest_rank + 1;
}

/**
 * @brief get a log entry.
 *
 * @param ind the index of the entry required.
 * @result pointer to the entry, NULL if not available, that is when ind is
 *         negative or not less than the number of log entries.
 */
struct mpiuse_log_entry *mpiuse_get_log(int ind) {

  /* Negative indices never match an entry. */
  if (ind < 0) return NULL;

  /* Nor do indices past the last slot handed out. */
  if ((size_t)ind >= mpiuse_log_count) return NULL;

  return &mpiuse_log[ind];
}

/**
 * @brief return random number from a upper part of gaussian distribution.
 *
 * @result the random.
 */
static double gauss_rand_upper(void) {

  double V1, V2, S;
  do {
    double U1 = drand48();
    double U2 = drand48();

    V1 = U1 - 1.0;
    V2 = U2 - 1.0;
    S = V1 * V1 + V2 * V2;
  } while (S >= 1.0 || S == 0.0);

  return fabs(V1 * sqrt(-2.0 * log(S) / S));
}

/**
 * @brief generate a list of fake exchanges as mpiuse logs.
 *
 * For each of the nr_logs rounds one message size is chosen and every
 * ordered pair of different ranks logs a send and the matching receive of
 * that size. All entries of a round share the same tag, tags start at 1 and
 * increase by one per round.
 *
 * @param nr_nodes the number of ranks that will be used.
 * @param nr_logs the number of logs to generate per rank.
 * @param size bytes per message, unless random when this is the maximum
 *             and the minimum is 1 for uniform, if using a gaussian
 *             distribution the value is a 2.5 sigma, for a CDF based
 *             selection this is just a scale factor of the values.
 *             NOTE(review): the CDF branch below does not apply size at
 *             all, and the gaussian branch scales by 0.25 which does not
 *             correspond to 2.5 sigma, confirm which is intended.
 * @param random whether to use random sizes.
 * @param seed the random seed, use same for fixed sequences.
 * @param uniform whether to use a uniform distribution other gaussian, unless
 *                cdf is defined, in which case this parameter is ignored.
 * @param cdf text file containing a normalized CDF to use as a basis for
 * inverse transform sampling of the randoms. NULL for no file. At most 1024
 * bins are supported, and at least one is needed when random is set.
 */
void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random,
                         long int seed, int uniform, const char *cdf) {

  /* Only used for CDF, may need to increase the capacity. */
  enum { cdf_maxvals = 1024 };
  int nvals = 0;
  double imin[cdf_maxvals], imax[cdf_maxvals], value[cdf_maxvals];

  /* Note that each rank exchanges messages with all the others and each "log"
   * has the same size. */
  /* Set seed. */
  if (random) srand48(seed);

  /* Check for CDF. This should be based on some real distribution, the format
   * is same as output from TOPCAT, i.e. bin-low, bin-high, value space
   * separated values. Note the value column should be normalised into the
   * range 0 to 1 so that it maps into a uniform random distribution. */
  if (cdf != NULL) {
    FILE *infile = fopen(cdf, "r");
    if (infile == NULL) error("Failed to open CDF file: %s", cdf);

    /* fgets() returns NULL at the end of file and on read errors, so the
     * loop always terminates. */
    char line[132];
    while (fgets(line, sizeof(line), infile) != NULL) {

      /* Skip comments and lines that do not hold three numbers. */
      if (line[0] == '#') continue;
      double bin_low, bin_high, bin_value;
      if (sscanf(line, "%lf %lf %lf", &bin_low, &bin_high, &bin_value) != 3)
        continue;

      /* Do not write beyond the end of the fixed size arrays. */
      if (nvals >= cdf_maxvals)
        error("Too many bins in CDF file: %s (maximum is %d)", cdf,
              (int)cdf_maxvals);

      imin[nvals] = bin_low;
      imax[nvals] = bin_high;
      value[nvals] = bin_value;
      nvals++;
    }
    fclose(infile);

    /* Sampling needs at least one bin. */
    if (random && nvals == 0)
      error("No usable bins found in CDF file: %s", cdf);
  }

  /* Message tags increment across the rounds of logs, all ranks use the same
   * tag within a round. */
  int tag = 1;
  for (int k = 0; k < nr_logs; k++) {

    /* Set size for this messages. */
    int logsize = size;
    if (random) {
      if (cdf) {
        /* CDF randoms. */
        const double u = drand48();

        /* Binary search for containing bin for this random, that is the
         * first bin whose cumulative value is not less than it. When the
         * loop ends lower == upper is the index of that bin. */
        int lower = 0;
        int upper = nvals;
        while (lower < upper) {
          const int middle = lower + (upper - lower) / 2;
          if (u > value[middle])
            lower = middle + 1;
          else
            upper = middle;
        }

        /* A CDF that stops short of the random has no containing bin, use
         * the last one. */
        if (lower >= nvals) lower = nvals - 1;

        /* Use the centre of the bin. */
        logsize = (int)(0.5 * (imax[lower] + imin[lower]));

      } else if (uniform) {
        /* Uniform randoms scaled by size, the offset makes the minimum 1. */
        logsize = (drand48() * (double)size) + 1;
      } else {
        /* Gaussian randoms so no maximum, the sigma used is a quarter of
         * size. NOTE(review): the documentation says size is 2.5 sigma,
         * which would be a factor of 0.4, confirm. */
        logsize = (gauss_rand_upper() * (double)size * 0.25) + 1;
      }
    }

    /* Every rank sends to and receives from every other rank. */
    for (int i = 0; i < nr_nodes; i++) {
      for (int j = 0; j < nr_nodes; j++) {
        if (i != j) {
          mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, logsize, j,
                                tag);
          mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, logsize, i,
                                tag);
        }
      }
    }
    tag++;
  }
}