/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2020 Peter W. Draper
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

//  Simple approach: use the window as a message board, capable of receiving
//  a single message per rank at a time, so it needs to be larger than the
//  largest message, and we need one of these boards per rank.
//
//  We poll the boards of all ranks, waiting for a push update to unlock one;
//  we then check the tag and size, which need to match one of the expected
//  messages, at which point we handle the data and release the board.
//
//  On the send side we work synchronously, sending one message at a time and
//  waiting for our board to be unlocked by the receiver.
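//
//  A sketch of the handshake per message, as implemented below:
//
//    sender:   MPI_Accumulate pushes the header (flag LOCKED) plus payload
//              into the remote board
//    sender:   MPI_Compare_and_swap flips the remote flag to UNLOCKED
//    receiver: polls the flag, matches the size and tag against its expected
//              logs (optionally verifying the payload)
//    receiver: flips the flag back to LOCKED, releasing the board
//    sender:   polls with MPI_Rget/MPI_Test until the flag reads LOCKED
//
//  Illustrative invocation (file names are examples only):
//
//    mpirun -np 4 ./swiftmpirdmastepsim SWIFT_mpiuse-log-file.dat logfile.dat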

#include <limits.h>
#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"

/* Our rank for all to see. */
int myrank = -1;

/* Number of ranks. */
static int nr_ranks;

/* Flags for controlling access. */
static int LOCKED = -2;
static int UNLOCKED = -3;

/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this
 * as we need to align in memory. */
#define BLOCKTYPE size_t
#define MPI_BLOCKTYPE MPI_AINT
static const int BYTESINBLOCK = sizeof(BLOCKTYPE);

/* Size of message header in blocks. The flag, size and tag. */
static const size_t HEADER_SIZE = 3;

/* Size of a message board in blocks; we have one of these per rank per
 * communicator (i.e. per window). */
static size_t MESSAGE_SIZE = 0;
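
/* A board is laid out in blocks as: block 0 the flag (LOCKED/UNLOCKED),
 * block 1 the payload size in bytes, block 2 the tag, and the payload
 * itself from block HEADER_SIZE onwards. */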

/* Are we verbose. */
static int verbose = 0;

/* Set a data pattern and check we get this back, slow... */
static int datacheck = 0;

/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;

/* Global communicators for each of the subtypes. */
#define task_subtype_count 22  // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[task_subtype_count];

/* And the windows for one-sided communications. */
static MPI_Win mpi_window[task_subtype_count];
static BLOCKTYPE *mpi_ptr[task_subtype_count];

/* The local send queue. */
static struct mpiuse_log_entry **volatile send_queue;
static int volatile nr_send = 0;
static int volatile todo_send = 0;

/* The local receive queue. */
static struct mpiuse_log_entry **volatile recv_queue;
static int volatile nr_recv = 0;
static int volatile todo_recv = 0;

/**
 * @brief Convert a byte count into a number of blocks, rounds up.
 *
 * @param nr_bytes the number of bytes.
 *
 * @result the number of blocks needed.
 */
static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
}
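
/* For example, assuming 8-byte size_t blocks, toblocks(10) == 2, since ten
 * bytes round up to two whole blocks, and tobytes(2) == 16. */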

/**
 * @brief Convert a block count into a number of bytes.
 *
 * @param nr_blocks the number of blocks.
 *
 * @result the number of bytes.
 */
static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
  return (nr_blocks * BYTESINBLOCK);
}

/**
 * @brief Fill a data area with our rank.
 *
 * @param size size of data in blocks.
 * @param data the data to fill.
 */
static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data) {
  for (BLOCKTYPE i = 0; i < size; i++) {
    data[i] = myrank;
  }
}

/**
 * @brief Test a filled data area for a value.
 *
 * @param size size of data in blocks.
 * @param data the data to check.
 * @param rank the value to check for, i.e. the originating rank.
 *
 * @result 1 on success, 0 otherwise.
 */
static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, int rank) {
  for (size_t i = 0; i < size; i++) {
    if (data[i] != rank) {
      message("see %zd expected %d @ %zd", data[i], rank, i);
      return 0;
    }
  }
  return 1;
}

/**
 * @brief Send thread, sends messages to other ranks one-by-one.
 *
 * Messages are all considered in order, regardless of the subtype.
 */
static void *send_thread(void *arg) {

  message("%d: send thread starts with %d messages", *((int *)arg), nr_send);
  ticks starttics = getticks();

  for (int k = 0; k < nr_send; k++) {
    struct mpiuse_log_entry *log = send_queue[k];

    /* The buffer holds room for the header plus the actual data. */
    BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
    BLOCKTYPE *dataptr = calloc(datasize, BYTESINBLOCK);
    if (dataptr == NULL) error("Failed to allocate send buffer");
    log->data = dataptr;

    /* Fill data with pattern. */
    if (datacheck) datacheck_fill(toblocks(log->size),
                                  &dataptr[HEADER_SIZE]);

    /* First element is marked as LOCKED, so only we can update. */
    dataptr[0] = LOCKED;
    dataptr[1] = log->size;
    dataptr[2] = log->tag;

    /* And send data to other rank. */
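    /* MPI_Accumulate with MPI_REPLACE is used rather than MPI_Put since
     * accumulate operations on the same location using the same predefined
     * type are element-wise atomic, so the receiver polling the flag block
     * cannot observe a torn update. */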
    int ret = MPI_Accumulate(dataptr, datasize, MPI_BLOCKTYPE, log->otherrank,
                             MESSAGE_SIZE * myrank, datasize, MPI_BLOCKTYPE,
                             MPI_REPLACE, mpi_window[log->subtype]);
    if (ret != MPI_SUCCESS)
      error("Failed to accumulate data: %d", ret);
    
    /* Now we change the first element (the flag) to UNLOCKED so that the
     * remote end can find out that the data has arrived. */
    BLOCKTYPE newval[1];
    BLOCKTYPE oldval[1];
    newval[0] = UNLOCKED;
    oldval[0] = 0;
    MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0], MPI_BLOCKTYPE,
                         log->otherrank, MESSAGE_SIZE * myrank,
                         mpi_window[log->subtype]);

    MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);

    // if (oldval[0] == dataptr[0]) {
    //  message("sent a message to %d/%d (%zd:%zd:%zd @ %zd)", log->otherrank,
    //          log->subtype, dataptr[0], oldval[0], newval[0],
    //          MESSAGE_SIZE * myrank);
    //} else {
    //  message("failed to send a message to %d/%d (%zd:%zd:%zd) @ %zd",
    //          log->otherrank, log->subtype, dataptr[0], oldval[0], newval[0],
    //          MESSAGE_SIZE * myrank);
    //}

    /* Wait for completion, which happens when the remote end flips the flag
     * back to LOCKED. We poll on a get, as the local window is only used for
     * receiving. Use an Rget so we can use MPI_Test to get some local
     * progression. */
    newval[0] = UNLOCKED;
    while (newval[0] != LOCKED) {
      // MPI_Win_flush(log->otherrank, mpi_window[log->subtype]);

      MPI_Request request;
      MPI_Rget(&newval[0], 1, MPI_BLOCKTYPE, log->otherrank,
               MESSAGE_SIZE * myrank, 1, MPI_BLOCKTYPE,
               mpi_window[log->subtype], &request);
      int flag = 0;
      while (flag == 0) {
        MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
      }
    }

    //message("sent and received... %d/%d/%d", k, nr_send,
    //        ((char *)log->data)[0]);
    /* Ready the next send. */
  }

  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());

  return NULL;
}

/**
 * @brief Recv thread, checks for messages in the window from other ranks.
 */
static void *recv_thread(void *arg) {

  message("%d: recv thread starts", *((int *)arg));
  ticks starttics = getticks();

  /* Global statistics. */
  int lncalls = 0;
  double lsum = 0.0;
  ticks lmint = INT_MAX;
  ticks lmaxt = 0;

  /* No. of receives to process. */
  todo_recv = nr_recv;

  /* We loop while we still have messages to receive. */
  while (todo_recv > 0) {
    for (int n = 0; n < nr_ranks; n++) {
      if (todo_recv <= 0) break;
      if (n == myrank) continue;

      for (int j = 0; j < task_subtype_count; j++) {
        if (todo_recv <= 0) break;

        //MPI_Win_flush(n, mpi_window[j]); // XXX emergency measure
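        /* Note this is a direct load from the exposed window memory, which
         * relies on the unified memory model making remote updates visible
         * without an explicit synchronization call. */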
        BLOCKTYPE *dataptr = &mpi_ptr[j][n * MESSAGE_SIZE];
        BLOCKTYPE lockval = dataptr[0];

        if (lockval == UNLOCKED) {

          /* We have a message waiting to be handled, find the log. */
          int found = 0;
          for (int k = 0; k < nr_recv; k++) {
            struct mpiuse_log_entry *log = recv_queue[k];
            if (log != NULL && !log->done && log->otherrank == n &&
                log->subtype == j &&
                log->size == dataptr[1] && log->tag == dataptr[2]) {
              found = 1;

              //message("We have a ready message %d/%d at %zd: lockval %zd",
              //        log->rank, log->subtype, n * MESSAGE_SIZE,
              //        lockval);

              /* Check the received data is as expected, i.e. the sender's
               * rank pattern arrived unchanged. */
              if (datacheck &&
                  !datacheck_test(toblocks(log->size),
                                  &dataptr[HEADER_SIZE], n)) {
                message("Data mismatch on completion");
              }

              /* Done, clean up. */
              log->done = 1;
              free(log->data);
              atomic_dec(&todo_recv);

              /* Now ready for next message. */
              dataptr[0] = LOCKED;
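              /* This local store is what the sender's polling MPI_Rget
               * eventually sees; under MPI's separate memory model an
               * MPI_Win_sync would arguably be needed to publish it. */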

              break;
            }
          }
          if (!found) {
            error("Failed to find a matching receive");
          }

          /* Need to allow for some MPI progression, since we otherwise make
           * no MPI calls on this thread. Should not be needed if using a
           * progression thread? */
          int flag = 0;
          MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
                     MPI_STATUS_IGNORE);
        }
      }
    }
  }

  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());

  /* Thread exits. */
  return NULL;
}

/**
 * @brief Comparison function for tags.
 */
static int cmp_logs(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;

  if (l1->tag > l2->tag) return 1;
  if (l1->tag < l2->tag) return -1;
  return 0;
}

/**
 * @brief Pick out the relevant logging data for our rank.
 */
static size_t pick_logs(void) {

  size_t nlogs = mpiuse_nr_logs();
  size_t maxsize = 0;

  /* Duplicate of logs. */
  send_queue = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_send = 0;
  recv_queue = (struct mpiuse_log_entry **)calloc(
      nlogs, sizeof(struct mpiuse_log_entry *));
  nr_recv = 0;

  for (size_t k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->activation) {
      if (log->rank == myrank) {
        log->done = 0;
        log->data = NULL;
        if (log->type == task_type_send) {
          send_queue[nr_send] = log;
          nr_send++;
        } else if (log->type == task_type_recv) {
          recv_queue[nr_recv] = log;
          nr_recv++;
        } else {
          error("task type '%d' is not a known send or recv task", log->type);
        }
      }
      /* Across all ranks. */
      if (log->size > maxsize) maxsize = log->size;
    }
  }

  /* Sort into increasing tag. */
  qsort(recv_queue, nr_recv, sizeof(struct mpiuse_log_entry *), cmp_logs);
  qsort(send_queue, nr_send, sizeof(struct mpiuse_log_entry *), cmp_logs);

  if (verbose)
    message("maxsize = %zd, nr_send = %d, nr_recv = %d", maxsize, nr_send,
            nr_recv);
  return maxsize;
}

/**
 * @brief usage help.
 */
static void usage(char *argv[]) {
  fprintf(stderr, "Usage: %s [-vf] SWIFT_mpiuse-log-file.dat logfile.dat\n",
          argv[0]);
  fprintf(stderr, " options: -v verbose\n");
  fflush(stderr);
}

/**
 * @brief main function.
 */
int main(int argc, char *argv[]) {

  /* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);

  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);

  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Comm_rank failed with error %i.", res);

  /* Handle the command line; we expect an mpiuse data file to read and
   * various options. */
  int opt;
  while ((opt = getopt(argc, argv, "vd")) != -1) {
    switch (opt) {
      case 'd':
        datacheck = 1;
        break;
      case 'v':
        verbose = 1;
        break;
      default:
        if (myrank == 0) usage(argv);
        return 1;
    }
  }
  if (optind >= argc - 1) {
    if (myrank == 0) usage(argv);
    return 1;
  }
  char *infile = argv[optind];
  char *logfile = argv[optind + 1];

  /* Now we read the SWIFT MPI logger output that defines the communications
   * we will undertake and the time differences between injections into the
   * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
   * log per rank per step, so you need to combine all ranks from a step. */
  mpiuse_log_restore(infile);
  int nranks = mpiuse_nr_ranks();

  /* This should match the number of ranks in the log. */
  if (nr_ranks != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nr_ranks, nranks);

  /* Extract the send and recv messages for our rank. */
  size_t maxsize = pick_logs();

  /* Size of a message board. Needs to align on size_t. */
  MESSAGE_SIZE = toblocks(maxsize) + HEADER_SIZE;

  /* Now for the one-sided setup... We need a buffer with space for the
   * largest message, plus one of these boards per rank. */
  for (int i = 0; i < task_subtype_count; i++) {
    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
    MPI_Win_allocate(tobytes(MESSAGE_SIZE * nr_ranks), BYTESINBLOCK,
                     MPI_INFO_NULL, subtypeMPI_comms[i], &mpi_ptr[i],
                     &mpi_window[i]);

    /* Assert a shared lock with all the other processes on this window. */
    MPI_Win_lock_all(0, mpi_window[i]);
  }
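
  /* These lock epochs are held for the entire run; all RMA happens under
   * the passive-target locks and is completed with explicit flushes. */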
  message("Windows allocated");

  /* Time to start. Try to make it synchronous across the ranks. */
  MPI_Barrier(MPI_COMM_WORLD);
  clocks_set_cpufreq(0);
  if (myrank == 0) {
    message("Start of MPI tests");
    message("==================");
    if (verbose) {
      if (datacheck)
        message("checking data pattern on send and recv completion");
    }
  }

  /* Make two threads, one for sending and one for receiving. */
  pthread_t sendthread;
  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
    error("Failed to create send thread.");
  pthread_t recvthread;
  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
    error("Failed to create recv thread.");

  /* Wait until all threads have exited and all message exchanges have
   * completed. */
  pthread_join(sendthread, NULL);
  pthread_join(recvthread, NULL);

  /* Release the window locks and free the windows. */
  for (int i = 0; i < task_subtype_count; i++) {
    MPI_Win_unlock_all(mpi_window[i]);
    MPI_Win_free(&mpi_window[i]);
  }

  /* Dump the updated MPI logs. */
  MPI_Barrier(MPI_COMM_WORLD);
  fflush(stdout);
  if (myrank == 0) message("Dumping updated log");
  mpiuse_dump_logs(nranks, logfile);

  /* Shutdown MPI. */
  res = MPI_Finalize();
  if (res != MPI_SUCCESS)
    error("call to MPI_Finalize failed with error %i.", res);

  if (myrank == 0) message("Bye");

  return 0;
}