Skip to content
Snippets Groups Projects
mpiuse.c 14.28 KiB
/* This file is part of SWIFT.
 * Copyright (c) 2019 Peter W. Draper (p.w.draper@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/**
 *  @file mpiuse.c
 *  @brief file of routines to report about MPI tasks used in SWIFT.
 */

/* Standard includes. */
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Local defines. */
#include "mpiuse.h"

/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "cycle.h"
#include "error.h"
#include "histogram.h"

/* Our rank. Defined elsewhere, only referenced here. */
extern int myrank;

/* The initial size and increment of the log entries buffer, counted in
 * entries, not bytes. */
#define MPIUSE_INITLOG 1000000

/* The log of activations and handoffs. All volatile as accessed from threads
 * that use the value to synchronise. */

/* The buffer of entries, replaced by a larger one when it fills. */
static struct mpiuse_log_entry *volatile mpiuse_log = NULL;

/* Number of entries the current buffer has room for. */
static volatile size_t mpiuse_log_size = 0;

/* Number of entry indices handed out so far. */
static volatile size_t mpiuse_log_count = 0;

/* Number of entries that have been completely written. */
static volatile size_t mpiuse_log_done = 0;

/* Largest rank seen in any logged entry. */
static volatile int mpiuse_max_rank = 0;

/**
 * @brief reallocate the entries log if space is needed.
 */
static void mpiuse_log_reallocate(size_t ind) {

  if (ind == 0) {

    /* Need to perform initialization. Be generous. */
    if ((mpiuse_log = (struct mpiuse_log_entry *)malloc(
             sizeof(struct mpiuse_log_entry) * MPIUSE_INITLOG)) == NULL)
      error("Failed to allocate MPI use log.");

    /* Last action. */
    mpiuse_log_size = MPIUSE_INITLOG;

  } else {
    struct mpiuse_log_entry *new_log;
    if ((new_log = (struct mpiuse_log_entry *)malloc(
             sizeof(struct mpiuse_log_entry) *
             (mpiuse_log_size + MPIUSE_INITLOG))) == NULL)
      error("Failed to re-allocate MPI use log.");

    /* Wait for all writes to the old buffer to complete. */
    while (mpiuse_log_done < mpiuse_log_size)
      ;

    /* Copy to new buffer. */
    memcpy(new_log, mpiuse_log,
           sizeof(struct mpiuse_log_entry) * mpiuse_log_size);
    free(mpiuse_log);
    mpiuse_log = new_log;

    /* Last action, releases waiting threads. */
    atomic_add(&mpiuse_log_size, MPIUSE_INITLOG);
  }
}

/**
 * @brief Log an MPI request or handoff.
 *
 * Claims the next free index in the log, makes sure the log has room for it
 * (growing the log if this index is the first one past the current size),
 * fills in the entry and finally counts it as written.
 *
 * The fields injtic, endtic, nr_tests, tsum, tmin and tmax, which are read
 * when the logs are dumped, are not set here.
 *
 * @param rank the rank
 * @param step the step
 * @param tic the ticks at time of log, will be relative.
 * @param type the task type (send or recv).
 * @param subtype the task subtype.
 * @param activation non-zero for an activation (MPI_Isend or MPI_Irecv),
 *        zero when this is a successful MPI_Test instead.
 * @param size the size in bytes of memory to be transferred or received.
 *             0 for a deactivation.
 * @param otherrank other rank associated with the transfer.
 * @param tag the MPI tag.
 */
void mpiuse_log_allocation(int rank, int step, size_t tic, int type,
                           int subtype, int activation, size_t size,
                           int otherrank, int tag) {

  /* Claim an index. Presumably atomic_inc returns the value before the
   * increment, the very first caller must see 0 to create the log. */
  size_t ind = atomic_inc(&mpiuse_log_count);

  /* If we are at the current size we need more space. */
  if (ind == mpiuse_log_size) mpiuse_log_reallocate(ind);

  /* Other threads wait for space. */
  /* NOTE(review): the wait ends as soon as the size is not less than ind, so
   * an index exactly equal to a freshly grown size would continue without
   * triggering its own growth. That needs MPIUSE_INITLOG further indices to
   * be claimed while one growth is in progress -- confirm this cannot
   * happen. */
  while (ind > mpiuse_log_size)
    ;

  /* Record the log. */
  mpiuse_log[ind].activation = activation;
  mpiuse_log[ind].data = NULL;
  mpiuse_log[ind].otherrank = otherrank;
  mpiuse_log[ind].rank = rank;
  mpiuse_log[ind].req = MPI_REQUEST_NULL;
  mpiuse_log[ind].size = size;
  mpiuse_log[ind].step = step;
  mpiuse_log[ind].subtype = subtype;
  mpiuse_log[ind].tag = tag;
  mpiuse_log[ind].tic = tic;
  mpiuse_log[ind].type = type;

  /* Keep number of ranks for convenience. */
  /* NOTE(review): plain compare then store, not an atomic maximum. */
  if (rank > mpiuse_max_rank) mpiuse_max_rank = rank;

  /* Entry is complete, a pending growth of the log waits on this count. */
  atomic_inc(&mpiuse_log_done);
}

/**
 * @brief restore the log from a dump.
 *
 * Reads a file in the "standard" format written by mpiuse_dump_logs() and
 * logs one entry per data line. Lines starting with '#' are comments. Lines
 * that do not hold all the expected fields are skipped, not logged. If the
 * file cannot be opened a message is issued and nothing is logged.
 *
 * @param filename name of file with the previous dump in.
 */
void mpiuse_log_restore(const char *filename) {

  /* Open the input file. */
  FILE *fd;
  if ((fd = fopen(filename, "r")) == NULL) {
    message("Failed to open the MPI use log file '%s'.", filename);
    return;
  }

  /* Read until fgets reports the end of the file or a read error. */
  char line[132];
  size_t lineno = 0;
  while (fgets(line, sizeof(line), fd) != NULL) {
    lineno++;

    /* Comments. */
    if (line[0] == '#') continue;

    /* Several fields are read only to step over them, the log keeps the
     * start tic, step, ranks, numeric types, activation, tag and size. */
    size_t stic, etic, dtic, size, sum;
    int step, rank, otherrank, itype, isubtype, activation, tag;
    char type[32], subtype[32];

    /* The string fields are width limited to fit their buffers. */
    const int nread =
        sscanf(line, "%zu %zu %zu %d %d %d %31s %d %31s %d %d %d %zu %zu",
               &stic, &etic, &dtic, &step, &rank, &otherrank, type, &itype,
               subtype, &isubtype, &activation, &tag, &size, &sum);

    if (nread != 14) {
      /* Blank lines give nothing at all, anything else is malformed. */
      if (nread > 0)
        message("Skipping malformed line %zu of MPI use log file '%s'.",
                lineno, filename);
      continue;
    }

    mpiuse_log_allocation(rank, step, stic, itype, isubtype, activation, size,
                          otherrank, tag);
  }
  fclose(fd);
}

/**
 * @brief dump the logs for all ranks to a file.
 *
 * @param nranks the number of ranks.
 * @param standard only write a standard log, this can be used as input to
 *                 other runs.
 * @param dumpfile the file to write
 */
void mpiuse_dump_logs(int nranks, int standard, const char *dumpfile) {

  /* Make sure output file is empty, only on one rank. */
  FILE *fd;
  if (myrank == 0) {
    fd = fopen(dumpfile, "w");

    /* Header. */
    if (standard) {
      fprintf(fd,
              "# stic etic dtic step rank otherrank type itype "
              " subtype isubtype activation tag size sum\n");
    } else {
      fprintf(fd,
              "# logticin logtic injtic endtic dtic step rank otherrank itype "
              " isubtype tag size nr_tests tsum tmin tmax\n");
    }
    fclose(fd);
  }
  MPI_Barrier(MPI_COMM_WORLD);

  const char *types[] = {"send", "recv"};

  /* Loop over all ranks, one by one, getting each rank to append their
   * logs. */
  for (int k = 0; k < nranks; k++) {

    /* Rank 0 decides the index of the writing node, this happens
     * one-by-one. */
    int kk = k;
    MPI_Bcast(&kk, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (kk == myrank) {

      /* Open file and position at end. */
      fd = fopen(dumpfile, "a");

      /* And append our logs. Note log->tic is not necessarily from this
       * machine, so the conversion to ms may be suspect. We also rebase a
       * version to match the expected injection times for this new run. */
      size_t nlogs = mpiuse_log_count;
      ticks basetics = 0;
      long long sum = 0;
      for (size_t k = 0; k < nlogs; k++) {
        struct mpiuse_log_entry *log = &mpiuse_log[k];
        if (log->rank == myrank && log->endtic > 0) {
          if (basetics == 0) basetics = log->tic;
          if (standard) {
            fprintf(fd, "%lld %lld %lld %d %d %d %s %d %s %d %d %d %lld %lld\n",
                    log->injtic, log->injtic, log->endtic - log->injtic,
                    log->step, log->rank, log->otherrank,
                    types[log->type - SEND_TYPE], log->type, "none",
                    log->subtype, log->activation, log->tag, log->size, sum);
            sum += log->size;
          } else {
            fprintf(
                fd,
                "%lld %.4f %.4f %.4f %.6f %d %d %d %d %d %d %zd %d %.4f %.6f "
                "%.6f\n",
                log->tic, clocks_from_ticks(log->tic - basetics),
                clocks_from_ticks(log->injtic - clocks_start_ticks),
                clocks_from_ticks(log->endtic - clocks_start_ticks),
                clocks_from_ticks(log->endtic - log->injtic), log->step,
                log->rank, log->otherrank, log->type, log->subtype, log->tag,
                log->size, log->nr_tests, clocks_from_ticks(log->tsum),
                clocks_from_ticks(log->tmin), clocks_from_ticks(log->tmax));
          }
        }
      }
      fclose(fd);
    }

    /* Need to stay in step. */
    MPI_Barrier(MPI_COMM_WORLD);
  }
}

/**
 * @brief report how many log entries there are.
 *
 * @result the count of log entry indices handed out so far, as an int.
 */
int mpiuse_nr_logs(void) {
  const size_t nr_logs = mpiuse_log_count;
  return (int)nr_logs;
}

/**
 * @brief report how many ranks the log covers.
 *
 * @result one more than the largest rank seen in any logged entry.
 */
int mpiuse_nr_ranks(void) {
  const int highest_rank = mpiuse_max_rank;
  return highest_rank + 1;
}

/**
 * @brief look up a log entry by index.
 *
 * @param ind the index of the entry required.
 * @result pointer to the entry, or NULL when ind is negative or not less
 *         than the number of log entries.
 */
struct mpiuse_log_entry *mpiuse_get_log(int ind) {

  /* Reject anything outside of 0 to count - 1. */
  if (ind < 0) return NULL;
  if ((size_t)ind >= mpiuse_log_count) return NULL;

  return &mpiuse_log[ind];
}

/**
 * @brief return random number from a upper part of gaussian distribution.
 *
 * @result the random.
 */
static double gauss_rand_upper(void) {

  double V1, V2, S;
  do {
    double U1 = drand48();
    double U2 = drand48();

    V1 = U1 - 1.0;
    V2 = U2 - 1.0;
    S = V1 * V1 + V2 * V2;
  } while (S >= 1.0 || S == 0.0);

  return fabs(V1 * sqrt(-2.0 * log(S) / S));
}

/**
 * @brief locate the CDF bin that a uniform random deviate falls into.
 *
 * @param cumulative the cumulative values, one per bin, expected to be in
 *                   non-decreasing order.
 * @param nbins the number of bins, at least 1.
 * @param u the deviate to look up.
 * @result index of the first bin whose cumulative value is not less than u,
 *         the last bin when u is larger than all of them.
 */
static int mpiuse_cdf_bin(const double *cumulative, int nbins, double u) {

  /* Binary search, the answer is where the bounds meet, not the last
   * midpoint that was probed. */
  int lower = 0;
  int upper = nbins;
  while (lower < upper) {
    const int middle = lower + (upper - lower) / 2;
    if (u > cumulative[middle])
      lower = middle + 1;
    else
      upper = middle;
  }
  return (lower < nbins) ? lower : nbins - 1;
}

/**
 * @brief generate a list of fake exchanges as mpiuse logs.
 *
 * For each of the nr_logs steps one message size is chosen and every ordered
 * pair of different ranks logs a send and the matching receive of that size.
 * All pairs in a step share one tag, which starts at 1 and increases by one
 * per step.
 *
 * @param nr_nodes the number of ranks that will be used.
 * @param nr_logs the number of logs to generate per rank.
 * @param size bytes per message when not random. For uniform randoms sizes
 *             are drawn as drand48() * size + 1, for gaussian randoms the
 *             half-gaussian deviate is multiplied by size * 0.25 and 1 is
 *             added. Not applied when sampling from a CDF, the bin centres
 *             are used as read.
 * @param random whether to use random sizes.
 * @param seed the random seed, use same for fixed sequences. Only used when
 *             random is set.
 * @param uniform whether to use a uniform distribution or gaussian, unless
 *                cdf or odata is defined, in which case this parameter is
 *                ignored.
 * @param cdf text file containing a normalized CDF to use as a basis for
 *              inverse transform sampling of the randoms. NULL for no file.
 * @param odata text file containing values representing occurrences of the
 *              expected distribution -- converted into a normalised CDF to
 *              use as a basis for inverse transform sampling of the
 *              randoms. NULL for no file. Not used if cdf is not NULL.
 */
void mpiuse_log_generate(int nr_nodes, int nr_logs, int size, int random,
                         long int seed, int uniform, const char *cdf,
                         const char *odata) {

  /* Only used for CDFs, may need to increase these. */
  int nvals = 0;
  double imin[NHIST], imax[NHIST], value[NHIST];
  const int use_cdf = (cdf != NULL || odata != NULL);

  /* Note that each rank exchanges messages with all the others and each "log"
   * has the same size. */
  /* Set seed. */
  if (random) srand48(seed);

  /* Check for CDF. This should be based on some real distribution, the format
   * is same as output from TOPCAT, i.e. bin-low, bin-high, value space
   * separated values. Note the value column should be normalised into the
   * range 0 to 1 so that it maps into a uniform random distribution. */
  if (cdf != NULL) {
    FILE *infile = fopen(cdf, "r");
    if (infile == NULL) error("Failed to open CDF file: %s", cdf);

    char line[132];
    while (fgets(line, sizeof(line), infile) != NULL) {
      if (line[0] == '#') continue;

      /* Lines that do not hold three numbers are ignored. */
      double lo, hi, cumulative;
      if (sscanf(line, "%lf %lf %lf", &lo, &hi, &cumulative) != 3) continue;

      /* Never write beyond the end of the bin arrays. */
      if (nvals >= NHIST)
        error("CDF file %s has more than %d bins, increase NHIST", cdf,
              (int)NHIST);

      imin[nvals] = lo;
      imax[nvals] = hi;
      value[nvals] = cumulative;
      nvals++;
    }
    fclose(infile);

  } else if (odata != NULL) {
    double *values;
    int nvalues;
    if (histread(odata, &values, &nvalues)) {
      struct histogram *h = calloc(1, sizeof(struct histogram));
      if (h == NULL)
        error("Failed to allocate histogram for occurrence data: %s", odata);
      histmake(nvalues, values, h);

      if (h->nvalues > NHIST)
        error("Histogram of %s has more than %d bins, increase NHIST", odata,
              (int)NHIST);

      /* Each bin spans from the previous histogram value to its own, the
       * first bin starts at zero. */
      imin[0] = 0.0;
      imax[0] = h->values[0];
      value[0] = h->sums[0];
      for (int k = 1; k < h->nvalues; k++) {
        imin[k] = h->values[k - 1];
        imax[k] = h->values[k];
        value[k] = h->sums[k];
      }
      nvals = h->nvalues;

      free(h);
      free(values);
    } else {
      error("Failed to read occurrence data from file: %s", odata);
    }
  }

  /* Sampling needs at least one bin, without this the bin arrays would be
   * read uninitialised. */
  if (random && use_cdf && nvals < 1)
    error("No CDF bins available to sample the message sizes from.");

  /* Message tags increment with across rank logs. */
  int tag = 1;
  for (int k = 0; k < nr_logs; k++) {

    /* Set size for this messages. */
    double logsize = size;
    if (random) {
      if (use_cdf) {
        /* CDF based randoms, use the centre of the bin that contains a
         * uniform deviate. */
        const int bin = mpiuse_cdf_bin(value, nvals, drand48());
        logsize = 0.5 * (imax[bin] + imin[bin]);

      } else if (uniform) {
        /* Uniform randoms, scaled by size and offset by 1. */
        logsize = (drand48() * (double)size) + 1;
      } else {
        /* Gaussian randoms so no maximum, scaled by a quarter of size and
         * offset by 1. */
        logsize = (gauss_rand_upper() * (double)size * 0.25) + 1;
      }
    }

    /* Cannot send more than 2^31-1 bytes at a time, so truncate. */
    if (logsize > 2147483647.0) {
      message("CDF size too large : %f, truncating", logsize);
      logsize = 2147483647.0;
    }

    /* Every ordered pair of different ranks gets a send and its receive. */
    for (int i = 0; i < nr_nodes; i++) {
      for (int j = 0; j < nr_nodes; j++) {
        if (i != j) {
          mpiuse_log_allocation(i, 1, k, SEND_TYPE, NO_SUBTYPE, 1, logsize, j,
                                tag);
          mpiuse_log_allocation(j, 1, k, RECV_TYPE, NO_SUBTYPE, 1, logsize, i,
                                tag);
        }
      }
    }
    tag++;
  }
}

/**
 * @brief shuffle log pointers, randomizing their order.
 *
 * Only the pointers are rearranged, the entries they refer to are not
 * touched. Note assumes drand48() has been seeded.
 *
 * @param logs the log pointers to shuffle, reordered in place.
 * @param nlogs the number of logs.
 */
void mpiuse_shuffle_logs(struct mpiuse_log_entry **logs, int nlogs) {

  /* Work down from the last slot, exchanging each with a randomly picked
   * slot at or below it. */
  for (int k = nlogs - 1; k > 0; k--) {
    const unsigned int j = (unsigned int)(drand48() * (k + 1));
    struct mpiuse_log_entry *swap = logs[j];
    logs[j] = logs[k];
    logs[k] = swap;
  }
}