diff --git a/Makefile b/Makefile
index 27c8f7a5f03d3b834ab0e998acfc78f179540edd..3a5d3600c04ba15d50019fd868dd1cac06441327 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
-#CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined
+CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=address -fno-omit-frame-pointer -fsanitize=undefined -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O3 -Wall -Iinfinity/include
-CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
+#CFLAGS = -g -O3 -Wall -Iinfinity/include -DINFINITY_DEBUG_ON -DINFINITY_ASSERT_ON
 #CFLAGS = -g -O0 -Wall -Iinfinity/include -fsanitize=thread
 
 INCLUDES = mpiuse.h atomic.h cycle.h clocks.h error.h
@@ -9,10 +9,10 @@ DEPS = Makefile $(SOURCES) $(INCLUDES)
 
 INFINITY = -Linfinity -linfinity -libverbs
 
-
 all: swiftmpistepsim swiftmpistepsim2 \
      swiftmpirdmastepsim swiftmpirdmastepsim2 swiftmpirdmastepsim3 \
-     swiftmpirdmaonestepsim swiftmpirdmaonestepsim2
+     swiftmpirdmaonestepsim swiftmpirdmaonestepsim2 \
+     swiftmpirdmaonestepsim_wrapper
 
 swiftmpistepsim: swiftmpistepsim.c  $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpistepsim swiftmpistepsim.c $(SOURCES)
@@ -35,6 +35,10 @@ swiftmpirdmastepsim2: swiftmpirdmastepsim2.c $(DEPS)
 swiftmpirdmastepsim3: swiftmpirdmastepsim3.c $(DEPS)
 	mpicxx $(CFLAGS) -o swiftmpirdmastepsim3 swiftmpirdmastepsim3.c $(SOURCES) $(INFINITY)
 
+swiftmpirdmaonestepsim_wrapper: swiftmpirdmaonestepsim_wrapper.c $(DEPS) infinity_wrapper.h infinity_wrapper.c
+	mpicxx $(CFLAGS) -o swiftmpirdmaonestepsim_wrapper swiftmpirdmaonestepsim_wrapper.c \
+		infinity_wrapper.c $(SOURCES) $(INFINITY)
+
 clean:
 	rm -f swiftmpistepsim
 	rm -f swiftmpistepsim2
@@ -43,5 +47,6 @@ clean:
 	rm -f swiftmpirdmaonestepsim2
 	rm -f swiftmpirdmastepsim2
 	rm -f swiftmpirdmastepsim3
+	rm -f swiftmpirdmaonestepsim_wrapper
 
 
diff --git a/infinity_wrapper.c b/infinity_wrapper.c
new file mode 100644
index 0000000000000000000000000000000000000000..105a4761a75e9a94c0923513ae026bff2cc6692b
--- /dev/null
+++ b/infinity_wrapper.c
@@ -0,0 +1,346 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2020 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+/**
+ * @brief Simple C wrapper for the C++ infinity library; it only provides an
+ * interface to the capabilities we use. Note we are still exposed to C++
+ * linkage from the infinity library, so this must be compiled as C++.
+ */
+/* Config parameters. */
+//#include "../config.h"
+#define HAVE_INFINITY
+
+/* Standard includes. */
+#include <arpa/inet.h>
+#ifdef WITH_MPI
+#include <mpi.h>
+#endif
+#include <netdb.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/* Infinity C++ headers. */
+#ifdef HAVE_INFINITY
+#include <infinity/core/Context.h>
+#include <infinity/memory/Buffer.h>
+#include <infinity/memory/RegionToken.h>
+#include <infinity/queues/QueuePair.h>
+#include <infinity/queues/QueuePairFactory.h>
+#include <infinity/requests/RequestToken.h>
+#endif
+
+/* Local defines. */
+#include "infinity_wrapper.h"
+
+/* Local includes. */
+#include "error.h"
+
+/* Size of a block of memory. MESSAGE_SIZE needs to be a multiple of this
+ * as we need to align in memory. */
+#define BLOCKTYPE size_t
+#define MPI_BLOCKTYPE MPI_AINT
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
+
+/* Flags for controlling access. High end of size_t. */
+static size_t UNLOCKED = (((size_t)2 << 63) - 1);
+
+/* Struct of QPs and associated data. */
+struct qps_data {
+  int nr_qps;
+  infinity::core::Context *context;
+  infinity::queues::QueuePairFactory *factory;
+  infinity::queues::QueuePair **qps;
+  infinity::memory::Buffer **receive_buffers;
+  infinity::memory::RegionToken **remote_buffers;
+  infinity::memory::Buffer **readwrite_buffers;
+  infinity::memory::RegionToken **token_buffers;
+};
+
+/**
+ * @brief Find an IP address for the given hostname.
+ *
+ * @param hostname the hostname
+ *
+ * @result the IP address. Note this points at a static buffer, so copy it
+ * away to keep it.
+ */
+
+static char *toipaddr(char *hostname) {
+
+  struct hostent *hostent = gethostbyname(hostname);
+  if (hostent == NULL) {
+    error("Failed to convert hostname '%s' to an IP address", hostname);
+  }
+  struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
+  return inet_ntoa(*addr_list[0]);
+}
+
+/**
+ * @brief Create QPs to connect a group of clients to a group of servers.
+ *
+ * Requires that infinity_create_servers is also running, otherwise we
+ * block waiting for the connections.
+ *
+ * @param servers a #mpi_servers struct with the server details.
+ * @param nr_servers the number of servers expected to connect.
+ * @param myrank the MPI rank of this process.
+ * @param verbose if 1 then report the connections made.
+ *
+ * @return handle for the QPs and related data.
+ */
+void *infinity_connect_clients(struct mpi_servers *servers, int nr_servers,
+                               int myrank, int verbose) {
+#ifdef HAVE_INFINITY
+
+  /* Struct to hold all the persistent data. */
+  struct qps_data *cqps = (struct qps_data *)calloc(1, sizeof(struct qps_data));
+
+  /* Need a factory to create QPs. */
+  cqps->context = new infinity::core::Context();
+  cqps->factory = new infinity::queues::QueuePairFactory(cqps->context);
+
+  /* Create the QPs connecting to all the other ranks. */
+  cqps->qps = (infinity::queues::QueuePair **)
+    calloc(nr_servers, sizeof(infinity::queues::QueuePair *));
+  cqps->nr_qps = nr_servers;
+
+  /* Space for the pointers to the remote memory. */
+  cqps->remote_buffers = (infinity::memory::RegionToken **)
+    calloc(nr_servers, sizeof(infinity::memory::RegionToken *));
+
+  /* We need to listen for messages from the other ranks' servers saying we
+   * can connect to them, as their servers need to be up first. */
+  int buf[1];
+  MPI_Request reqs[nr_servers];
+  for (int k = 0; k < nr_servers; k++) {
+    if (k != myrank) {
+      MPI_Irecv(buf, 1, MPI_INT, k, k, MPI_COMM_WORLD, &reqs[k]);
+    } else {
+      reqs[myrank] = MPI_REQUEST_NULL;
+    }
+  }
+
+  /* Now we poll for any servers that are ready to connect. */
+  int index;
+  MPI_Status status;
+  while (1) {
+    MPI_Waitany(nr_servers, reqs, &index, &status);
+
+    /* All done when all requests have completed. */
+    if (index == MPI_UNDEFINED) break;
+
+    /*  Got one, so connect. */
+    char *ip = &servers->ip[index * infinity_max_server_ip];
+    if (verbose)
+      message("%d waiting for connection to remote server %s %d on %d", myrank,
+              ip, index, BASE_PORT + myrank);
+    cqps->qps[index] = cqps->factory->connectToRemoteHost(ip, BASE_PORT + myrank);
+    if (verbose)
+      message("%d connected to remote server %s %d on %d", myrank, ip, index,
+              BASE_PORT + myrank);
+
+    /* Remote buffer access */
+    cqps->remote_buffers[index] =
+      (infinity::memory::RegionToken *)cqps->qps[index]->getUserData();
+  }
+
+  /* Result is opaque. */
+  return (void *)cqps;
+
+#else
+  return NULL;
+#endif
+}
+
+/**
+ * @brief Send a buffer to a server listening on a QP.
+ *
+ * @param qphandle the handle from infinity_connect_clients.
+ * @param index index of the server to send to.
+ * @param buffer the buffer to send, should be block aligned.
+ * @param size the size of the buffer in bytes.
+ * @param offset the offset into the remote buffer, note in bytes not blocks.
+ */
+void infinity_send_data(void *qphandle, int index, void *buffer, size_t size,
+                        size_t offset) {
+#ifdef HAVE_INFINITY
+  struct qps_data *cqps = (struct qps_data *)qphandle;
+
+  /* Need to assign to a buffer to register memory. XXX make this as big as
+   * necessary per server and reuse. */
+  auto *sendBuffer =
+    new infinity::memory::Buffer(cqps->context, buffer, size);
+
+  /* And send. */
+  infinity::requests::RequestToken requestToken(cqps->context);
+  cqps->qps[index]->write(sendBuffer,
+                          0,                           // localOffset
+                          cqps->remote_buffers[index], // destination
+                          offset,                      // remoteOffset
+                          size,                        // sizeInBytes
+                          infinity::queues::OperationFlags(),
+                          &requestToken);
+  requestToken.waitUntilCompleted();
+  requestToken.reset();
+
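+  /* The write above left the first block of the remote message set to our
+   * rank, i.e. still locked. The receiver polls that block and only treats
+   * the message as arrived once it reads UNLOCKED, so we flip it with a
+   * second write once the payload write has completed. */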
+  /* Now we update the unlock field. */
+  ((BLOCKTYPE *)sendBuffer->getData())[0] = UNLOCKED;
+  cqps->qps[index]->write(sendBuffer,
+                          0,                           // localOffset
+                          cqps->remote_buffers[index], // destination
+                          offset,                      // remoteOffset
+                          BYTESINBLOCK,                // sizeInBytes
+                          infinity::queues::OperationFlags(),
+                          &requestToken);
+  requestToken.waitUntilCompleted();  // Must complete before sendBuffer is deleted.
+
+  delete sendBuffer;
+
+#endif
+  return;
+}
+
+/**
+ * @brief Free the resources associated with a handle.
+ *
+ * @param qphandle the handle from infinity_connect_clients.
+ */
+void infinity_clients_free(void *qphandle) {
+
+#ifdef HAVE_INFINITY
+  struct qps_data *cqps = (struct qps_data *)qphandle;
+  for (int k = 0; k < cqps->nr_qps; k++) delete cqps->qps[k];
+  free(cqps->qps);
+  delete cqps->factory;
+  delete cqps->context;
+  free(cqps->receive_buffers);
+  free(cqps->remote_buffers);
+  if (cqps->readwrite_buffers != NULL) {
+    for (int k = 0; k < cqps->nr_qps; k++) delete cqps->readwrite_buffers[k];
+    free(cqps->readwrite_buffers);
+  }
+  if (cqps->token_buffers != NULL) {
+    for (int k = 0; k < cqps->nr_qps; k++) delete cqps->token_buffers[k];
+    free(cqps->token_buffers);
+  }
+  free(cqps);
+#endif
+  return;
+}
+
+/**
+ * @brief Create QPs for server to receive data from our clients.
+ *
+ * Requires that infinity_connect_clients is also run, otherwise we
+ * block waiting for the connections.
+ *
+ * @param servers a #mpi_servers struct with the server details.
+ * @param nr_servers the number of servers we will create.
+ * @param sizes the sizes, in bytes, of the various windows needed to receive
+ *              all the remote data from a client. Array of size nr_servers.
+ * @param myrank the MPI rank of this process.
+ * @param verbose if 1 then report the connections made.
+ *
+ * @return handle for the QPs and related data.
+ */
+void *infinity_create_servers(struct mpi_servers *servers, int nr_servers,
+                              size_t *sizes, int myrank, int verbose) {
+
+#ifdef HAVE_INFINITY
+  /* Struct to hold all the persistent data. */
+  struct qps_data *cqps = (struct qps_data *)calloc(1, sizeof(struct qps_data));
+
+  /* Need a factory to create QPs. */
+  cqps->context = new infinity::core::Context();
+  cqps->factory = new infinity::queues::QueuePairFactory(cqps->context);
+
+  /* Create the QPs connecting to all the other ranks. */
+  cqps->qps = (infinity::queues::QueuePair **)
+    calloc(nr_servers, sizeof(infinity::queues::QueuePair *));
+  cqps->nr_qps = nr_servers;
+
+  /* Create buffers to receive all the remote data. */
+  cqps->readwrite_buffers = (infinity::memory::Buffer **)
+    calloc(nr_servers, sizeof(infinity::memory::Buffer *));
+  cqps->token_buffers = (infinity::memory::RegionToken **)
+    calloc(nr_servers, sizeof(infinity::memory::RegionToken *));
+
+  for (int k = 0; k < nr_servers; k++) {
+    if (sizes[k] > 0) {
+      cqps->readwrite_buffers[k] =
+        new infinity::memory::Buffer(cqps->context, sizes[k]);
+    } else {
+      /* Dummy: not expecting any data, but set one up anyway. */
+      cqps->readwrite_buffers[k] =
+        new infinity::memory::Buffer(cqps->context, BYTESINBLOCK);
+    }
+    cqps->token_buffers[k] = cqps->readwrite_buffers[k]->createRegionToken();
+  }
+
+  /* Do the port binding for each other rank. */
+  int buf[1];
+  MPI_Request req;
+  for (int k = 0; k < nr_servers; k++) {
+    if (k != myrank) {
+      if (verbose)
+        message("%d binding to %d on port %d", myrank, k, BASE_PORT + k);
+      cqps->factory->bindToPort(BASE_PORT + k);
+
+      /* Send a message that this port is about to block for a connection. */
+      if (verbose) message("Blocking for first message on %d", BASE_PORT + k);
+      MPI_Isend(buf, 1, MPI_INT, k, myrank, MPI_COMM_WORLD, &req);
+      cqps->qps[k] = cqps->factory->acceptIncomingConnection
+        (cqps->token_buffers[k], sizeof(infinity::memory::RegionToken));
+      if (verbose)
+        message("Accepted incoming connection on %d", BASE_PORT + k);
+    }
+  }
+
+  return (void *)cqps;
+#else
+  return NULL;
+#endif
+}
+
+/**
+ * @brief Check if data is ready, that is, has arrived.
+ *
+ * @param qphandle the handle from infinity_create_servers.
+ * @param index index of the client we are checking.
+ * @param offset the offset of this data in the RDMA buffer,
+ *               note in blocks not bytes.
+ *
+ * @result pointer to the start of the data, otherwise NULL.
+ */
+void *infinity_check_ready(void *qphandle, int index, size_t offset) {
+
+  void *result = NULL;
+#ifdef HAVE_INFINITY
+  struct qps_data *cqps = (struct qps_data *)qphandle;
+
+  /* Get the data location. */
+  BLOCKTYPE *dataptr = &((BLOCKTYPE *)cqps->readwrite_buffers[index]->getData())[offset];
+
+  /* Check if this has been unlocked. */
+  BLOCKTYPE volatile lock = dataptr[0];
+  if (lock == UNLOCKED) result = (void *)dataptr;
+
+#endif
+  return result;
+}
diff --git a/infinity_wrapper.h b/infinity_wrapper.h
new file mode 100644
index 0000000000000000000000000000000000000000..b32f8628d48942be9d3ff07449bbbffc3e3d6428
--- /dev/null
+++ b/infinity_wrapper.h
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2020 Peter W. Draper (p.w.draper@durham.ac.uk)
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+#ifndef INFINITY_WRAPPER_H
+#define INFINITY_WRAPPER_H
+
+/* Config parameters. */
+//#include "../config.h"
+
+/* Base port number. Ranks use BASE_PORT + rank. XXX we need to handle this
+ * more dynamically. */
+static int BASE_PORT = 27771;
+
+/* Maximum length of formatted server IP address. */
+#define infinity_max_server_ip 24
+
+/* Struct of MPI server IP addresses as formatted strings. */
+struct mpi_servers {
+  char *ip;
+};
+
+void *infinity_connect_clients(struct mpi_servers *servers, int nr_servers,
+                               int myrank, int verbose);
+void infinity_send_data(void *qphandle, int index, void *buffer, size_t size,
+                        size_t offset);
+void infinity_clients_free(void *qphandle);
+void *infinity_create_servers(struct mpi_servers *servers, int nr_servers,
+                              size_t *sizes, int myrank, int verbose);
+void *infinity_check_ready(void *qphandle, int index, size_t offset);
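+
+/* Typical call sequence, as used by swiftmpirdmaonestepsim_wrapper.c (a
+ * sketch only, the local variable names are illustrative):
+ *
+ *   receive side:
+ *     void *h = infinity_create_servers(&servers, nr_ranks, sizes, myrank, 0);
+ *     ... poll infinity_check_ready(h, otherrank, offset) until non-NULL ...
+ *     infinity_clients_free(h);
+ *
+ *   send side:
+ *     void *h = infinity_connect_clients(&servers, nr_ranks, myrank, 0);
+ *     infinity_send_data(h, otherrank, data, size_in_bytes, offset_in_bytes);
+ *     infinity_clients_free(h);
+ */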
+
+#endif /* INFINITY_WRAPPER_H */
diff --git a/swiftmpirdmaonestepsim_wrapper.c b/swiftmpirdmaonestepsim_wrapper.c
new file mode 100644
index 0000000000000000000000000000000000000000..9fd9a49869a162b2278f8873fb7fad0d41b82d92
--- /dev/null
+++ b/swiftmpirdmaonestepsim_wrapper.c
@@ -0,0 +1,656 @@
+/*******************************************************************************
+ * This file is part of SWIFT.
+ * Copyright (c) 2020 Peter W. Draper
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ ******************************************************************************/
+
+// Pure RDMA version; we use MPI for process control and synchronization.
+// Write variant, attempting to use eager-like one-sided sends.
+// This variation has a single send thread and a single receive thread per rank.
+
+#include <arpa/inet.h>
+#include <limits.h>
+#include <mpi.h>
+#include <netdb.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "infinity_wrapper.h"
+
+#include "atomic.h"
+#include "clocks.h"
+#include "error.h"
+#include "mpiuse.h"
+
+/* Our rank for all to see. */
+int myrank = -1;
+
+/* CPU frequency estimate, shared so we do this once. */
+static long long cpufreq = 0;
+
+/* Number of ranks. */
+static int nr_ranks;
+
+/* Size of a block of memory. */
+#define BLOCKTYPE size_t
+#define MPI_BLOCKTYPE MPI_AINT
+static const int BYTESINBLOCK = sizeof(BLOCKTYPE);
+
+/* Size of message header in blocks. The lock/rank, subtype, size and tag. */
+static const size_t HEADER_SIZE = 4;
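+
+/* A message therefore occupies HEADER_SIZE + toblocks(size) blocks in the
+ * remote window: [0] lock word (holds the sending rank until unlocked),
+ * [1] subtype, [2] size in bytes, [3] tag, [4...] the payload. */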
+
+/* Maximum size of a message in blocks. */
+static size_t max_size = 0;
+
+/* Are we verbose. */
+static int verbose = 0;
+
+/* Scale to apply to the size of the messages we send. */
+static float messagescale = 1.0;
+
+/* Set a data pattern and check we get this back, slow... */
+static int datacheck = 0;
+
+/* Integer types of send and recv tasks, must match log. */
+static const int task_type_send = 22;
+static const int task_type_recv = 23;
+
+/* 3D index of array. */
+#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
+
+/* 2D index of array. */
+#define INDEX2(nx, x, y) (nx * y + x)
+
+/* Bit shift to accommodate all the bits of the maximum rank id. */
+static int rank_shift = 0;
+
+/* Bit shift to accommodate all the bits of the maximum subtype. */
+static int subtype_shift = 0;
+
+/* Maximum no. of messages (logs). */
+static size_t max_logs = 0;
+
+/* Offsets of the ranktag regions within the receive windows and lists of the
+ * associated tags. */
+static size_t *ranktag_sizes;
+static size_t *ranktag_offsets;
+static size_t *ranktag_lists;
+
+/* The local send queue. */
+static struct mpiuse_log_entry **volatile send_queue;
+static int volatile nr_sends = 0;
+static int volatile todo_send = 0;
+
+/* Flag that start-up is still in progress; cleared once all ranks have synchronized. */
+static int volatile starting_up = 1;
+
+/* Local receive queues separated by rank. */
+static int volatile nr_recvs = 0;
+static struct mpiuse_log_entry **volatile recv_queue;
+
+/**
+ * @brief Convert ranks, subtype and tag into a single unique value.
+ *
+ * Assumes there is enough space in a size_t for these values.
+ *
+ * @param subtype the subtype of the message
+ * @param sendrank the send rank
+ * @param recvrank the receive rank
+ * @param tag the tag
+ *
+ * @result a unique value based on all the values
+ */
+static size_t toranktag(int subtype, int sendrank, int recvrank, int tag) {
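+  /* Packing, from the low bits upwards: subtype occupies the first
+   * subtype_shift bits, sendrank starts at bit subtype_shift, recvrank at
+   * bit subtype_shift + rank_shift, and the tag at bit
+   * subtype_shift * 2 + rank_shift. */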
+  size_t result = subtype | sendrank << subtype_shift |
+                  recvrank << (subtype_shift + rank_shift) |
+                  tag << (subtype_shift * 2 + rank_shift);
+  return result;
+}
+
+/**
+ * @brief Find an IP address for the given hostname.
+ *
+ * @param hostname the hostname
+ *
+ * @result the IP address. Note this points at a static buffer, so copy it
+ * away to keep it.
+ */
+
+static char *toipaddr(char *hostname) {
+
+  struct hostent *hostent = gethostbyname(hostname);
+  if (hostent == NULL) {
+    error("Failed to convert hostname '%s' to an IP address", hostname);
+  }
+  struct in_addr **addr_list = (struct in_addr **)hostent->h_addr_list;
+  return inet_ntoa(*addr_list[0]);
+}
+
+/**
+ * @brief Convert a byte count into a number of blocks, rounding up.
+ *
+ * @param nr_bytes the number of bytes.
+ *
+ * @result the number of blocks needed.
+ */
+static BLOCKTYPE toblocks(BLOCKTYPE nr_bytes) {
+  return (nr_bytes + (BYTESINBLOCK - 1)) / BYTESINBLOCK;
+}
+
+/**
+ * @brief Convert a block count into a number of bytes.
+ *
+ * @param nr_blocks the number of blocks.
+ *
+ * @result the number of bytes.
+ */
+static BLOCKTYPE tobytes(BLOCKTYPE nr_blocks) {
+  return (nr_blocks * BYTESINBLOCK);
+}
+
+/**
+ * @brief Fill a data area with a given value.
+ *
+ * @param size size of the data in blocks.
+ * @param data the data to fill.
+ * @param value the value to fill with.
+ */
+static void datacheck_fill(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
+  for (BLOCKTYPE i = 0; i < size; i++) {
+    data[i] = value;
+  }
+}
+
+/**
+ * @brief Test a filled data area for a value.
+ *
+ * @param size size of the data in blocks.
+ * @param data the data to check.
+ * @param value the value expected.
+ *
+ * @result 1 on success, 0 otherwise.
+ */
+static int datacheck_test(BLOCKTYPE size, BLOCKTYPE *data, BLOCKTYPE value) {
+  for (size_t i = 0; i < size; i++) {
+    if (data[i] != value) {
+      message("see %zd expected %zd @ %zd (%zd to go)", data[i], value, i,
+              size - i);
+      return 0;
+    }
+  }
+  return 1;
+}
+
+/**
+ * @brief Send thread, sends RDMA messages to the other ranks.
+ *
+ * Messages are all considered in order.
+ */
+static void *send_thread(void *arg) {
+
+  // Connect QPs to the remote servers.
+  struct mpi_servers *servers = (struct mpi_servers *)arg;
+  void *qphandle = infinity_connect_clients(servers, nr_ranks,
+                                            myrank, 0);
+
+  /* Extract the offset lists that we use. */
+  int nr = 0;
+  int size = (max_logs / 16 + 1);
+  size_t *ranktags = (size_t *)malloc(size * sizeof(size_t));
+  size_t *offsets = (size_t *)malloc(size * sizeof(size_t));
+
+  /* Build a ranktag that will match any subtype or tag sent from our rank
+   * to each of the other ranks. */
+  for (int k = 0; k < nr_ranks; k++) {
+    size_t matchranktag = toranktag(0, myrank, k, 0);
+    for (size_t j = 0; j < max_logs; j++) {
+      size_t ranktag = ranktag_lists[INDEX3(nr_ranks, nr_ranks, myrank, k, j)];
+      if ((ranktag & matchranktag) == matchranktag) {
+
+        /* Keep this one. */
+        ranktags[nr] = ranktag;
+        offsets[nr] = ranktag_offsets[INDEX3(nr_ranks, nr_ranks, myrank, k, j)];
+        nr++;
+        if (nr >= size) {
+          size += (max_logs / 16 + 1);
+          ranktags = (size_t *)realloc(ranktags, size * sizeof(size_t));
+          offsets = (size_t *)realloc(offsets, size * sizeof(size_t));
+        }
+      }
+    }
+  }
+
+  // Startup complete, so start timing and release the receive thread.
+  MPI_Barrier(MPI_COMM_WORLD);  // Vital for synchronization.
+  clocks_set_cpufreq(cpufreq);
+  starting_up = 0;
+  message("All synchronized");
+  ticks starttics = getticks();
+
+  for (int k = 0; k < nr_sends; k++) {
+    struct mpiuse_log_entry *log = send_queue[k];
+
+    /* Data has the actual data and room for the header. */
+    BLOCKTYPE datasize = toblocks(log->size) + HEADER_SIZE;
+    BLOCKTYPE *dataptr = (BLOCKTYPE *)calloc(datasize, BYTESINBLOCK);
+    log->data = dataptr;
+    log->injtic = getticks();
+
+    /* Fill data with pattern. */
+    if (datacheck)
+      datacheck_fill(toblocks(log->size), &dataptr[HEADER_SIZE], log->tag);
+
+    /* First element is the lock element, which can have any value other than
+     * UNLOCKED; the other elements define an MPI message. */
+    dataptr[0] = myrank;
+    dataptr[1] = log->subtype;
+    dataptr[2] = log->size;
+    dataptr[3] = log->tag;
+
+    /* Need to find the offset for this data in the remote's window. */
+    size_t ranktag =
+        toranktag(log->subtype, log->rank, log->otherrank, log->tag);
+    log->offset = 0;
+    int found = 0;
+
+    for (int j = 0; j < nr; j++) {
+      if (ranktags[j] == ranktag) {
+        log->offset = offsets[j];
+        found = 1;
+        break;
+      }
+    }
+    if (!found) {
+      error(
+          "Failed sending a message of size %zd to %d/%d "
+          "@ %zd, no offset found for ranktag %zd",
+          datasize, log->otherrank, log->subtype, log->offset, ranktag);
+    }
+
+    if (verbose)
+      message(
+          "sending message subtype %d from %d to %d todo: %d/%d, offset %ld "
+          "size %ld",
+          log->subtype, myrank, log->otherrank, nr_sends - k, nr_sends,
+          log->offset, datasize);
+
+    // And send
+    infinity_send_data(qphandle, log->otherrank, dataptr, tobytes(datasize),
+                       tobytes(log->offset));
+    free(dataptr);
+    log->endtic = getticks();
+  }
+
+  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+
+  // Free data.
+  free(offsets);
+  free(ranktags);
+  infinity_clients_free(qphandle);
+
+  return NULL;
+}
+
+/**
+ * @brief Recv thread, listens for remote sends from the other ranks.
+ */
+static void *recv_thread(void *arg) {
+
+  struct mpi_servers *servers = (struct mpi_servers *)arg;
+
+  /* Need sizes in bytes. */
+  size_t *sizes = (size_t *)calloc(nr_ranks, sizeof(size_t));
+  for (int k = 0; k < nr_ranks; k++) {
+    sizes[k] = tobytes(ranktag_sizes[k]);
+  }
+  void *qphandle = infinity_create_servers(servers, nr_ranks, sizes,
+                                           myrank, 0);
+  free(sizes);
+  message("All incomings up");
+
+  /* Now we wait for the remotes to connect before proceeding, otherwise the
+   * timings will be incorrect. */
+  while (starting_up)
+    ;
+
+  /* Ignore the timing on previous part, which is fixed. */
+  ticks starttics = getticks();
+
+  /* We loop while new requests are being sent and we still have messages
+   * to receive. */
+  int todo_recv = nr_recvs;
+  while (todo_recv > 0) {
+
+    /* Loop over remaining messages, checking if any have been unlocked. */
+    int k = 0;
+    while (k < todo_recv) {
+      struct mpiuse_log_entry *log = recv_queue[k];
+
+      /* On the first attempt we start listening for this receive. */
+      if (log->injtic == 0) log->injtic = getticks();
+
+      /* Check if data has arrived. */
+      BLOCKTYPE *dataptr = (BLOCKTYPE *)infinity_check_ready(qphandle,
+                                                             log->otherrank,
+                                                             log->offset);
+      if (dataptr != NULL) {
+        // XXX should check these for correctness.
+        // int subtype = dataptr[1];
+        // size_t size = dataptr[2];
+        // int tag = dataptr[3];
+
+        if (verbose)
+          message(
+              "receive message subtype %d from %d to %d todo: %d/%d,"
+              " offset %ld size %ld",
+              log->subtype, myrank, log->otherrank, todo_recv, nr_recvs,
+              log->offset, toblocks(log->size) + HEADER_SIZE);
+
+        /* Check that the received data matches the expected pattern. */
+        if (datacheck && !datacheck_test(toblocks(log->size),
+                                         &dataptr[HEADER_SIZE], log->tag)) {
+          message("Data mismatch on completion");
+        }
+
+        /* Done, clean up. */
+        log->done = 1;
+        log->endtic = getticks();
+        todo_recv--;
+
+        /* And swap with the last log if we can. */
+        if (k < todo_recv) {
+          recv_queue[k] = recv_queue[todo_recv];
+
+          /* Revisit position k, so the swapped-in log is also checked on
+           * this pass. */
+          k--;
+        }
+      }
+      k++;
+    }
+  }
+
+  // Free data.
+  infinity_clients_free(qphandle);
+
+  message("took %.3f %s", clocks_from_ticks(getticks() - starttics),
+          clocks_getunit());
+
+  /* Thread exits. */
+  return NULL;
+}
+
+/**
+ * @brief Comparison function for log times (tics).
+ */
+static int cmp_logs(const void *p1, const void *p2) {
+  struct mpiuse_log_entry *l1 = *(struct mpiuse_log_entry **)p1;
+  struct mpiuse_log_entry *l2 = *(struct mpiuse_log_entry **)p2;
+
+  if (l1->tic > l2->tic) return 1;
+  if (l1->tic < l2->tic) return -1;
+  return 0;
+}
+
+/**
+ * @brief Pick out the relevant logging data for our rank.
+ *
+ * @result the maximum message size, in bytes, seen across all the logs.
+ */
+static size_t pick_logs() {
+
+  size_t nlogs = mpiuse_nr_logs();
+  size_t maxsize = 0;
+
+  /* Queues of send and receive logs. */
+  send_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_sends = 0;
+  recv_queue = (struct mpiuse_log_entry **)calloc(
+      nlogs, sizeof(struct mpiuse_log_entry *));
+  nr_recvs = 0;
+
+  for (size_t k = 0; k < nlogs; k++) {
+    struct mpiuse_log_entry *log = mpiuse_get_log(k);
+    if (log->activation) {
+      if (log->rank == myrank) {
+        log->done = 0;
+        log->injtic = 0;
+        log->endtic = 0;
+        log->data = NULL;
+        log->ranktag =
+            toranktag(log->subtype, log->otherrank, log->rank, log->tag);
+
+        /* Scale size. */
+        log->size *= messagescale;
+
+        if (log->type == task_type_send) {
+          send_queue[nr_sends] = log;
+          nr_sends++;
+        } else if (log->type == task_type_recv) {
+          recv_queue[nr_recvs] = log;
+          nr_recvs++;
+        } else {
+          error("task type '%d' is not a known send or recv task", log->type);
+        }
+      }
+
+      /* Across all ranks. */
+      if (log->size > maxsize) maxsize = log->size;
+    }
+  }
+
+  /* Sort into increasing tic. */
+  qsort(recv_queue, nr_recvs, sizeof(struct mpiuse_log_entry *), cmp_logs);
+  qsort(send_queue, nr_sends, sizeof(struct mpiuse_log_entry *), cmp_logs);
+
+  /* Offsets and ranktags. */
+  ranktag_offsets =
+      (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
+  ranktag_lists =
+      (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
+  ranktag_sizes =
+      (size_t *)calloc(nr_ranks * nr_ranks * max_logs, sizeof(size_t));
+
+  /* Set up the ranktag offsets for our receive windows. Also define the sizes
+   * of the windows. */
+  for (int k = 0; k < nr_recvs; k++) {
+    struct mpiuse_log_entry *log = recv_queue[k];
+    ranktag_lists[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
+        log->ranktag;
+    ranktag_offsets[INDEX3(nr_ranks, nr_ranks, log->otherrank, log->rank, k)] =
+        ranktag_sizes[log->otherrank];
+    log->offset = ranktag_sizes[log->otherrank];
+
+    /* Need to use a multiple of blocks to keep alignment. */
+    size_t size = toblocks(log->size) + HEADER_SIZE;
+    ranktag_sizes[log->otherrank] += size;
+  }
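+
+  /* For example, two receives from the same rank of 8 and 24 bytes land at
+   * block offsets 0 and HEADER_SIZE + 1 in that rank's window: 8 bytes is
+   * one data block, plus the 4 header blocks. */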
+
+  if (verbose) {
+    message("maxsize = %zd, nr_sends = %d, nr_recvs = %d", maxsize, nr_sends,
+            nr_recvs);
+  }
+
+  return maxsize;
+}
+
+/**
+ * @brief usage help.
+ */
+static void usage(char *argv[]) {
+  fprintf(stderr, "Usage: %s [-vd] [-s scale] SWIFT_mpiuse-log-file.dat logfile.dat\n",
+          argv[0]);
+  fprintf(stderr, " options: -v verbose, -d check data, -s scale message sizes\n");
+  fflush(stderr);
+}
+
+/**
+ * @brief main function.
+ */
+int main(int argc, char *argv[]) {
+
+  /* Start time for logging. This will be reset to restart time. */
+  clocks_set_cpufreq(0);
+  cpufreq = clocks_get_cpufreq();
+
+  /* Initiate MPI. */
+  int prov = 0;
+  int res = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &prov);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Init_thread failed with error %i.", res);
+
+  res = MPI_Comm_size(MPI_COMM_WORLD, &nr_ranks);
+  if (res != MPI_SUCCESS) error("MPI_Comm_size failed with error %i.", res);
+
+  res = MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
+  if (res != MPI_SUCCESS)
+    error("Call to MPI_Comm_rank failed with error %i.", res);
+
+  /* Handle the command line; we expect an mpiuse data file to read and
+   * various options. */
+  int opt;
+  while ((opt = getopt(argc, argv, "vds:")) != -1) {
+    switch (opt) {
+      case 'd':
+        datacheck = 1;
+        break;
+      case 'v':
+        verbose = 1;
+        break;
+      case 's':
+        messagescale = atof(optarg);
+        break;
+      default:
+        if (myrank == 0) usage(argv);
+        return 1;
+    }
+  }
+  if (optind >= argc - 1) {
+    if (myrank == 0) usage(argv);
+    return 1;
+  }
+  char *infile = argv[optind];
+  char *logfile = argv[optind + 1];
+
+  /* Now we read the SWIFT MPI logger output that defines the communications
+   * we will undertake and the time differences between injections into the
+   * queues. Note this has all ranks for a single step; SWIFT outputs one MPI
+   * log per rank per step, so you need to combine all ranks from a step. */
+  mpiuse_log_restore(infile);
+  int nranks = mpiuse_nr_ranks();
+
+  /* This should match the expected size. */
+  if (nr_ranks != nranks)
+    error("The number of MPI ranks %d does not match the expected value %d",
+          nranks, nr_ranks);
+
+  /* Bit widths needed to hold the maximum rank and subtype values.
+   * Assumes the GCC __builtin_clz intrinsic. */
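+  /* For example, nr_ranks = 8 gives rank_shift = 4, and the assumed maximum
+   * of 32 subtypes gives subtype_shift = 6. */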
+  rank_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(nr_ranks);
+  subtype_shift = (sizeof(int) * CHAR_BIT) - __builtin_clz(32);
+
+  /* We all need to agree on a maximum count of logs, so we can exchange the
+   * offset arrays (would be ragged otherwise and difficult to exchange). */
+  max_logs = mpiuse_nr_logs() / 2 + 1;
+  MPI_Allreduce(MPI_IN_PLACE, &max_logs, 1, MPI_AINT, MPI_MAX, MPI_COMM_WORLD);
+
+  /* Extract the send and recv messages for our rank. */
+  size_t maxsize = pick_logs();
+
+  /* Largest size of a message in blocks. Needs to align on size_t. */
+  max_size = toblocks(maxsize) + HEADER_SIZE;
+
+  /* We need to share all the offsets for each communicator with all the other
+   * ranks so they can push data into the correct parts of our receive
+   * window. Each slot is only set on the rank that owns the receive and is
+   * zero elsewhere, so an in-place sum acts as a gather. */
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_offsets, nr_ranks * nr_ranks * max_logs,
+                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
+  MPI_Allreduce(MPI_IN_PLACE, ranktag_lists, nr_ranks * nr_ranks * max_logs,
+                MPI_AINT, MPI_SUM, MPI_COMM_WORLD);
+
+  /* Now for the RDMA setup. We need the IP addresses of all the ranks. */
+
+  /* Each rank can find its name and IP. */
+  char name[MPI_MAX_PROCESSOR_NAME];
+  int namelen = 0;
+  MPI_Get_processor_name(name, &namelen);
+  char ip[infinity_max_server_ip];
+  strncpy(ip, toipaddr(name), infinity_max_server_ip);
+
+  /* And distribute, so we all know everyone's IPs. */
+  struct mpi_servers servers;
+  servers.ip = (char *)malloc(sizeof(char) * nr_ranks * infinity_max_server_ip);
+  MPI_Allgather(ip, infinity_max_server_ip, MPI_BYTE, servers.ip,
+                infinity_max_server_ip, MPI_BYTE, MPI_COMM_WORLD);
+
+  if (myrank == 0 && verbose) {
+    message("RDMA servers will listen on:");
+    for (int j = 0; j < nr_ranks; j++) {
+      for (int k = 0; k < nr_ranks; k++) {
+        if (k != j) {
+          message("  %d: %s on port %d", j,
+                  &servers.ip[j * infinity_max_server_ip], BASE_PORT + k);
+        }
+      }
+    }
+  }
+
+  /* Time to start timing. Try to make it synchronous across the ranks. */
+  MPI_Barrier(MPI_COMM_WORLD);
+  clocks_set_cpufreq(cpufreq);
+  if (myrank == 0) {
+    message("Start of MPI tests");
+    message("==================");
+    if (verbose) {
+      if (datacheck)
+        message("checking data pattern on send and recv completion");
+    }
+  }
+
+  /* Server thread that listens for connections and receives data. */
+  pthread_t recvthread;
+  if (pthread_create(&recvthread, NULL, &recv_thread, &servers) != 0)
+    error("Failed to create recv thread.");
+
+  /* Now we have a single thread to send the messages. */
+  pthread_t sendthread;
+  if (pthread_create(&sendthread, NULL, &send_thread, &servers) != 0)
+    error("Failed to create send thread.");
+
+  /* Wait until all threads have exited and all message exchanges have
+   * completed. */
+  pthread_join(sendthread, NULL);
+  pthread_join(recvthread, NULL);
+
+  /* Dump the updated MPI logs. */
+  MPI_Barrier(MPI_COMM_WORLD);
+  if (myrank == 0) message("Dumping updated log");
+  mpiuse_dump_logs(nranks, logfile);
+
+  /* Shutdown MPI. */
+  res = MPI_Finalize();
+  if (res != MPI_SUCCESS)
+    error("call to MPI_Finalize failed with error %i.", res);
+
+  /* Free resources. */
+  free(servers.ip);
+
+  if (myrank == 0) message("Bye");
+
+  return 0;
+}