Skip to content
Snippets Groups Projects
mpistalls-timed.c 9.86 KiB
/**
 * Attempt to reproduce the asynchronous stalling that we see for medium
 * busy steps in SWIFT.
 *
 * Timed injection version.
 *
 * So we need to setup a multithreaded MPI program that performs asynchronous
 * exchanges of various data sizes and continuously checks the requests for
 * completion. Also need timers to record the time taken by all this...
 */
#include <stdio.h>
#include <mpi.h>
#include <pthread.h>
#include <stdlib.h>

#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "mpiuse.h"

/* Global: Our rank for all to see. */
int myrank = -1;

/* Integer types of send and recv tasks, must match log. */
static const int task_type_send = 22;
static const int task_type_recv = 23;

/* Global communicators for each of the subtypes. */
static const int task_subtype_count = 30; // Just some upper limit on subtype.
static MPI_Comm subtypeMPI_comms[30];

/* The local queues. */
static struct mpiuse_log_entry **volatile reqs_queue;
static int volatile ind_req = 0;
static int volatile nr_reqs = 0;
static int volatile injecting = 1;
static struct mpiuse_log_entry **volatile recvs_queue;
static int volatile nr_recvs = 0;
static int volatile ind_recv = 0;
static int volatile todo_recv = 0;
static struct mpiuse_log_entry **volatile sends_queue;
static int volatile nr_sends = 0;
static int volatile ind_send = 0;
static int volatile todo_send = 0;

/* Delta tics that we do not wait for. XXX measure this. */
const ticks looptics = 10000;

/* CPU frequency of the machine that created the MPI log. */
// XXX need to store this in the data file.
static double clocks_cpufreq = 2194844448.0;

/* Injection thread, initiates MPI_Isend and MPI_Irecv requests at various
 * times. */
static void *inject_thread(void *arg) {
  message("%d: injection thread starts", *((int *)arg));
  ticks starttics = getticks();

  /* Ticks of our last attempt. */
  ticks basetic = reqs_queue[0]->tic;

  while (ind_req < nr_reqs) {
    struct mpiuse_log_entry *log = reqs_queue[ind_req];

    /* Expect time between this request and the previous one. */
    ticks dt = log->tic - basetic;
    double ddt = (double)log->tic - (double)basetic;
    ticks oldbasetic = basetic;
    basetic = log->tic;
    /* Guess some time below which we should not attempt to wait, otherwise
     * we'll start to overrun, and just inject the next call if we are below
     * that (basically how many ticks this loop takes without any
     * waiting). Otherwise wait a while. */
    if (dt > looptics) {
      struct timespec sleep;
      sleep.tv_sec = 0;

      /* Remember to be fair and remove the looptics, then convert to
       * nanoseconds. */
      double ns = (double)(dt - looptics) / clocks_cpufreq * 1.0e9;
      if (ns < 1.0e9) {
        sleep.tv_nsec = (long) ns;
      } else {
        /* Wait more than one second. Must be an error, but complain and
         * continue.  */
        sleep.tv_nsec = (long) 1.0e9;
        message("wait greater than one second");
      }
      nanosleep(&sleep, NULL);
    }

    // Differences to SWIFT: MPI_BYTE might overflow, should use MPI_Type(?).
    int err = 0;
    if (log->type == task_type_send) {
      err = MPI_Isend(log->data, log->size, MPI_BYTE, log->otherrank,
                      log->tag, subtypeMPI_comms[log->subtype], &log->req);

      /* Add a new send request. */
      int ind = atomic_inc(&nr_sends);
      sends_queue[ind] = log;
      atomic_inc(&todo_send);

    } else {
      err = MPI_Irecv(log->data, log->size, MPI_BYTE, log->otherrank,
                      log->tag, subtypeMPI_comms[log->subtype], &log->req);

      /* Add a new recv request. */
      int ind = atomic_inc(&nr_recvs);
      recvs_queue[ind] = log;
      atomic_inc(&todo_recv);
    }
    if (err != MPI_SUCCESS) error("Failed to activate send or recv");

    ind_req++;
  }
  message("%d injections completed, sends = %d, recvs = %d", ind_req,
          nr_sends, nr_recvs);
  message("remaining sends = %d, recvs = %d", todo_send, todo_recv);
  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());
  atomic_dec(&injecting);
  return NULL;
}

/* Send thread, checks if MPI_Isend requests have completed. */
static void *send_thread(void *arg) {
  message("%d: send thread starts (%d)", *((int *)arg), injecting);
  ticks starttics = getticks();

  int res;
  MPI_Status stat;

  // Need a test that only exits when requests are all inserted and we have
  // emptied our queue. */
  size_t attempts = 0;
  while (injecting || (!injecting && todo_send > 0)) {
    int nsends = nr_sends;
    for (int k = 0; k < nsends; k++) {
      struct mpiuse_log_entry *log = sends_queue[k];
      if (log != NULL) {
        attempts++;
        int err = MPI_Test(&log->req, &res, &stat);
        if (err != MPI_SUCCESS) {
          error("MPI_Test call failed");
        }
        if (res) {
          /* Done, clean up. */
          //message("MPI_Test successful");
          free(log->data);
          sends_queue[k] = NULL;
          atomic_dec(&todo_send);
        }
      }
    }
  }
  message("sends completed, required %zd attempts (left: %d)", attempts,
          todo_send);
  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());
  return NULL;
}

/* Recv thread, checks if MPI_Irecv requests have completed. */
static void *recv_thread(void *arg) {
  message("%d: recv thread starts", *((int *)arg));
  ticks starttics = getticks();

  int res;
  MPI_Status stat;

  size_t attempts = 0;
  while (injecting || (!injecting && todo_recv > 0)) {
    int nrecvs = nr_recvs;
    for (int k = 0; k < nrecvs; k++) {
      struct mpiuse_log_entry *log = recvs_queue[k];
      if (log != NULL) {
        attempts++;
        int err = MPI_Test(&log->req, &res, &stat);
        if (err != MPI_SUCCESS) {
          error("MPI_Test call failed");
        }
        if (res) {
          /* Done, clean up. */
          //message("MPI_Test successful");
          free(log->data);
          recvs_queue[k] = NULL;
          atomic_dec(&todo_recv);
        }
      }
    }
  }

  message("recvs completed, required %zd attempts (left: %d)", attempts,
          todo_recv);
  message("took %.3f %s.", clocks_from_ticks(getticks() - starttics),
          clocks_getunit());
  return NULL;
}

/* Comparison function for logged times. */
static int cmp_logs(const void *p1, const void *p2) {
  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;

  /* Large unsigned values, so take care. */
  if (l1->tic > l2->tic)
    return 1;
  if (l1->tic < l2->tic)
    return -1;
  return 0;
}

/* Pick out the relevant logging data for our rank, i.e. all activations of
 * sends and recvs. */
static void pick_logs(void) {
  size_t nlogs = mpiuse_nr_logs();
  int nranks = mpiuse_nr_ranks();

  /* Duplicate of logs. XXX could loop twice to reduce memory use if needed. */
  reqs_queue = (struct mpiuse_log_entry **)
    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_reqs= 0;
  sends_queue = (struct mpiuse_log_entry **)
    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_sends= 0;
  recvs_queue = (struct mpiuse_log_entry **)
    malloc(sizeof(struct mpiuse_log_entry *) * nlogs);
  nr_recvs = 0;

  for (int k = 0; k < nlogs; k++) {
    struct mpiuse_log_entry *log = mpiuse_get_log(k);
    if (log->rank == myrank && log->activation) {
      if (log->type == task_type_send || log->type == task_type_recv) {

        /* Allocate space for data. */
        log->data = calloc(log->size, 1);

        /* And keep this log. */
        reqs_queue[nr_reqs] = log;
        nr_reqs++;

      } else {
        error("task type '%d' is not a known send or recv task", log->type);
      }
    }
  }

  /* Sort into increasing time. */
  qsort(reqs_queue, nr_reqs, sizeof(struct mpiuse_log_entry *), cmp_logs);
}

int main(int argc, char *argv[]) {

  /* First we read the SWIFT MPI logger output that defines the communcations
   * we will undertake and the time differences between injections into the
   * queues. */
  mpiuse_log_restore("testdata/mpiuse_report-step1.dat");
  int nranks = mpiuse_nr_ranks();

  /* Initiate MPI. */
  int prov = 0;
  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Init_thread failed with error %i.", res);

  int nr_nodes = 0;
  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_nodes);
  if (res != MPI_SUCCESS)
    error("MPI_Comm_size failed with error %i.", res);

  /* This should match the expected size. */
  if (nr_nodes != nranks)
    error("The number of MPI ranks %d does not match the expected value %d",
          nranks, nr_nodes);

  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (res != MPI_SUCCESS)
    error("Call to MPI_Comm_rank failed with error %i.", res);
  /* Create communicators for each subtype of the tasks. */
  for (int i = 0; i < task_subtype_count; i++) {
    MPI_Comm_dup(MPI_COMM_WORLD, &subtypeMPI_comms[i]);
  }

  /* Each rank requires its own queue, so extract them. */
  pick_logs();

  /* Start time. */
  message("Start of MPI tests");
  clocks_set_cpufreq(0);

  /* Make three threads, one for injecting tasks and two to check for
   * completions of the sends and recv independently. */
  pthread_t injectthread;
  if (pthread_create(&injectthread, NULL, &inject_thread, &myrank) != 0)
    error("Failed to create injection thread.");
  pthread_t sendthread;
  if (pthread_create(&sendthread, NULL, &send_thread, &myrank) != 0)
    error("Failed to create send thread.");
  pthread_t recvthread;
  if (pthread_create(&recvthread, NULL, &recv_thread, &myrank) != 0)
    error("Failed to create recv thread.");

  /* Wait until all threads have exited and all MPI requests have completed. */
  pthread_join(injectthread, NULL);
  pthread_join(sendthread, NULL);
  pthread_join(recvthread, NULL);

  /* Shutdown MPI. */
  res = MPI_Finalize();
  if (res != MPI_SUCCESS)
    error("call to MPI_Finalize failed with error %i.", res);

  return 0;
}