Skip to content
Snippets Groups Projects
swiftmpistepsim.c 18.66 KiB
/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2020 Peter W. Draper
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"

/* Number of threads used to partition the requests. Should match the number
 * of progress threads and communicators should be assigned to one? */
#define NTHREADS 2

/* Global: Our rank for all to see. */
int myrank = -1;

/* Are we verbose. */
static int verbose = 0;

/* Attempt to keep original injection time differences. */
static int usetics = 1;

/* Scale to apply to the size of the messages we send. */
static float messagescale = 1.0;

/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;

/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;

/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 30;  // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];

/* The local queues. We need to partition these to keep the MPI communications
 * separate so we can use the MPI_THREAD_SPLIT option of the Intel 2020 MPI
 * library. */
static struct mpiuse_log_entry **volatile reqs_queue[NTHREADS];
static int volatile ind_req[NTHREADS] = {0};
static int volatile nr_reqs[NTHREADS] = {0};
static int volatile injecting[NTHREADS] = {0};
static struct mpiuse_log_entry **volatile recvs_queue[NTHREADS];
static int volatile nr_recvs[NTHREADS] = {0};
static int volatile ind_recv[NTHREADS] = {0};
static int volatile todo_recv[NTHREADS] = {0};
static struct mpiuse_log_entry **volatile sends_queue[NTHREADS];
static int volatile nr_sends[NTHREADS] = {0};
static int volatile ind_send[NTHREADS] = {0};
static int volatile todo_send[NTHREADS] = {0};

/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double log_clocks_cpufreq = 2194844448.0;

/**
 * @brief fill a data area with a pattern that can be checked for changes.
 *
 * @param size size of data in bytes.
 * @param data the data to fill.
 */
static void datacheck_fill(size_t size, void *data) {
  unsigned char *p = (unsigned char *)data;
  for (size_t i = 0; i < size; i++) {
    p[i] = 170; /* 10101010 in bits. */
  }
}

/**
 * @brief test a filled data area for our pattern.
 *
 * @param size size of data in bytes.
 * @param data the data to fill.
 *
 * @result 1 on success, 0 otherwise.
 */
static int datacheck_test(size_t size, void *data) {
  unsigned char *p = (unsigned char *)data;
  for (size_t i = 0; i < size; i++) {
    if (p[i] != 170) return 0;
  }
  return 1;
}

/**
 * @brief Injection thread, initiates MPI_Isend and MPI_Irecv requests.
 *
 * The requests are initiated in the time order of the original log and an
 * attempt to start these with the same interval gap is made if usetics is
 * set, otherwise we just do them as quickly as possible.
 */
static void *inject_thread(void *arg) {
  int tid = *((int *)arg);

  if (verbose) message("%d: injection thread starts", tid);
  ticks starttics = getticks();

  /* Ticks of our last attempt and ticks the first loop takes (usetics == 1). */
  ticks basetic = reqs_queue[tid][0]->tic;
  ticks looptics = 0;
  double deadtime = 0.0;
  struct timespec sleep;
  sleep.tv_sec = 0;

  while (ind_req[tid] < nr_reqs[tid]) {
    struct mpiuse_log_entry *log = reqs_queue[tid][ind_req[tid]];

    if (usetics) {
      /* Expect time between this request and the previous one. */
      ticks dt = log->tic - basetic;
      basetic = log->tic;

      /* We guess some time below which we should not attempt to wait,
       * otherwise we'll start to overrun, and just inject the next call if we
       * are below that (we time the ticks this loop takes without any waiting
       * and use that). Otherwise we wait a while. Note we need to convert the
       * ticks of the log file into nanoseconds, that requires the original
       * CPU frequency. */
      if (dt > looptics) {

        /* Remember to be fair and remove the looptics, then convert to
         * nanoseconds. */
        double ns = (double)(dt - looptics) / log_clocks_cpufreq * 1.0e9;
        if (ns < 1.0e9) {
          sleep.tv_nsec = (long)ns;
        } else {
          /* Wait more than one second. Must be an error, but complain and
           * continue.  */
          sleep.tv_nsec = (long)1.0e9;
          message("wait greater than one second");
        }
        nanosleep(&sleep, NULL);
        deadtime += sleep.tv_nsec;
      }
    }

    /* Initialise new log elements. */
    log->done = 0;
    log->nr_tests = 0;
    log->tsum = 0.0;
    log->tmax = 0;
    log->tmin = INT_MAX;
    log->endtic = 0;
    log->injtic = getticks();

    /* Differences to SWIFT: MPI_BYTE not the MPI_Type. */
    int err = 0;
    if (log->type == task_type_send) {
      log->data = calloc(log->size, 1);

      /* Fill data with pattern. */
      if (datacheck) datacheck_fill(log->size, log->data);

      /* And send. */
      err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
                      subtypeMPI_comms[log->subtype], &log->req);

      /* Add a new send request. */
      int ind = atomic_inc(&nr_sends[tid]);
      sends_queue[tid][ind] = log;
      atomic_inc(&todo_send[tid]);

    } else {

      /* Ready to receive. */
      log->data = calloc(log->size, 1);
      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank, log->tag,
                      subtypeMPI_comms[log->subtype], &log->req);

      /* Add a new recv request. */
      int ind = atomic_inc(&nr_recvs[tid]);
      recvs_queue[tid][ind] = log;
      atomic_inc(&todo_recv[tid]);
    }
    if (err != MPI_SUCCESS) error("Failed to activate send or recv");

    ind_req[tid]++;

    /* Set looptics on the first pass. Assumes MPI_Isend and MPI_Irecv are
     * equally timed. Note we include a nanosleep, they are slow. */
    if (looptics == 0 && usetics) {
      sleep.tv_nsec = 1;
      nanosleep(&sleep, NULL);
      looptics = (getticks() - starttics);
      if (verbose)
        message("%d: injection loop took %.3f %s.", tid,
                clocks_from_ticks(looptics), clocks_getunit());
    }
  }

  /* All done, thread exiting. */
  if (verbose) {
    message("%d: %d injections completed, sends = %d, recvs = %d", tid,
            ind_req[tid], nr_sends[tid], nr_recvs[tid]);
    message("%d: remaining sends = %d, recvs = %d", tid, todo_send[tid],
            todo_recv[tid]);
    if (usetics) message("%d: deadtime %.3f ms", tid, deadtime / 1.0e6);
  }
  message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
          clocks_getunit());
  atomic_dec(&injecting[tid]);
  return NULL;
}

/**
 * @brief main loop to run over a queue of MPI requests and test for when they
 * complete for a thread. Returns the total amount of time spent in calls to
 * MPI_Test and the number of times it was called.
 *
 * @param logs the list of logs pointing to requests.
 * @param nr_logs pointer to the variable containing the current number of
 *                logs.
 * @param todos pointer to the variable containing the number of requests that
 *              are still active.
 * @param injecting pointer to the variable indicating if there are still some
 *                  injections.
 * @param sum the total number of ticks spent in calls to MPI_Test.
 * @param ncalls the total number of calls to MPI_Test.
 * @param mint the minimum ticks an MPI_Test call took.
 * @param maxt the maximum ticks an MPI_Test call took.
 */
static void queue_runner(struct mpiuse_log_entry **logs, int volatile *nr_logs,
                         int volatile *todos, int volatile *injecting,
                         double *sum, int *ncalls, ticks *mint, ticks *maxt) {

  /* Global MPI_Test statistics. */
  int lncalls = 0;
  double lsum = 0.0;
  ticks lmint = INT_MAX;
  ticks lmaxt = 0;

  /* We loop while new requests are being injected and we still have requests
   * to complete. */
  while (*injecting || (!*injecting && *todos > 0)) {
    int nlogs = *nr_logs;
    for (int k = 0; k < nlogs; k++) {
      struct mpiuse_log_entry *log = logs[k];
      if (log != NULL && !log->done) {
        ticks tics = getticks();
        int res;
        MPI_Status stat;
        int err = MPI_Test(&log->req, &res, &stat);
        if (err != MPI_SUCCESS) {
          error("MPI_Test call failed");
        }

        /* Increment etc. of statistics about time in MPI_Test. */
        ticks dt = getticks() - tics;
        log->tsum += (double)dt;
        lsum += (double)dt;

        log->nr_tests++;
        lncalls++;

        if (dt < log->tmin) log->tmin = dt;
        if (dt > log->tmax) log->tmax = dt;
        if (dt < lmint) lmint = dt;
        if (dt > lmaxt) lmaxt = dt;

        if (res) {
          /* Check data sent data is unchanged and received data is as
           * expected. */
          if (datacheck && !datacheck_test(log->size, log->data)) {
            error("Data mismatch on completion");
          }

          /* Done, clean up. */
          log->done = 1;
          log->endtic = getticks();
          free(log->data);
          atomic_dec(todos);
        }
      }
    }
  }

  /* All done. */
  *sum = lsum;
  *ncalls = lncalls;
  *mint = lmint;
  *maxt = lmaxt;
  return;
}

/**
 * @brief  A send thread, checks if MPI_Isend requests have completed.
 */
static void *send_thread(void *arg) {
  int tid = *((int *)arg);

  if (verbose) message("%d: send thread starts (%d)", tid, injecting[tid]);
  ticks starttics = getticks();

  int ncalls;
  double sum;
  ticks mint;
  ticks maxt;
  queue_runner(sends_queue[tid], &nr_sends[tid], &todo_send[tid],
               &injecting[tid], &sum, &ncalls, &mint, &maxt);

  message(
      "%d: %d MPI_Test calls took: %.3f, mean time %.3f, min time %.3f, "
      "max time %.3f (%s)",
      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
  if (verbose)
    message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
            clocks_getunit());

  /* Thread exits. */
  return NULL;
}

/**
 * @brief A recv thread, checks if MPI_Irecv requests have completed.
 */
static void *recv_thread(void *arg) {
  int tid = *((int *)arg);

  if (verbose) message("%d: recv thread starts (%d)", tid, injecting[tid]);
  ticks starttics = getticks();

  int ncalls;
  double sum;
  ticks mint;
  ticks maxt;
  queue_runner(recvs_queue[tid], &nr_recvs[tid], &todo_recv[tid],
               &injecting[tid], &sum, &ncalls, &mint, &maxt);

  message(
      "%d: %d MPI_Test calls took: %.3f, mean time %.3f, "
      "min time %.3f, max time %.3f (%s)",
      tid, ncalls, clocks_from_ticks(sum), clocks_from_ticks(sum / ncalls),
      clocks_from_ticks(mint), clocks_from_ticks(maxt), clocks_getunit());
  if (verbose)
    message("%d: took %.3f %s.", tid, clocks_from_ticks(getticks() - starttics),
            clocks_getunit());

  /* Thread exits. */
  return NULL;
}

/**
 * @brief Comparison function for logged times.
 */
static int cmp_logs(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;

  /* Large unsigned values, so take care. */
  if (l1->tic > l2->tic) return 1;
  if (l1->tic < l2->tic) return -1;
  return 0;
}

/**
 * @brief Pick out the relevant logging data for our rank, i.e. all
 * activations of sends and recvs and split into NTHREADS parts.
 * We ignore the original completions. The final lists are sorted into
 * increasing time of activation.
 */
static void pick_logs(void) {
  size_t nlogs = mpiuse_nr_logs();

  /* Duplicates of logs. */
  int size = nlogs / NTHREADS + 1;
  for (int k = 0; k < NTHREADS; k++) {
    reqs_queue[k] = (struct mpiuse_log_entry **)calloc(
        size, sizeof(struct mpiuse_log_entry *));
    nr_reqs[k] = 0;
    sends_queue[k] = (struct mpiuse_log_entry **)calloc(
        size, sizeof(struct mpiuse_log_entry *));
    nr_sends[k] = 0;
    recvs_queue[k] = (struct mpiuse_log_entry **)calloc(
        size, sizeof(struct mpiuse_log_entry *));
    nr_recvs[k] = 0;
  }

  /* Index of the thread logs to update. */
  int nt = 0;
  int nr_logs = 0;
  for (int k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->rank == myrank && log->activation) {
      if (log->type == task_type_send || log->type == task_type_recv) {

        /* Scale size. */
        log->size *= messagescale ;

        /* Allocate space for data. */
        log->data = calloc(log->size, 1);

        /* And keep this log. */
        log->data = NULL;
        reqs_queue[nt][nr_reqs[nt]] = log;
        nr_reqs[nt]++;
        nt++;
        if (nt == NTHREADS) nt = 0;
        nr_logs++;

      } else {
        error("task type '%d' is not a known send or recv task", log->type);
      }
    }
  }
  message("Read %d active send and recv logs from input file", nr_logs);

  /* Sort into increasing time. */
  for (int j = 0; j < NTHREADS; j++) {
    qsort(reqs_queue[j], nr_reqs[j], sizeof(struct mpiuse_log_entry *),
          cmp_logs);

    /* Check. */
    for (int k = 0; k < nr_reqs[j] - 1; k++) {
      if (reqs_queue[j][k]->tic > reqs_queue[j][k + 1]->tic)
        message("reqs_queue: %lld > %lld", reqs_queue[j][k]->tic,
                reqs_queue[j][k + 1]->tic);
    }
  }
}

/**
 * @brief usage help.
 */
static void usage(char *argv[]) {
  fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
          argv[0]);
  fprintf(stderr, " options: -v verbose, -f fast injections, "
                  "-s message size (bytes)\n");
  fflush(stderr);
}

/**
 * @brief main function.
 */
int main(int argc, char *argv[]) {

  /* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);

  int nr_nodes = 0;
  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);

  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Comm_rank failed with error %i.", res);

  /* Handle the command-line, we expect a mpiuse data file to read and various
   * options. */
  int opt;
  while ((opt = getopt(argc, argv, "vfds:")) != -1) {
    switch (opt) {
      case 'd':
        datacheck = 1;
        break;
      case 'f':
        usetics = 0;
        break;
      case 'v':
        verbose = 1;
        break;
      case 's':
        messagescale = atof(optarg);
        break;
      default:
        if (myrank == 0) usage(argv);
        return 1;
    }
  }
  if (optind >= argc - 1) {
    if (myrank == 0) usage(argv);
    return 1;
  }
  char *infile = argv[optind];
  char *logfile = argv[optind + 1];

  /* Initialize time for messages and timers. */
  clocks_set_cpufreq(0);

  /* Now we read the SWIFT MPI logger output that defines the communcations
   * we will undertake and the time differences between injections into the
   * queues. Note this has all ranks for a single steps, SWIFT outputs one MPI
   * log per rank per step, so you need to combine all ranks from a step. */
  mpiuse_log_restore(infile);
  int nranks = mpiuse_nr_ranks();

  /* This should match the expected size. */
  if (nr_nodes != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nranks, nr_nodes);

  /* Create communicators for each subtype, these are associated with the
   * threads somewhat randomly, we should seek to balance the work. */
  for (int i = 0; i < task_subtype_count; i++) {
    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
    char str[8] = {0};
    if (i == 11) {
      sprintf(str, "%d", 0); // XXX fudge for xv
    } else {
      sprintf(str, "%d", i % NTHREADS);
    }
    MPI_Info info;
    MPI_Info_create(&info);
    MPI_Info_set(info, "thread_id", str);
    MPI_Comm_set_info(subtypeMPI_comms[i], info);
    message("subtype %d assigned to thread_id %s", i, str);
    MPI_Info_free(&info);
  }

  /* Each rank requires its own queue, so extract them. */
  pick_logs();

  /* Try to make it synchronous across the ranks and reset time to zero. */
  MPI_Barrier(MPI_COMM_WORLD);
  clocks_set_cpufreq(0);
  if (myrank == 0) {
    message("Start of MPI tests");
    message("==================");
    if (messagescale != 1.0f) {
      message(" ");
      message("  Using message scale of %f", messagescale);
    }
    if (verbose) {
      if (!usetics) message("using fast untimed injections");
      if (datacheck)
        message("checking data pattern on send and recv completion");
    }
  }

  /* Make three batches of NTHREADS threads, one batch for injecting tasks and
   * two batches to check for completions of the sends and recv
   * independently. */
  pthread_t injectthread[NTHREADS];
  pthread_t sendthread[NTHREADS];
  pthread_t recvthread[NTHREADS];
  static int tids[NTHREADS];
  for (int j = 0; j < NTHREADS; j++) {
    tids[j] = j;
    injecting[j] = 1;
    if (pthread_create(&injectthread[j], NULL, &inject_thread, &tids[j]) != 0)
      error("Failed to create injection thread.");
    if (pthread_create(&sendthread[j], NULL, &send_thread, &tids[j]) != 0)
      error("Failed to create send thread.");
    if (pthread_create(&recvthread[j], NULL, &recv_thread, &tids[j]) != 0)
      error("Failed to create recv thread.");
  }

  /* Wait until all threads have exited and all MPI requests have completed. */
  for (int j = 0; j < NTHREADS; j++) {
    pthread_join(injectthread[j], NULL);
    pthread_join(sendthread[j], NULL);
    pthread_join(recvthread[j], NULL);
  }

  /* Dump the updated MPI logs. */
  MPI_Barrier(MPI_COMM_WORLD);
  fflush(stdout);
  if (myrank == 0) message("Dumping updated log");
  mpiuse_dump_logs(nranks, logfile);

  /* Shutdown MPI. */
  res = MPI_Finalize();
  if (res != MPI_SUCCESS)
    error("call to MPI_Finalize failed with error %i.", res);

  if (myrank == 0) message("Bye");

  return 0;
}