/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2020 Peter W. Draper
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"

/* Global: Our rank for all to see. */
int myrank = -1;

/* Are we verbose? */
static int verbose = 0;

/* Maximum main loops. */
static int maxloops = 1000;

/* Integer types of send and recv tasks, must match the values in the log. */
static const int task_type_send = 23;
static const int task_type_recv = 24;

/* Integer subtypes of the proxy communications, must also match the log. */
static const int task_subtype_count = 29;
static const int task_subtype_pcells = 35;

/* Proxy tag arithmetic. From proxy.h, must match log. */
#define proxy_tag_shift 8
#define proxy_tag_count 0
#define proxy_tag_cells 6
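
/* In this simulator a message tag is formed as rank * proxy_tag_shift plus
 * one of the offsets above (note the "shift" is used as a multiplier below,
 * not a bit shift). Illustrative arithmetic: on rank 3 the counts message is
 * tagged 3 * 8 + 0 = 24 and the pcells message 3 * 8 + 6 = 30, so the two
 * exchanges of a proxy never share a tag. */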

/* Our queues of communications. Need two to separate out the pcell sends and
 * recvs. */
struct mpiuse_log_entry **send_pcells;
int nr_send_pcells = 0;
struct mpiuse_log_entry **recv_pcells;
int nr_recv_pcells = 0;

/**
 * @brief Fill a data area with a pattern that can be checked for changes.
 *
 * @param fill value used in the fill, note the data type.
 * @param size size of the data in bytes.
 * @param data the data to fill.
 */
static void datacheck_fill(unsigned char fill, size_t size, void *data) {
  unsigned char *p = (unsigned char *)data;
  for (size_t i = 0; i < size; i++) {
    p[i] = fill;
  }
}

/**
 * @brief Test that a data area is filled with the given value.
 *
 * @param fill value expected in all elements, note the data type.
 * @param size size of the data in bytes.
 * @param data the data to test.
 *
 * @result 1 if all elements match, 0 otherwise.
 */
static int datacheck_test(unsigned char fill, size_t size, void *data) {
  unsigned char *p = (unsigned char *)data;
  for (size_t i = 0; i < size; i++) {
    if (p[i] != fill) {
      if (verbose) {
        message("%d != %d", p[i], fill);
        fflush(stdout);
      }
      return 0;
    }
  }
  return 1;
}

/**
 * @brief Check a data area, reporting some statistics about the content.
 *
 * Assumes datacheck_test() has already failed.
 *
 * @param size size of the data in bytes.
 * @param data the data to check.
 */
static void datacheck_fulltest(size_t size, void *data) {

  unsigned char *p = (unsigned char *)data;
  double sum = 0.0;
  unsigned char pmin = 255;
  unsigned char pmax = 0;
  for (size_t i = 0; i < size; i++) {
    sum += p[i];
    if (p[i] > pmax) pmax = p[i];
    if (p[i] < pmin) pmin = p[i];
  }
  message("sum: %.2f, mean: %.2f, min: %d, max: %d", sum, sum / (double)size,
          pmin, pmax);
}
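
/* Together these routines implement the corruption check used in the main
 * loop: the sender fills its buffer with its own rank, the receiver
 * pre-fills the receive buffer with its rank, and after the exchange a test
 * against the sender's rank confirms the transfer, while a second test
 * against our own rank separates "never written" from "corrupted". A
 * minimal sketch of the pattern (the buffer size is illustrative):
 *
 *   unsigned char buf[64];
 *   datacheck_fill(otherrank, sizeof(buf), buf);   // as done by the sender
 *   if (!datacheck_test(otherrank, sizeof(buf), buf))
 *     datacheck_fulltest(sizeof(buf), buf);        // report on the damage
 */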

/**
 * @brief Pick out the relevant logging data for our rank, i.e. all
 * activations of sends and recvs. We ignore the original completion logs;
 * those are not relevant here.
 */
static void pick_logs(void) {
  size_t nlogs = mpiuse_nr_logs();
  if (verbose) message("Restored %zu logs", nlogs);

  /* Separated logs. */
  send_pcells = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_send_pcells = 0;
  recv_pcells = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_recv_pcells = 0;
  if (send_pcells == NULL || recv_pcells == NULL)
    error("Failed to allocate the log queues");

  for (int k = 0; k < (int)nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->rank == myrank && log->activation) {
      log->data = NULL;
      if (log->type == task_type_send) {

        /* Keep the pcell logs; count logs are skipped, as the counts are
         * regenerated in the exchange loop. */
        if (log->subtype == task_subtype_pcells) {
          send_pcells[nr_send_pcells] = log;
          nr_send_pcells++;
        } else if (log->subtype != task_subtype_count) {
          error("task subtype '%d' is not a known value", log->subtype);
        }

      } else if (log->type == task_type_recv) {

        if (log->subtype == task_subtype_pcells) {
          recv_pcells[nr_recv_pcells] = log;
          nr_recv_pcells++;
        } else if (log->subtype != task_subtype_count) {
          error("task subtype '%d' is not a known value", log->subtype);
        }

      } else {
        error("task type '%d' is not a known send or recv task", log->type);
      }
    }
  }
  if (verbose)
    message("Read %d send and %d recv pcells logs", nr_send_pcells,
            nr_recv_pcells);
}
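
/* Note the recv logs are only used for the report above; the exchange loop
 * in main() drives both directions of each proxy from the send logs. */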

/**
 * @brief usage help.
 */
static void usage(char *argv[]) {
  fprintf(stderr, "Usage: %s [-v] [-n maxloops] SWIFT_mpiuse-log-file.dat\n",
          argv[0]);
  fprintf(stderr, " options: -n number of exchange loops, -v verbose\n");
  fflush(stderr);
}
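
/* An example invocation (the log file name is illustrative). The number of
 * ranks must match the number in the combined log:
 *
 *   mpirun -np 4 ./swiftmpiproxies -v -n 10 mpiuse-step1-combined.dat
 */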

/**
 * @brief main function.
 */
int main(int argc, char *argv[]) {

  /* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);

  int nr_nodes = 0;
  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);

  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Comm_rank failed with error %i.", res);

  /* Handle the command-line, we expect an mpiuse data file to read and
   * various options. */
  int opt;
  while ((opt = getopt(argc, argv, "vn:")) != -1) {
    switch (opt) {
      case 'n':
        maxloops = atoi(optarg);
        break;
      case 'v':
        verbose = 1;
        break;
      default:
        if (myrank == 0) usage(argv);
        return 1;
    }
  }
  if (optind >= argc) {
    if (myrank == 0) usage(argv);
    return 1;
  }
  char *infile = argv[optind];

  /* Start time across the ranks. */
  MPI_Barrier(MPI_COMM_WORLD);
  clocks_set_cpufreq(0);

  /* Now we read the SWIFT MPI logger output that defines the communications
   * we will undertake. Note this has all the ranks for a single step; SWIFT
   * outputs one MPI log per rank per step, so you need to combine the logs
   * from all ranks for a step. */
  mpiuse_log_restore(infile);
  int nranks = mpiuse_nr_ranks();

  /* This should match the number of ranks in the log. */
  if (nr_nodes != nranks)
    error("The number of MPI ranks %d does not match the %d in the log",
          nr_nodes, nranks);

  /* Each rank has its own queues of requests, so extract them. */
  pick_logs();

  /* And run our version of the proxy exchanges. */
  MPI_Request req_send_counts[nr_send_pcells];
  MPI_Request req_recv_counts[nr_send_pcells];
  MPI_Request req_pcells_out[nr_send_pcells];
  int pcells_size[nr_send_pcells];
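
  /* Note the receive-side arrays are also sized by the number of sends:
   * proxies are symmetric, so every pcells send to a rank is paired with a
   * pcells receive from that same rank. */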

  /* Loop over next section a number of times to simulate rebuild steps in
   * SWIFT. */
  for (int nloop = 0; nloop < maxloops; nloop++) {
    MPI_Barrier(MPI_COMM_WORLD);
    if (myrank == 0)
      message("*** Proxy simulation exchange loop: %d ***", nloop);

    /* Note in SWIFT we use the threadpool to launch these. */
    for (int k = 0; k < nr_send_pcells; k++) {
      struct mpiuse_log_entry *log = send_pcells[k];

      /* Regenerate the tags: sends are tagged with our rank's base tag,
       * recvs with the other rank's base tag. */
      int basetag = myrank * proxy_tag_shift;

      /* Start Isend of the count of pcells. Here this is really just the
       * size in bytes of the buffer we're about to send; SWIFT sends the
       * actual cell count. */

      res = MPI_Isend(&log->size, 1, MPI_INT, log->otherrank,
                      basetag + proxy_tag_count, MPI_COMM_WORLD,
                      &req_send_counts[k]);
      if (res != MPI_SUCCESS) error("Counts MPI_Isend failed.");

      /* Start Isend of the pcells, filled with our rank value. */
      log->data = calloc(log->size, 1);
      if (log->data == NULL) error("Failed to allocate a pcells buffer");
      datacheck_fill(myrank, log->size, log->data);

      res = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
                      basetag + proxy_tag_cells, MPI_COMM_WORLD,
                      &req_pcells_out[k]);
      if (res != MPI_SUCCESS) error("Pcell MPI_Isend failed.");

      /* Start Irecv counts of pcells from other rank. */
      basetag = log->otherrank * proxy_tag_shift;
      res = MPI_Irecv(&pcells_size[k], 1, MPI_INT, log->otherrank,
                      basetag + proxy_tag_count, MPI_COMM_WORLD,
                      &req_recv_counts[k]);
      if (res != MPI_SUCCESS) error("Counts MPI_Irecv failed.");
    }
    if (verbose)
      message("All counts requests and pcell sends are launched");

    /* Now wait for any of the counts irecvs to complete and then create the
     * irecv for the pcells. */
    void *pcells_in[nr_send_pcells];
    MPI_Request req_pcells_in[nr_send_pcells];
    for (int k = 0; k < nr_send_pcells; k++) {
      int pid = MPI_UNDEFINED;
      MPI_Status status;

      res = MPI_Waitany(nr_send_pcells, req_recv_counts, &pid, &status);
      if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
        error("MPI_Waitany failed.");
      if (verbose) message("Counts received for proxy %d", pid);

      struct mpiuse_log_entry *log = send_pcells[pid];
      int basetag = log->otherrank * proxy_tag_shift;

      pcells_in[pid] = calloc(pcells_size[pid], 1);
      if (pcells_in[pid] == NULL) error("Failed to allocate a receive buffer");

      /* Pre-fill the receive buffer with our own rank, so we can tell later
       * whether it was ever written to. */
      datacheck_fill(myrank, pcells_size[pid], pcells_in[pid]);

      res = MPI_Irecv(pcells_in[pid], pcells_size[pid], MPI_BYTE,
                      log->otherrank, basetag + proxy_tag_cells, MPI_COMM_WORLD,
                      &req_pcells_in[pid]);
      if (res != MPI_SUCCESS) error("Pcell MPI_Irecv failed.");
    }
    if (verbose)
      message("All proxy cell counts have arrived, pcells irecvs are launched");

    /* Waitall for all Isend counts to complete. */

    res = MPI_Waitall(nr_send_pcells, req_send_counts, MPI_STATUSES_IGNORE);
    if (res != MPI_SUCCESS) error("Waitall for counts Isend failed.");
    if (verbose)
      message("All sends of counts have completed");

    /* Now wait for the pcell irecvs to complete, so we receive the pcells,
     * which would be unpacked in SWIFT. */
    for (int k = 0; k < nr_send_pcells; k++) {
      int pid = MPI_UNDEFINED;
      MPI_Status status;

      res = MPI_Waitany(nr_send_pcells, req_pcells_in, &pid, &status);
      if (res != MPI_SUCCESS || pid == MPI_UNDEFINED)
        error("MPI_Waitany failed.");

      /* Check the received data is correct. It should be filled with the
       * rank of the sender. */
      if (!datacheck_test(status.MPI_SOURCE, pcells_size[pid],
                          pcells_in[pid])) {
        message("Received data is not correct");

        /* Report the tag and source of the request. */
        int expected_tag =
            status.MPI_SOURCE * proxy_tag_shift + proxy_tag_cells;
        message("sent from rank %d, with tag %d/%d and error code %d",
                status.MPI_SOURCE, status.MPI_TAG, expected_tag,
                status.MPI_ERROR);

        /* Shouldn't happen, but has been seen. */
        if (status.MPI_ERROR != MPI_SUCCESS)
          mpi_error_string(status.MPI_ERROR, "unexpected MPI status");

        /* Make a report on what the buffer contains. */
        datacheck_fulltest(pcells_size[pid], pcells_in[pid]);

        /* This call will succeed if the receive buffer has not been
         * updated at all. */
        if (datacheck_test(myrank, pcells_size[pid], pcells_in[pid])) {
          message("Received data buffer has not been modified");
          fflush(stdout);
          error("Failed");
        } else {
          message("Received data is corrupt");
          fflush(stdout);
          error("Failed");
        }
      } else {
        if (verbose) message("Received data is correct");
      }
      free(pcells_in[pid]);
      pcells_in[pid] = NULL;
    }
    if (verbose)
      message("All proxy cells have arrived");

    /* Waitall for Isend of pcells to complete. */
    res = MPI_Waitall(nr_send_pcells, req_pcells_out, MPI_STATUSES_IGNORE);
    if (res != MPI_SUCCESS) error("Waitall for pcells Isend failed.");
    if (verbose)
      message("All sends of pcells have completed");

    /* Check the send buffers were not modified while in flight. */
    for (int k = 0; k < nr_send_pcells; k++) {
      struct mpiuse_log_entry *log = send_pcells[k];
      if (!datacheck_test(myrank, log->size, log->data)) {
        datacheck_fulltest(log->size, log->data);
        error("Sent data has been corrupted");
      } else {
        if (verbose) message("Sent data is correct");
      }
      free(log->data);
      log->data = NULL;
    }

  } /* nloop */

  /* Shutdown MPI. */
  res = MPI_Finalize();
  if (res != MPI_SUCCESS)
    error("Call to MPI_Finalize failed with error %i.", res);

  if (myrank == 0) message("All done, no errors detected");

  return 0;
}