/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2020 Peter W. Draper
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

// Fully one-sided approach with passive target communication. This means only
// the sending side updates the window buffer, and since we have threaded
// access we can only use flushes with a shared lock that is permanently open
// to move data. The send side has no window, as it only pushes data.
//
// So each rank needs a receive window that has room for all the expected
// sends, plus an additional element for controlling the readiness of the data
// (this is an atomic update that should be guaranteed to only happen after
// the send of the main data).
//
// In this implementation the size of the receive buffer per rank is just the
// sum of all the messages we know we are about to get. The order of that
// buffer is determined by the ranks and tags, which gives us a list of
// offsets into the buffer mapped by the ranktag, which we need to share with
// any rank that is expected to send us data. We share these offsets using
// normal MPI (they could instead be kept in a further extension of the
// window, but that would need synchronizing across all ranks, so we use the
// global communicator to exchange them in a similar fashion).
//
// XXX a ranktag per subtype is not unique, we need to include the size.
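//
// The per-message protocol is then: the sender MPI_Accumulates the header
// plus payload into the remote window at the agreed offset, flushes, then
// uses MPI_Compare_and_swap to set the first header element to UNLOCKED and
// flushes again; the receiver just polls the first element of each expected
// message until it reads UNLOCKED.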

#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"

/* Maximum number of communicator windows per rank, for SWIFT this is the
 * number of subtypes. */
#define task_subtype_count 22

/* 3D index of array. */
#define INDEX3(nx, ny, x, y, z) ((nx) * (ny) * (z) + (nx) * (y) + (x))

/* 2D index of array. */
#define INDEX2(nx, x, y) ((nx) * (y) + (x))
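
/* In this file INDEX3 is used as INDEX3(task_subtype_count, nr_ranks, subtype,
 * rank, k), i.e. the flattened ranktag arrays are ordered with the subtype
 * varying fastest, then the rank, then the log index k. */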

/* Our rank for all to see. */
int myrank = -1;

/* Number of ranks. */
static int nr_ranks;

/* Maximum no. of messages (logs). */
static size_t max_logs = 0;

/* Flag value used to control access: it marks a message in the window as
 * ready. With a 64-bit size_t this is the largest possible value, so it must
 * never be used as the initial flag value written by the sender. */
static size_t UNLOCKED = (((size_t)2 << 63) - 1);

/* Size of a block of memory. All addressable memory chunks need to be a
 * multiple of this as we need to align sends in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);

/* Size of message header in blocks. The flag, size and tag. Note size and tag
 * are just for sanity checks. The flag value controls access to the main data
 * areas. */
static const size_t HEADER_SIZE = 3;
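
/* So within a window each message occupies HEADER_SIZE + toblocks(size)
 * blocks laid out as:
 *
 *   [0] flag, set to UNLOCKED by the sender once the payload is complete
 *   [1] message size in bytes
 *   [2] message tag
 *   [3] onwards, the message data
 */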

/* Are we verbose. */
static int verbose = 0;

/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;

/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;

/* Global communicators for each of the subtypes. */
static MPI_Comm subtypeMPI_comms[task_subtype_count];

/* And the windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count];
static BLOCKTYPE *mpi_ptr[task_subtype_count] = {NULL};

/* Offsets of the ranktag regions within the windows and lists of the
 * associated tags. */
static size_t ranktag_sizes[task_subtype_count] = {0};
static size_t *ranktag_counts;
static size_t *ranktag_offsets;
static size_t *ranktag_lists;

/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;

/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile todo_recv = 0;

/**
 * @brief Convert ranks and tag into a single unique value.
 *
 * Assumes there is enough space in a size_t for these values.
 *
 * @param sendrank the send rank
 * @param recvrank the receive rank
 * @param tag the tag
 *
 * @result a unique value based on the three values
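 *
 * For example, with nr_ranks = 4 (so shift = 3, assuming a 32-bit int),
 * sendrank 1, recvrank 2 and tag 5 pack as 1 | (2 << 3) | (5 << 6) = 337.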
 */
static size_t toranktag(int sendrank, int recvrank, int tag) {

  /* Number of bits needed to represent nr_ranks. XXX could precalculate. */
  int shift = (sizeof(int) * 8) - __builtin_clz(nr_ranks);
  size_t result = (size_t)sendrank | ((size_t)recvrank << shift) |
                  ((size_t)tag << (2 * shift));
  return result;
}

/**
 * @brief Describe a log entry as a string, for error reports and debugging.
 *
 * Note uses a static buffer, so not thread safe.
 */
static char *tokey(struct mpiuse_log_entry *log) {
  static char buf[256];
  sprintf(buf, "%d/%d/%d/%zd on %d ranktags %zd/%zd", log->otherrank, log->tag,
          log->subtype, log->size, log->rank,
          toranktag(log->rank, log->otherrank, log->tag),
          toranktag(log->otherrank, log->rank, log->tag));
  return buf;
}

/**
 * @brief Convert a byte count into a number of blocks, rounds up.
 *
 * @param nr_bytes the number of bytes.
 *
 * @result the number of blocks needed.
 */
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}

/**
 * @brief Convert a block count into a number of bytes.
 *
 * @param nr_blocks the number of blocks.
 *
 * @result the number of bytes.
 */
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
  return (nr_blocks * BYTESINBLOCK);
}

/**
 * @brief fill a data area with our rank.
 *
 * @param size size of the data in blocks (BLOCKTYPE elements).
 * @param data the data to fill.
 */
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
  for (BLOCKTYPE i = 0; i < size; i++) {
    data[i] = myrank;
  }
}

/**
 * @brief test a filled data area for a value.
 *
 * @param size size of the data in blocks (BLOCKTYPE elements).
 * @param data the data to check.
 * @param rank the expected value, i.e. the original rank.
 *
 * @result 1 on success, 0 otherwise.
 */
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
  for (size_t i = 0; i < size; i++) {
    if (data[i] != (size_t)rank) {
      message("see %zd expected %d @ %zd", data[i], rank, i);
      return 0;
    }
  }
  return 1;
}

/**
 * @brief Send thread, sends messages to other ranks one-by-one with the
 * correct offsets into the remote windows.
 *
 * Messages are all considered in order, regardless of the subtype.
 */
static void *send_thread(void *arg) {

  message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
  ticks starttics = getticks();
  for (int k = 0; k < nr_send; k++) {
    struct mpiuse_log_entry *log = send_queue[k];
    if (log == NULL) error("NULL send message queued (%d/%d)", k, nr_send);

    /* Data has the actual data and room for the header. */
    BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
    BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
    log->data = dataptr;

    /* Fill data with pattern. */
    if (datacheck) datacheck_fill(datasize, dataptr);

    /* And define header; dataptr[0] can be any value except UNLOCKED. */
    dataptr[0] = 0;
    dataptr[1] = log->size;
    dataptr[2] = log->tag;

    /* Need to find the offset for this data in the remote's window. We match
     * subtype, tag and rank by searching the ranktag_lists for our ranktag
     * value. XXX bisection search XXX */
    size_t ranktag = toranktag(log->rank, log->otherrank, log->tag);
    //message("looking for %s", tokey(log));
    size_t counts = ranktag_counts[INDEX2(task_subtype_count, log->subtype,
                                          log->otherrank)];
    size_t offset = 0;
    int found = 0;
    struct mpiuse_log_entry *keeplog = NULL;

    counts = max_logs;
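    /* Note: the ranktag lists are indexed by the position in the remote
     * rank's receive queue, not packed per subtype, so we scan all possible
     * slots rather than just this rank/subtype's count. */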

    for (size_t j = 0; j < counts; j++) {
      if (ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype,
                               log->otherrank, j)] == ranktag) {
        if (found) message("duplicate %zd: %s of %s", ranktag, tokey(log), tokey(keeplog));
        offset = ranktag_offsets[INDEX3(task_subtype_count, nr_ranks,
                                        log->subtype, log->otherrank, j)];
        keeplog = log;
        //message("%d/%d located offset %zd one of %zd ranktag: %zd", log->otherrank,
        //        log->subtype, offset, counts, ranktag);
        found = 1;
        //break;
      }
    }
    if (!found) {
      fflush(stdout);
      fflush(stderr);
      error(
          "Failed sending a message of size %zd to %d/%d @ %zd, no offset"
          " found for ranktag %zd, counts = %zd",
          datasize, log->otherrank, log->subtype, offset, ranktag, counts);
    }
    //message("Sending a message of size %zd to %d/%d @ %zd", datasize,
    //        log->otherrank, log->subtype, offset);


    /* And send data to other rank. */
    int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
                             offset, datasize, MPI_BLOCKTYPE, MPI_REPLACE,
                             mpi_window[log->subtype]);
    if (ret != MPI_SUCCESS) mpi_error_message(ret, "Failed to accumulate data");

    /* XXX Emergency measure. */
    ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
    if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");
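
    /* This flush should ensure the payload accumulate has completed at the
     * target before the flag below is changed; the two touch different window
     * locations, so there is no implicit ordering between them. */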

    /* Now we change the first element to UNLOCKED so that the remote end can
     * find out that the data has arrived. */
    BLOCKTYPE newval[1];
    BLOCKTYPE oldval[1];
    newval[0] = UNLOCKED;
    oldval[0] = 0;
    ret =
        MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
                             log->otherrank, offset, mpi_window[log->subtype]);

    if (ret != MPI_SUCCESS)
      mpi_error_message(ret, "MPI_Compare_and_swap error");

    ret = MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);
    if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Win_flush failed");

    //if (verbose) {
    //  if (oldval[0] == dataptr[0]) {
    //    message("sent a message to %d/%d (%zd:%zd:%zd @ %zd %d/%d)",
    //            log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
    //            offset, k, nr_send);
    //  } else {
    //    message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd %d/%d",
    //            log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
    //            offset, k, nr_send);
    //  }
    //}
    //sleep(1);
  }

  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());

  return NULL;
}

/**
 * @brief Recv thread, checks for messages in the window from other ranks.
 */
static void *recv_thread(void *arg) {

  message(
      "%d: recv thread starts, checking for %d messages %d "
      "ranks %d communicators",
      *((int *)arg), nr_recv, nr_ranks, task_subtype_count);
  ticks starttics = getticks();

  /* Global statistics. */
  //int lncalls = 0;
  //double lsum = 0.0;
  //ticks lmint = INT_MAX;
  //ticks lmaxt = 0;

  /* No. of receives to process. */
  todo_recv = nr_recv;

  /* We loop while we still have messages to receive. */
  while (todo_recv > 0) {
    //sleep(5);
    for (int k = 0; k < nr_recv; k++) {
      struct mpiuse_log_entry *log = recv_queue[k];
      if (log != NULL && !log->done) {

        /* Get offset into subtype for this message. */
        size_t offset = log->offset;

        /* Check if that part of the window has been unlocked. */
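        /* Note: polling the window memory directly like this assumes the MPI
         * "unified" memory model; with the "separate" model an MPI_Win_sync
         * would be needed before this read could see the remote update. */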
        BLOCKTYPE *dataptr = &mpi_ptr[log->subtype][offset];
        BLOCKTYPE volatile lock = dataptr[0];
        //message("Checking: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);
        //if (atomic_cas(&dataptr[0], UNLOCKED, UNLOCKED)) {
        if (lock == UNLOCKED) {
          //message("Checked: %s @ %zd", (dataptr[0] == UNLOCKED?"unlocked":"locked"), offset);

          /* OK, so data should be ready for use, check the tag and size. */
          if ((size_t)log->size == dataptr[1] &&
              (size_t)log->tag == dataptr[2]) {
            //if (verbose)
            //  message("receive message %d/%d from %d @ offset %zd: "
            //          "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
            //          log->otherrank, offset, dataptr[0], log->size,
            //          toblocks(log->size) + HEADER_SIZE);

            /* Check the sent data is unchanged. */
            if (datacheck) {
              if (!datacheck_test(toblocks(log->size),
                                  &dataptr[HEADER_SIZE], log->otherrank)) {
                fflush(stdout);
                fflush(stderr);
                error("Data mismatch on completion");
              }
            }

            /* Done, clean up. */
            log->done = 1;
            atomic_dec(&todo_recv);
            if (todo_recv == 0) break;

          } else {
            message("bad unlocked message %d/%d from %d @ offset %zd: "
                    "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
                    log->otherrank, offset, dataptr[0], log->size,
                    toblocks(log->size) + HEADER_SIZE);
            error("Unlocked data has incorrect tag or size: %zd/%zd %d/%zd",
                  log->size, dataptr[1], log->tag, dataptr[2]);
          }
        } 
        //else {
        //  message("unlocked message %d/%d from %d @ offset %zd: "
        //          "dataptr[0] %zd, size %zd/%zd", log->rank, log->subtype,
        //          log->otherrank, offset, dataptr[0], log->size,
        //          toblocks(log->size) + HEADER_SIZE);
        //}

        /* Need to allow for some MPI progression. Since we make no other MPI
         * calls (by design the receive side is a passive target, so only the
         * sender makes calls that move data) we use a no-op probe. */
        int flag = 0;
        int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
                             MPI_STATUS_IGNORE);
        if (ret != MPI_SUCCESS) mpi_error_message(ret, "MPI_Iprobe failed");
      }
    }
  }
  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());

  /* Thread exits. */
  return NULL;
}

/**
 * @brief Comparison function for ranktags.
 */
static int cmp_ranktags(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;

  if (l1->ranktag > l2->ranktag) return 1;
  if (l1->ranktag < l2->ranktag) return -1;
  return 0;
}

/**
 * @brief Comparison function for tags.
 */
static int cmp_tags(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;

  if (l1->tag > l2->tag) return 1;
  if (l1->tag < l2->tag) return -1;
  return 0;
}

/**
 * @brief Pick out the relevant logging data for our rank.
 */
static void pick_logs() {

  int nlogs = mpiuse_nr_logs();

  /* Queues of pointers into the logs, sized for all of them. A bit large... */
  send_queue = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_send = 0;
  recv_queue = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_recv = 0;

  for (int k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->activation) {
      if (log->rank == myrank) {
        log->done = 0;
        log->data = NULL;
        log->ranktag = toranktag(log->otherrank, log->rank, log->tag);
        //message("add %s", tokey(log));
        if (log->type == task_type_send) {
          send_queue[nr_send] = log;
          nr_send++;
        } else if (log->type == task_type_recv) {
          recv_queue[nr_recv] = log;
          nr_recv++;
        } else {
          error("task type '%d' is not a known send or recv task", log->type);
        }
      }
    }
  }

  /* Sort recv into increasing ranktag and send into tag order (don't
   * want to sort by rank, that would synchronize the sends). */
  qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_ranktags);
  qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_tags);

  /* Offsets and ranktags. */
  ranktag_offsets =
      calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
  ranktag_lists =
      calloc(task_subtype_count * nr_ranks * max_logs, sizeof(size_t));
  ranktag_counts = calloc(task_subtype_count * nr_ranks, sizeof(size_t));

  /* Setup the ranktag offsets for our receive windows. Also define the sizes
   * of the windows. */
  for (int k = 0; k < nr_recv; k++) {
    struct mpiuse_log_entry *log = recv_queue[k];
    ranktag_lists[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
                         k)] = log->ranktag;
    ranktag_offsets[INDEX3(task_subtype_count, nr_ranks, log->subtype, myrank,
                           k)] = ranktag_sizes[log->subtype];
    log->offset = ranktag_sizes[log->subtype];

    /* Need to use a multiple of blocks to keep alignment. */
    size_t size = toblocks(log->size) + HEADER_SIZE;
    //message("%d increment offset by %zd blocks", log->subtype, size);
    ranktag_sizes[log->subtype] += size;

    ranktag_counts[INDEX2(task_subtype_count, log->subtype, myrank)]++;
  }
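
  /* The receive window for each subtype will now hold the expected messages
   * packed back to back in increasing ranktag order, each occupying
   * HEADER_SIZE + toblocks(size) blocks. */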

  if (verbose)
    message("local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);
}

/**
 * @brief usage help.
 */
static void usage(char *argv[]) {
  fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
          argv[0]);
  fprintf(stderr, " options: -v verbose\n");
  fflush(stderr);
}

/**
 * @brief main function.
 */
int main(int argc, char *argv[]) {

  /* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);

  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);

  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Comm_rank failed with error %i.", res);

  /* Handle the command-line; we expect an mpiuse data file to read and
   * various options. */
  int opt;
  while ((opt = getopt(argc, argv, "vd")) != -1) {
    switch (opt) {
      case 'd':
        datacheck = 1;
        break;
      case 'v':
        verbose = 1;
        break;
      default:
        if (myrank == 0) usage(argv);
        return 1;
    }
  }
  if (optind >= argc - 1) {
    if (myrank == 0) usage(argv);
    return 1;
  }
  char *infile = argv[optind];
  char *logfile = argv[optind + 1];

  /* Now we read the SWIFT MPI logger output that defines the communications
   * we will undertake and the time differences between injections into the
   * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
   * log per rank per step, so you need to combine all ranks from a step. */
  mpiuse_log_restore(infile);
  int nranks = mpiuse_nr_ranks();

  /* This should match the expected size. */
  if (nr_ranks != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nranks, nr_ranks);

  /* We all need to agree on a maximum count of logs, so we can exchange the
   * offset arrays (would be ragged otherwise). */
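  /* (The size_t values are exchanged as MPI_AINT, which assumes MPI_Aint and
   * size_t have the same width on this platform.) */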
  max_logs = mpiuse_nr_logs() / 2 + 1;
  MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
  if (verbose) message("maximum log count = %zd", max_logs);

  /* Extract the send and recv messages for our rank. */
  pick_logs();

  /* Now for the one-sided setup... Each rank needs a buffer per communicator
   * with space for all the expected messages. */
  for (int i = 0; i < task_subtype_count; i++) {
      MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
      size_t size = tobytes(ranktag_sizes[i]);
      if (size == 0) size = BYTESINBLOCK;
      MPI_Win_allocate(size, BYTESINBLOCK, MPI_INFO_NULL,
                       subtypeMPI_comms[i], &mpi_ptr[i], &mpi_window[i]);

      memset(mpi_ptr[i], 170, tobytes(ranktag_sizes[i]));


      //if (verbose)
      //  message("Allocated window of size %zd for subtype %d",
      //          ranktag_sizes[i], i);

      /* Assert a shared lock with all the other processes on this
       * window. Needed as we use threads, so cannot lock or unlock as a means
       * of synchronization. */
      MPI_Win_lock_all(MPI_MODE_NOCHECK, mpi_window[i]);
  }
  //message("Windows allocated");
  
  /* We need to share all the offsets for each communicator with all the other
   * ranks so they can push data into the correct parts of our receive
   * window. */
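  /* Using MPI_SUM here works as a gather: each rank only fills the slots
   * indexed by its own rank in these arrays and leaves the rest zero, so the
   * sum merges the disjoint contributions from all ranks. */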
  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets,
                task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);
  MPI_Allreduce(MPI_IN_PLACE, ranktag_counts, task_subtype_count * nr_ranks,
                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
  MPI_Allreduce(MPI_IN_PLACE, ranktag_lists,
                task_subtype_count * nr_ranks * max_logs, MPI_AINT, MPI_SUM,
                MPI_COMM_WORLD);
  //message("check: local logs picked, nr_send = %d, nr_recv = %d ", nr_send, nr_recv);

  /* Time to start. Try to make it synchronous across the ranks. */
  MPI_Barrier(MPI_COMM_WORLD);
  clocks_set_cpufreq(0);
  if (myrank == 0) {
    message("Start of MPI tests");
    message("==================");
    if (verbose) {
      if (datacheck)
        message("checking data pattern on send and recv completion");
    }
  }

  /* Make two threads, one for sending and one for receiving. */
  pthread_t sendthread;
  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
    error("Failed to create send thread.");

  /* XXX could have more than one of these... With a partition of the
   * subtypes. */
  pthread_t recvthread;
  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
    error("Failed to create recv thread.");

  /* Wait until all threads have exited and all message exchanges have
   * completed. */
  pthread_join(sendthread, NULL);
  pthread_join(recvthread, NULL);

  /* Free the window locks. Once we all arrive. */
  MPI_Barrier(MPI_COMM_WORLD);
  for (int i = 0; i < task_subtype_count; i++) {
    MPI_Win_unlock_all(mpi_window[i]);
    MPI_Win_free(&mpi_window[i]);
  }

  /* Dump the updated MPI logs. */
  fflush(stdout);
  if (myrank == 0) message("Dumping updated log");
  mpiuse_dump_logs(nranks, logfile);

  /* Shutdown MPI. */
  res = MPI_Finalize();
  if (res != MPI_SUCCESS)
    error("call to MPI_Finalize failed with error %i.", res);

  if (myrank == 0) message("Bye");

  return 0;
}