Skip to content
Snippets Groups Projects
test_bh_mpi.c 42.33 KiB
/*******************************************************************************
 * This file is part of QuickSched.
 * Coypright (c) 2014 Pedro Gonnet (pedro.gonnet@durham.ac.uk),
 *                    Matthieu Schaller (matthieu.schaller@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
* *****************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Standard includes. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <math.h>
#include <float.h>
#include <limits.h>
#include <omp.h>
#include <fenv.h>
#include <mpi.h>

/* Local includes. */
#include "quicksched.h"
#include "res.h"

/* Some local constants. */
#define cell_pool_grow 1000
#define cell_maxparts 50
#define task_limit 1
#define const_G 1    // 6.6738e-8
#define dist_min 0.5 /* Used for legacy walk only */
#define dist_cutoff_ratio 1.5

#define ICHECK -1
#define SANITY_CHECKS
#define NO_COM_AS_TASK
#define NO_COUNTERS
#define EXACT



/** Data structure for the particles. */
struct part {
  double x[3];
  union {
    float a[3];
    float a_legacy[3];
    #ifndef EXACT
    float a_exact[3];
    #endif
  };
    #ifdef EXACT
    float a_exact[3];
    #endif
  float mass;
  int id;
};  // __attribute__((aligned(32)));

struct part_local {
  float x[3];
  float a[3];
  float mass;
} __attribute__((aligned(32)));

struct multipole {

#ifdef QUADRUPOLES
  double I_xx;
  double I_yy;
  double I_zz;
  double I_xy;
  double I_xz;
  double I_yz;
#endif

  double com[3];
  float mass;
};

struct multipole_local {

#ifdef QUADRUPOLES
  float I_xx;
  float I_yy;
  float I_zz;
  float I_xy;
  float I_xz;
  float I_yz;
#endif

  float com[3];
  float mass;
};

/** Data structure for the BH tree cell. */
struct cell {
  double loc[3];
  double h;

  int count;
  unsigned short int split, sorted;
  struct part *parts;

  qsched_res_t firstchild; /* Next node if opening */
  qsched_res_t sibling;    /* Next node */

  /* We keep both CoMs and masses to make sure the comp_com calculation is
   * correct (use an anonymous union to keep variable names compact).  */
  union {

    /* Information for the legacy walk */
    struct multipole legacy;

    /* Information for the QuickShed walk */
    struct multipole new;
  };

  long long int res,res_parts, com_tid;
  struct index *indices;

} __attribute__((aligned(128)));

/** Task types. */
enum task_type {
  task_type_self = 0,
  task_type_pair,
  task_type_pair_pc,
  task_type_com,
  task_type_count
};

/** Global variable for the pool of allocated cells. */
struct cell *cell_pool = NULL;


/**
 *
 * @brief Checks whether the cells are direct neighbours or not. Cells can be different sizes.
 *
 */
static inline int are_neighbours_different_size(struct cell *ci, struct cell *cj) {

    float min_dist =  min_dist = 0.5*(ci->h+cj->h);

    for(int k = 0; k < 3; k++)
    {
        float center_i = ci->loc[k];
        float center_j = cj->loc[k];
        if( fabsf(center_i - center_j) > min_dist) return 0;
    }
    return 1;
}

/**
 * @brief Checks whether the cells are direct neighbours ot not. Both cells have
 * to be of the same size
 *
 */
static inline int are_neighbours(struct cell *ci, struct cell *cj) {

#ifdef SANITY_CHECKS
  if (ci->h != cj->h)
    error(" Cells of different size in distance calculation.");
#endif

  /* Maximum allowed distance */
  float min_dist = ci->h;

  /* (Manhattan) Distance between the cells */
  for (int k = 0; k < 3; k++) {
    float center_i = ci->loc[k];
    float center_j = cj->loc[k];
    if (fabsf(center_i - center_j) > min_dist) return 0;
  }

  return 1;
}

/**
 * @brief Interacts all count particles in parts
 * with a series of multipoles
 *
 * @param parts An array of local particles
 * @param count The number of particles in the array
 * @param mults The multipoles to interact with
 * @param mult_count The number of multipoles to interact with
 */
static inline void make_interact_pc(struct part_local *parts, int part_count, 
				    struct multipole_local mults[8], int mult_count) {

  int i, j, k;
  float dx[3] = {0.0, 0.0, 0.0}, r2, ir, mrinv3;

#ifdef SANITY_CHECKS

  /* Sanity checks */
  if (part_count == 0) error("Empty cell!");

  /* Sanity check. */
  for (j = 0; j < mult_count; j++)
    if (mults[j].mass == 0.0) 
      error("com %d does not seem to have been set.", j);

#endif

  /* Loop over every particle in leaf. */
  for (i = 0; i < part_count; i++) {

    /* Loop over the multipoles */
    for (j = 0; j < mult_count; j++) {
      
      /* Compute the pairwise distance. */
      for (r2 = 0.0, k = 0; k < 3; k++) {
	dx[k] = mults[j].com[k] - parts[i].x[k];
	r2 += dx[k] * dx[k];
      }

      /* Apply the gravitational acceleration. */
      ir = 1.0f / sqrtf(r2);
      mrinv3 = mults[j].mass * const_G * ir * ir * ir;

#ifndef QUADRUPOLES

      parts[i].a[0] += mrinv3 * dx[0];
      parts[i].a[1] += mrinv3 * dx[1];
      parts[i].a[2] += mrinv3 * dx[2];

#else

      /* Follows the notation in Bonsai */
      float mrinv5 = mrinv3 * ir * ir;
      float mrinv7 = mrinv5 * ir * ir;

      float D1 = -mrinv3;
      float D2 =  3.f * mrinv5;
      float D3 = -15.f * mrinv7;

      float q = mults[j].I_xx + mults[j].I_yy + mults[j].I_zz;
      float qRx = mults[j].I_xx*dx[0] + mults[j].I_xy*dx[1] + mults[j].I_xz*dx[2];
      float qRy = mults[j].I_xy*dx[0] + mults[j].I_yy*dx[1] + mults[j].I_yz*dx[2];
      float qRz = mults[j].I_xz*dx[0] + mults[j].I_yz*dx[1] + mults[j].I_zz*dx[2];
      float qRR = qRx * dx[0] + qRy * dx[1] + qRz * dx[2];
      float C = D1 + 0.5f * D2 * q + 0.5f * D3 * qRR;

      parts[i].a[0] -= C * dx[0] + D2 * qRx;
      parts[i].a[1] -= C * dx[1] + D2 * qRy;
      parts[i].a[2] -= C * dx[2] + D2 * qRz;


#endif

    } /* loop over every multipole. */
  } /* loop over every particle. */
}


static inline void iact_pair_direct(struct qsched *s, struct cell *ci, struct cell *cj) {

  int i, j, k;
  ci->parts = (struct part *) qsched_getresdata(s, ci->res_parts);
  cj->parts = (struct part *) qsched_getresdata(s, cj->res_parts);
  int count_i = ci->count, count_j = cj->count;
  float xi[3];
  float dx[3], ai[3], mi, mj, r2, w, ir;
  double com[3];
  struct part_local parts_j[count_j];
  struct part *parts_i = ci->parts;

#ifdef SANITY_CHECKS

  /* Bad stuff will happen if cell sizes are different */
  if (ci->h != cj->h)
    error("Non matching cell sizes !! h_i=%f h_j=%f\n", ci->h, cj->h);

  if(ci->loc[0] == cj->loc[0] && ci->loc[1] == cj->loc[1] && ci->loc[2] == cj->loc[2])
    error("ci and cj are the same");

  if(ci->parts <  cj->parts && ci->parts + ci->count > cj->parts){
    printf("ci->res = %lli, cj->res = %lli\n", ci->res, cj->res);
    printf("ci->parts_res = %lli, cj->parts_res = %lli\n", ci->res_parts, cj->res_parts);
    printf("ci->h = %.3f, cj->h = %.3f\n", ci->h, cj->h);
    printf("ci->parts - cj->parts = %li\n", ci->parts - cj->parts);
    printf("ci->parts = %p, cj->parts = %p\n", ci->parts, cj->parts);
    error("cj parts is a subset of ci parts");
    }
  if(ci->parts >  cj->parts && ci->parts < cj->parts + cj->count){
    printf("ci->res = %lli, cj->res = %lli\n", ci->res, cj->res);
    printf("ci->parts_res = %lli, cj->parts_res = %lli\n", ci->res_parts, cj->res_parts);
    printf("ci->h = %.3f, cj->h = %.3f\n", ci->h, cj->h);
    printf("ci->parts - cj->parts = %li\n", ci->parts - cj->parts);
    error("ci parts is a subset of cj parts");
    }

  if(parts_i == NULL)
    error("parts_i hasn't been set");


#endif

  /* Find the center point of the interaction. */
  for (k = 0; k < 3; k++) {
    com[k] = 0.5 * (ci->loc[k] + cj->loc[k]);
  }

  /* Init the local copies of the particles. */
  for (k = 0; k < count_j; k++) {
    for (j = 0; j < 3; j++) {
      parts_j[k].x[j] = cj->parts[k].x[j] - com[j];
      parts_j[k].a[j] = 0.0f;
    }
    parts_j[k].mass = cj->parts[k].mass;
  }

  /* Loop over all particles in ci... */
  for (i = 0; i < count_i; i++) {

    /* Init the ith particle's data. */
    for (k = 0; k < 3; k++) {
      xi[k] = parts_i[i].x[k] - com[k];
      ai[k] = 0.0f;
    }
    mi = parts_i[i].mass;

    /* Loop over every particle in the other cell. */
    for (j = 0; j < count_j; j++) {

      /* Compute the pairwise distance. */
      for (r2 = 0.0, k = 0; k < 3; k++) {
        dx[k] = xi[k] - parts_j[j].x[k];
        r2 += dx[k] * dx[k];
      }

        /*if(r2 == 0)
        {
            printf("parts_j[j] = %.3f %.3f %.3f\n", parts_j[j].x[0], parts_j[j].x[1], parts_j[j].x[2]);
            printf("cj->parts[j] = %.3f %.3f %.3f\n", cj->parts[j].x[0], cj->parts[j].x[1] ,cj->parts[j].x[2]);
            printf("parts_i[i] = %.3f %.3f %.3f\n", xi[0], xi[1], xi[2]);
            printf("ci->parts[i] = %.3f %.3f %.3f\n", ci->parts[i].x[0], ci->parts[i].x[1] ,ci->parts[i].x[2]);

            printf("ci->loc[0] = %.3f, cj->loc[0] = %.3f\n", ci->loc[0], cj->loc[0]);
            printf("ci->loc[1] = %.3f, cj->loc[1] = %.3f\n", ci->loc[1], cj->loc[1]);
            printf("ci->loc[2] = %.3f, cj->loc[2] = %.3f\n", ci->loc[2], cj->loc[2]);
        }*/
      /* Apply the gravitational acceleration. */
      ir = 1.0f / sqrtf(r2);
      w = const_G * ir * ir * ir;
      mj = parts_j[j].mass;
      for (k = 0; k < 3; k++) {
        float wdx = w * dx[k];
        parts_j[j].a[k] += wdx * mi;
        ai[k] -= wdx * mj;
      }

#ifdef COUNTERS
      ++count_direct_unsorted;
#endif

#if ICHECK >= 0
      if (parts_i[i].id == ICHECK)
        /*message(
            "[NEW] Interaction with particle id= %d (pair i) a=( %e %e %e )",
            cj->parts[j].id, -mi * w * dx[0], -mi * w * dx[1], -mi * w * dx[2]);*/
        message(
            "%d %e %e %e",
            cj->parts[j].id, -mi * w * dx[0], -mi * w * dx[1], -mi * w * dx[2]);

      if (cj->parts[j].id == ICHECK)
        /*message(
            "[NEW] Interaction with particle id= %d (pair j) a=( %e %e %e )",
            parts_i[i].id, mj * w * dx[0], mj * w * dx[1], mj * w * dx[2]);*/
        message(
            "%d %e %e %e",
            parts_i[i].id, mj * w * dx[0], mj * w * dx[1], mj * w * dx[2]);
#endif

    } /* loop over every other particle. */

    /* Store the accumulated acceleration on the ith part. */
    for (k = 0; k < 3; k++) parts_i[i].a[k] += ai[k];

  } /* loop over all particles. */

  /* Copy the local particle data back. */
  for (k = 0; k < count_j; k++) {
    for (j = 0; j < 3; j++) cj->parts[k].a[j] += parts_j[k].a[j];
  }
}

/**
 * @brief Interacts all particles in cell c with themselves
 * either by recursing or by direct summation
 *
 * @param c The #cell containing the particles
 */
void iact_self_direct(struct qsched *s, struct cell *c) {
  int i, j, k, count = c->count;
  float xi[3] = {0.0, 0.0, 0.0};
  float ai[3] = {0.0, 0.0, 0.0}, mi, mj, dx[3] = {0.0, 0.0, 0.0}, r2, ir, w;
  //struct cell *cp, *cps;
  c->parts = qsched_getresdata(s, c->res_parts);

#ifdef SANITY_CHECKS

  /* Early abort? */
  if (count == 0) error("Empty cell !");
#endif

    /* Init the local copies of the particles. */
    double loc[3];
    struct part_local parts[count];
    loc[0] = c->loc[0];
    loc[1] = c->loc[1];
    loc[2] = c->loc[2];
    for (k = 0; k < count; k++) {
      for (j = 0; j < 3; j++) {
        parts[k].x[j] = c->parts[k].x[j] - loc[j];
        parts[k].a[j] = 0.0f;
      }
      parts[k].mass = c->parts[k].mass;
    }

    /* Loop over all particles... */
    for (i = 0; i < count; i++) {

      /* Init the ith particle's data. */
      for (k = 0; k < 3; k++) {
        xi[k] = parts[i].x[k];
        ai[k] = 0.0;
      }
      mi = parts[i].mass;

      /* Loop over every following particle. */
      for (j = i + 1; j < count; j++) {

        /* Compute the pairwise distance. */
        for (r2 = 0.0, k = 0; k < 3; k++) {
          dx[k] = xi[k] - parts[j].x[k];
          r2 += dx[k] * dx[k];
        }

        /* Apply the gravitational acceleration. */
        ir = 1.0f / sqrtf(r2);
        w = const_G * ir * ir * ir;
        mj = parts[j].mass;
        for (k = 0; k < 3; k++) {
          float wdx = w * dx[k];
          parts[j].a[k] += wdx * mi;
          ai[k] -= wdx * mj;
        }

#if ICHECK >= 0
        if (c->parts[i].id == ICHECK)
/*          message("[NEW] Interaction with particle id= %d (self i)  a=( %e %e %e )",
                  c->parts[j].id,  -mi * w * dx[0], -mi * w * dx[1], -mi * w * dx[2]);*/
          message("%d %e %e %e",
                  c->parts[j].id,  -mi * w * dx[0], -mi * w * dx[1], -mi * w * dx[2]);

        if (c->parts[j].id == ICHECK)
/*          message("[NEW] Interaction with particle id= %d (self j) a =( %e %e %e )",
                  c->parts[i].id,  -mj * w * dx[0], -mj * w * dx[1], -mj * w * dx[2]);*/
          message("%d %e %e %e",
                  c->parts[i].id,  -mj * w * dx[0], -mj * w * dx[1], -mj * w * dx[2]);
#endif

      } /* loop over every other particle. */

      /* Store the accumulated acceleration on the ith part. */
      for (k = 0; k < 3; k++) parts[i].a[k] += ai[k];

    } /* loop over all particles. */

    /* Copy the local particle data back. */
    for (k = 0; k < count; k++) {
      for (j = 0; j < 3; j++) c->parts[k].a[j] += parts[k].a[j];
    }
}

static inline void iact_pair_pc(struct qsched *s, struct cell *ci, struct cell *cj) {

    struct part_local *parts = NULL;
    struct multipole_local mult_local[1];
    int part_count;
    ci->parts = (struct part*) qsched_getresdata(s, ci->res_parts);

    #ifdef SANITY_CHECKS
        if(are_neighbours(ci, cj))
            error("ci and cj are neighbours");
    #endif

    /* Load particles. */
    part_count = ci->count;
     if ((parts = (struct part_local *) malloc(sizeof(struct part_local) * part_count)) == NULL)
        	error("Failed to allocate local parts.");
        for (int k = 0; k < part_count; k++) {
            for (int j = 0; j < 3; j++) {
	            parts[k].x[j] = ci->parts[k].x[j] - ci->loc[j];
            	parts[k].a[j] = 0.0f;
            }
            parts[k].mass = ci->parts[k].mass;
        }

    mult_local[0].com[0] = cj->new.com[0] - ci->loc[0];
    mult_local[0].com[1] = cj->new.com[1] - ci->loc[1];
    mult_local[0].com[2] = cj->new.com[2] - ci->loc[2];
    mult_local[0].mass = cj->new.mass;
    
    #ifdef QUADRUPOLES
    /* Final operation to get the quadrupoles */
    double imass = 1. / cj->new.mass;
    mult_local[0].I_xx = cj->new.I_xx*imass - cj->new.com[0] * cj->new.com[0];
    mult_local[0].I_xy = cj->new.I_xy*imass - cj->new.com[0] * cj->new.com[1];
    mult_local[0].I_xz = cj->new.I_xz*imass - cj->new.com[0] * cj->new.com[2];
    mult_local[0].I_yy = cj->new.I_yy*imass - cj->new.com[1] * cj->new.com[1];
    mult_local[0].I_yz = cj->new.I_yz*imass - cj->new.com[1] * cj->new.com[2];
    mult_local[0].I_zz = cj->new.I_zz*imass - cj->new.com[2] * cj->new.com[2];
    #endif	

    /* Perform the interaction between the particles and the list of multipoles */
    make_interact_pc(parts, part_count, mult_local, 1);
    
    /* Clean up local parts? */
    for (int k = 0; k < part_count; k++) {
        for (int j = 0; j < 3; j++) ci->parts[k].a[j] += parts[k].a[j];
    }
    free(parts);
}

struct cell *cell_get(struct qsched *s) {

  struct cell *res;
//  int k;

  long long int resource = qsched_addres(s, qsched_owner_none, sizeof(struct cell), (void**) &res );
  res->res = resource;
  res->firstchild = -1;

  /* Return the cell. */
  return res;
}


/**
 * @brief Compute the center of mass of a given cell.
 *
 * @param c The #cell.
 */
void comp_com(struct qsched *s, struct cell *c) {

  int k, count = c->count;
  struct cell *cp;
  struct part *parts = c->parts;
  double com[3] = {0.0, 0.0, 0.0}, mass = 0.0, imass = 0.0;

#ifdef QUADRUPOLES
  double I_xx = 0.;
  double I_yy = 0.;
  double I_zz = 0.;
  double I_xy = 0.;
  double I_xz = 0.;
  double I_yz = 0.;
#endif

#ifdef SANITY_CHECKS
  if ( count == 0 )
    error("CoM computed for an empty cell");
#endif
  
  if (c->split) {

    /* Loop over the projenitors and collect the multipole information. */
    for (cp = (struct cell*) qsched_getresdata(s, c->firstchild); cp != (struct cell*) qsched_getresdata(s, c->sibling); cp = (struct cell*) qsched_getresdata(s, cp->sibling) ) {
      float cp_mass = cp->new.mass;
      com[0] += cp->new.com[0] * cp_mass;
      com[1] += cp->new.com[1] * cp_mass;
      com[2] += cp->new.com[2] * cp_mass;
      mass += cp_mass;

#ifdef QUADRUPOLES
      I_xx += cp->new.I_xx;
      I_yy += cp->new.I_yy;
      I_zz += cp->new.I_zz;
      I_xy += cp->new.I_xy;
      I_xz += cp->new.I_xz;
      I_yz += cp->new.I_yz;
#endif 

    }

  /* Otherwise, collect the multipole from the particles. */
  } else {

    for (k = 0; k < count; k++) {
      float p_mass = parts[k].mass;
      com[0] += parts[k].x[0] * p_mass;
      com[1] += parts[k].x[1] * p_mass;
      com[2] += parts[k].x[2] * p_mass;
      mass += p_mass;

#ifdef QUADRUPOLES
      I_xx += parts[k].x[0] * parts[k].x[0] * p_mass;
      I_yy += parts[k].x[1] * parts[k].x[1] * p_mass;
      I_zz += parts[k].x[2] * parts[k].x[2] * p_mass;
      I_xy += parts[k].x[0] * parts[k].x[1] * p_mass;
      I_xz += parts[k].x[0] * parts[k].x[2] * p_mass;
      I_yz += parts[k].x[1] * parts[k].x[2] * p_mass;
#endif 

    }

  }

  /* Store the multipole data */
  imass = 1.0f / mass;
  c->new.mass = mass;
  c->new.com[0] = com[0] * imass;
  c->new.com[1] = com[1] * imass;
  c->new.com[2] = com[2] * imass;


#ifdef QUADRUPOLES
  /* Store the quadrupole data */
  c->new.I_xx = I_xx;
  c->new.I_yy = I_yy;
  c->new.I_zz = I_zz;
  c->new.I_xy = I_xy;
  c->new.I_xz = I_xz;
  c->new.I_yz = I_yz;
#endif

}


/**
 * @brief Sort the parts into eight bins along the given pivots and
 *        fill the multipoles. Also adds the hierarchical resources
 *        to the sched.
 *
 * @param c The #cell to be split.
 * @param N The total number of parts.
 * @param s The #sched to store the resources.
 */

void cell_split(struct cell *c, struct qsched *s) {

  int i, j, k, kk, count = c->count;
  struct part temp, *parts = qsched_getresdata( s, c->res_parts );
  struct cell *cp;
  int left[8], right[8];
  double pivot[3];
  static struct cell *root = NULL;
  struct cell *progenitors[8];

  /* Set the root cell. */
  if (root == NULL) {
    root = c;
    c->sibling = 0;
  }

  /* Add a resource for this cell if it doesn't have one yet. */
  //if (c->res == qsched_res_none)
    //c->res = qsched_addres_local(s, qsched_owner_none, qsched_res_none);

  /* Does this cell need to be split? */
  if (count > cell_maxparts) {

    /* Mark this cell as split. */
    c->split = 1;

    /* Create the progeny. */
    for (k = 0; k < 8; k++) {
      progenitors[k] = cp = cell_get(s);
      cp->loc[0] = c->loc[0];
      cp->loc[1] = c->loc[1];
      cp->loc[2] = c->loc[2];
      cp->h = c->h * 0.5;
      //cp->res = qsched_addres_local(s, qsched_owner_none, c->res);
      if (k & 4) cp->loc[0] += cp->h;
      if (k & 2) cp->loc[1] += cp->h;
      if (k & 1) cp->loc[2] += cp->h;
    }

    /* Init the pivots. */
    for (k = 0; k < 3; k++) pivot[k] = c->loc[k] + c->h * 0.5;

    /* Split along the x-axis. */
    i = 0;
    j = count - 1;
    while (i <= j) {
      while (i <= count - 1 && parts[i].x[0] < pivot[0]) i += 1;
      while (j >= 0 && parts[j].x[0] >= pivot[0]) j -= 1;
      if (i < j) {
        temp = parts[i];
        parts[i] = parts[j];
        parts[j] = temp;
      }
    }
    left[1] = i;
    right[1] = count - 1;
    left[0] = 0;
    right[0] = j;

    /* Split along the y axis, twice. */
    for (k = 1; k >= 0; k--) {
      i = left[k];
      j = right[k];
      while (i <= j) {
        while (i <= right[k] && parts[i].x[1] < pivot[1]) i += 1;
        while (j >= left[k] && parts[j].x[1] >= pivot[1]) j -= 1;
        if (i < j) {
          temp = parts[i];
          parts[i] = parts[j];
          parts[j] = temp;
        }
      }
      left[2 * k + 1] = i;
      right[2 * k + 1] = right[k];
      left[2 * k] = left[k];
      right[2 * k] = j;
    }

    /* Split along the z axis, four times. */
    for (k = 3; k >= 0; k--) {
      i = left[k];
      j = right[k];
      while (i <= j) {
        while (i <= right[k] && parts[i].x[2] < pivot[2]) i += 1;
        while (j >= left[k] && parts[j].x[2] >= pivot[2]) j -= 1;
        if (i < j) {
          temp = parts[i];
          parts[i] = parts[j];
          parts[j] = temp;
        }
      }
      left[2 * k + 1] = i;
      right[2 * k + 1] = right[k];
      left[2 * k] = left[k];
      right[2 * k] = j;
    }

    /* Store the counts and offsets. */
    for (k = 0; k < 8; k++) {
      progenitors[k]->count = right[k] - left[k] + 1;
//      progenitors[k]->parts = &c->parts[left[k]];
      progenitors[k]->res_parts = qsched_addchildres(s, c->res_parts, qsched_owner_none, (right[k] - left[k] + 1)*sizeof(struct part), left[k]*sizeof(struct part), (void**) &progenitors[k]->parts );
    }

    /* Find the first non-empty progenitor */
    for (k = 0; k < 8; k++)
      if (progenitors[k]->count > 0) {
        c->firstchild = progenitors[k]->res;
        break;
      }

    /* Prepare the pointers. */
    for (k = 0; k < 8; k++) {

      /* Find the next non-empty sibling */
      for (kk = k + 1; kk < 8; ++kk) {
        if (progenitors[kk]->count > 0) {
          progenitors[k]->sibling = progenitors[kk]->res;
          break;
        }
      }

      /* No non-empty sibling ? Go back a level */
      if (kk == 8) progenitors[k]->sibling = c->sibling;
    }

    /* Recurse. */
    for (k = 0; k < 8; k++)
      if (progenitors[k]->count > 0) cell_split(progenitors[k], s);


    /* Otherwise, we're at a leaf, so create the cell's particle-cell task. */
  }

  /* Set this cell's resources ownership. */
    comp_com(s, c);

}


/** 
 * @brief Create the tasks for the particle-cell interactions.
 * @param s The #sched in which to create the tasks.
 * @param ci The first #cell.
 * @param cj The second #cell.
 * @param depth The current depth.
 * @param leaf_depth The depth at which to start making tasks.
 */

void create_pcs(struct qsched *s, struct cell *ci, struct cell *cj, int depth, int leaf_depth){

  qsched_task_t tid;
  qsched_task_t data[2];
  qsched_task_t cp, cps;
  struct cell *cp1, *cp2;

    #ifdef SANITY_CHECKS
        if(cj!= NULL && ci->h != cj->h)
            error("ci and cj have different h values.");
    #endif

    /* If cj == NULL we're at the root.*/
    if(cj == NULL)
    {
        for (cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling) {
            cp1 = (struct cell*) qsched_getresdata(s, cp);
                /* Recurse over all pairs. */
            for (cps = cp1->sibling; cps != ci->sibling; cps = cp2->sibling)
            {
                cp2 = (struct cell*) qsched_getresdata(s, cps);
                create_pcs(s, cp1, cp2, depth+1, leaf_depth);
            }
        }
        /* Once we get back here we're done!*/
        return;
    }

    /* If we're not yet at the level of making tasks */
    if(depth < leaf_depth)
    {
        for (cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling) {
            cp1 = (struct cell*) qsched_getresdata(s, cp);
                /* Recurse over all pairs. */
            for (cps = cj->firstchild; cps != cj->sibling; cps = cp2->sibling)
            {
                cp2 = (struct cell*) qsched_getresdata(s, cps);
                create_pcs(s, cp1, cp2, depth+1, leaf_depth);
            }
        }
    }else{
        /* We can make tasks if they're not neighbours! */
        if(are_neighbours(ci, cj))
        {
            /* If they are neighbours we need to recurse if possible. If either is not split then we do the pair interaction so we can stop.*/
            if(ci->split && cj->split)
            {
                for (cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling) 
                {
                    cp1 = (struct cell*) qsched_getresdata(s, cp);
                    /* Recurse over all pairs. */
                    for (cps = cj->firstchild; cps != cj->sibling; cps = cp2->sibling)
                    {
                        cp2 = (struct cell*) qsched_getresdata(s, cps);
                        create_pcs(s, cp1, cp2, depth+1, leaf_depth);
                    }
                }
            }
        }else
        {
            /* Create the task. */
            data[0] = ci->res;
            data[1] = cj->res;
            tid = qsched_addtask(s, task_type_pair_pc, task_flag_none, data,
			       sizeof(qsched_task_t) * 2, ci->count * 8 );
      
            /* Add the resource and dependance */
            qsched_addlock(s, tid, ci->res_parts);
            qsched_adduse(s, tid, ci->res);
            qsched_adduse(s, tid, cj->res);

            /* Create the task. */
            data[0] = cj->res;
            data[1] = ci->res;
            tid = qsched_addtask(s, task_type_pair_pc, task_flag_none, data,
			       sizeof(qsched_task_t) * 2, cj->count * 8 );
      
            /* Add the resource and dependance */
            qsched_addlock(s, tid, cj->res_parts);
            qsched_adduse(s, tid, cj->res);
            qsched_adduse(s, tid, ci->res);
        }
    }
}

/**
 * @brief Create the tasks for the cell pair/self.
 *
 * @param s The #sched in which to create the tasks.
 * @param ci The first #cell.
 * @param cj The second #cell.
 */
void create_tasks(struct qsched *s, struct cell *ci, struct cell *cj) {

  qsched_task_t tid;
  //struct cell *data[2];
  qsched_task_t data[2];
  qsched_task_t cp, cps;
  struct cell *cp1, *cp2;

#ifdef SANITY_CHECKS

  /* If either cell is empty, stop. */
  if (ci->count == 0 || (cj != NULL && cj->count == 0)) error("Empty cell !");

#endif

  /* Single cell? */
  if (cj == NULL) {

    /* Is this cell split and above the task limit ? */
    if (ci->split && ci->count > task_limit / ci->count) {

      /* Loop over each of this cell's progeny. */
      for (cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling) {
        cp1 = (struct cell*) qsched_getresdata(s, cp);
        /* Make self-interaction task. */
        create_tasks(s, cp1, NULL);

        /* Make all pair-interaction tasks. */
        for (cps = cp1->sibling; cps != ci->sibling; cps = cp2->sibling)
        {
          cp2 = (struct cell*) qsched_getresdata(s, cps);
          create_tasks(s, cp1, cp2);
        }
      }

      /* Otherwise, add a self-interaction task. */
    } else {

      /* Set the data. */
      data[0] = ci->res;
      data[1] = -1;

      /* Create the task. */
      tid =  qsched_addtask(s, task_type_self, task_flag_none, data,
			    sizeof( qsched_task_t) * 2, ci->count * ci->count / 2);

      /* Add the resource (i.e. the cell) to the new task. */
      qsched_addlock(s, tid, ci->res_parts);
      qsched_adduse(s, tid, ci->res);
      if(ci->split)
      {
          cp = ci->firstchild;
          while(cp != ci->sibling)
          {
                cp1 = (struct cell*) qsched_getresdata(s, cp);
                qsched_adduse(s, tid, cp1->res);
                if(cp1->split)
                    cp = cp1->firstchild;
                else
                    cp = cp1->sibling;
          }
      }
/* Since this call might recurse, add a dependency on the cell's COM
   task. */
    }

    /* Otherwise, it's a pair. */
  } else {

    /* Are both cells split and we are above the task limit ? */
    if (ci->split && cj->split && ci->count > task_limit / cj->count) {
      
      /* Let's split both cells and build all possible pairs */
      for (cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling) {
        cp1 = (struct cell*) qsched_getresdata(s, cp); 
	for (cps = cj->firstchild; cps != cj->sibling; cps = cp2->sibling) {
	    cp2 = (struct cell*) qsched_getresdata(s, cps);
	  /* Recurse */
	  if (are_neighbours(cp1, cp2)) {
	    create_tasks(s, cp1, cp2);
	        }
    	}
    }

#ifdef OLD_SETUP
      /* Create the task. */
      data[0] = ci->res;
      data[1] = cj->res;
      tid = qsched_addtask(s, task_type_pair_pc, task_flag_none, data,
			   sizeof(qsched_task_t) * 2, ci->count * 8 );
      
      /* Add the resource and dependance */
      qsched_addlock(s, tid, ci->res_parts);
      qsched_adduse(s, tid, ci->res);
      qsched_adduse(s, tid, cj->res);
      for(cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling)
      {
            cp1 = (struct cell*) qsched_getresdata(s, cp);
            qsched_adduse(s, tid, cp1->res);
      }
      for(cp = cj->firstchild; cp != cj->sibling; cp = cp1->sibling)
      {
            cp1 = (struct cell*) qsched_getresdata(s, cp);
            qsched_adduse(s, tid, cp1->res);
      }

      /* Create the task. */
      data[0] = cj->res;
      data[1] = ci->res;
      tid = qsched_addtask(s, task_type_pair_pc, task_flag_none, data,
			   sizeof(qsched_task_t) * 2, cj->count * 8);

      qsched_addlock(s, tid, cj->res_parts);
      qsched_adduse(s, tid, ci->res);
      qsched_adduse(s, tid, cj->res);
      for(cp = ci->firstchild; cp != ci->sibling; cp = cp1->sibling)
      {
            cp1 = (struct cell*) qsched_getresdata(s, cp);
            qsched_adduse(s, tid, cp1->res);
      }
      for(cp = cj->firstchild; cp != cj->sibling; cp = cp1->sibling)
      {
            cp1 = (struct cell*) qsched_getresdata(s, cp);
            qsched_adduse(s, tid, cp1->res);
      }

#endif
      /* Add the resource and dependance */
//      qsched_addunlock(s, ci->com_tid, tid);

     
    } else {  /* Otherwise, at least one of the cells is not split, build a direct
	       * interaction. */

      /* Set the data. */
      data[0] = ci->res;
      data[1] = cj->res;
      
      /* Create the task. */
      tid = qsched_addtask(s, task_type_pair, task_flag_none, data,
			   sizeof(qsched_task_t) * 2, ci->count * cj->count);
      
        struct part *part_j = (struct part*) qsched_getresdata(s, cj->res_parts );
        struct part *part_i = (struct part*) qsched_getresdata(s, ci->res_parts );
        ci->parts = part_i;
        cj->parts = part_j;

      /* Add the resources. */
      qsched_addlock(s, tid, ci->res_parts);
      qsched_adduse(s, tid, ci->res);
      qsched_addlock(s, tid, cj->res_parts);
      qsched_adduse(s, tid, cj->res);

    }

  }   /* Otherwise it's a pair */
}

inline int getindex(long long int id, struct qsched *s){
    return id;
}


/**
 * @brief Solve the particle interactions using the stupid N^2 algorithm
 *
 * @param N The number of particles
 * @param parts The array of particles
 */
void interact_exact(int N, struct part *parts) {

  int i, j, k;
  float ir, w, r2, dx[3];
    message("Starting exact interaction");
  /* Loop over all particles. */
  for (i = 0; i < N; ++i) {

    /* Some things to store locally. */
    double pix[3] = {parts[i].x[0], parts[i].x[1], parts[i].x[2]};
    float mi = parts[i].mass;

    /* Loop over every other particle. */
    for (j = i + 1; j < N; ++j) {

      /* Compute the pairwise distance. */
      for (r2 = 0.0, k = 0; k < 3; k++) {
        dx[k] = parts[j].x[k] - pix[k];
        r2 += dx[k] * dx[k];
      }

      /* Apply the gravitational acceleration. */
      ir = 1.0f / sqrtf(r2);
      w = const_G * ir * ir * ir;

      for (k = 0; k < 3; k++) {
        float wdx = w * dx[k];
        parts[j].a_exact[k] -= wdx * mi;
        parts[i].a_exact[k] += wdx * parts[j].mass;
      }
    }
  }

#if ICHECK >= 0
 /* for (i = 0; i < N; ++i)
    if (parts[i].id == monitor)
      message("[check] exact acceleration for particle %d a= %.3e %.3e %.3e\n",
              parts[i].id, parts[i].a_exact[0], parts[i].a_exact[1],
              parts[i].a_exact[2]);*/
#endif
    message("Completed exact interaction");
}

void output_parts(struct qsched *s)
{
    int i, j;
    FILE *file;
    file = fopen("particle_dump.out", "a");
    for(i = 0; i < s->res_ranks[s->count_ranks]; i++)
    {
        /* We want to find particle resources, which are locked only. */
        if(s->res[i].node == s->rank && s->res[i].num_lockers > 0)
        {
            struct res *resource = &s->res[i];
            int parent_output = 0;
            while(resource->parent > 0)
            {
                if( s->res[getindex(resource->parent, s)].num_lockers > 0)
                {
                    parent_output = 1;
                    break;
                }
                resource = &s->res[getindex(resource->parent, s)];   
            }
            /* If we didn't output any of the parents then we need to output this guy.*/
            if(!parent_output)
            {
                struct part *parts = qsched_getresdata(s, s->res[i].ID);
                if(parts == NULL)
                {
                    message("Failed to output parts");
                    fclose(file);
                    return;
                }
                int count = s->res[i].size / sizeof(struct part);
                for(j = 0; j < count; j++)
                {   
                    struct part cur = parts[j];
                    fprintf(file, "%i %e %e %e %e %e %e %e %e %e %e %e %e %e\n", cur.id, cur.mass, cur.x[0], cur.x[1], cur.x[2], 
                            cur.a_exact[0], cur.a_exact[1], cur.a_exact[2], cur.a_legacy[0], cur.a_legacy[1], cur.a_legacy[2], cur.a[0], cur.a[1], cur.a[2]);
                #if ICHECK >= 0
                    if(cur.id == ICHECK)
                        message("Final accelerations = %e %e %e", cur.a[0], cur.a[1], cur.a[2]);
                #endif
                }
            }
        }
    }
    fclose(file);
    message("Output parts correctly");
}







/**
 * @brief Set up and run a task-based Barnes-Hutt N-body solver.
 *
 * @param N The number of random particles to use.
 * @param nr_threads Number of threads to use.
 * @param runs Number of force evaluations to use as a benchmark.
 * @param fileName Input file name. If @c NULL or an empty string, random
 *        particle positions will be used.
 */
void test_bh(int N, int nr_threads, int runs, char *fileName) {

  int i, k;
  struct cell *root = NULL;
  struct part *parts;
  qsched_res_t parts_res;
  FILE *file;
  struct qsched s;
 // ticks tic, toc_run, tot_setup = 0, tot_run = 0;
//  int countMultipoles = 0, countPairs = 0, countCoMs = 0;
char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;



    // Initialize the MPI environment
int MpiThreadLevel;
 MPI_Init_thread(NULL, NULL, MPI_THREAD_MULTIPLE, &MpiThreadLevel);

if(MpiThreadLevel != MPI_THREAD_MULTIPLE)
    error("We didn't get thread multiple!");

    MPI_Get_processor_name(processor_name, &name_len);
    MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN );
  /* Runner function. */
  void runner(struct qsched *s, int type, void * data) {
    long long int *d;
    struct cell *cell_i;
    struct cell *cell_j;


    switch (type) {
        case task_type_self:
            d = (long long int*) data;
            cell_i = (struct cell*) qsched_getresdata(s, d[0]);
            iact_self_direct(s, cell_i);
            //printf("Computed a self with no seg fault!\n");
            break;
        case task_type_pair:
            d = (long long int*) data;
            cell_i = (struct cell*) qsched_getresdata(s, d[0]);
            cell_j = (struct cell*) qsched_getresdata(s, d[1]);
            iact_pair_direct(s, cell_i, cell_j);
            //printf("Computed a pair with no seg fault!\n");
            break;
        case task_type_pair_pc:
            d = (long long int*) data;
            cell_i = (struct cell*) qsched_getresdata(s, d[0]);
            cell_j = (struct cell*) qsched_getresdata(s, d[1]);
            iact_pair_pc(s, cell_i, cell_j);            
            //printf("Computed a pair_pc with no seg fault!\n");
            break;
        case task_type_send:
            break;
        case task_type_recv:
            break;
        default:
            error("Unknown task type");
    }
  }
  /* Initialize the per-task type timers. */
//  for (k = 0; k < task_type_count; k++) task_timers[k] = 0;

  /* Initialize the scheduler. */
    qsched_init(&s, nr_threads, qsched_flag_noreown, MPI_COMM_WORLD);

  /* Init and fill the particle array. */
//  if ((parts = (struct part *)malloc(sizeof(struct part) * N)) == NULL)
  //  error("Failed to allocate particle buffer.");
if(s.rank == 0)
{
  parts_res = qsched_addres(&s, qsched_owner_none, sizeof(struct part) * N, (void**) &parts);

  /* If no input file was specified, generate random particle positions. */
  if (fileName == NULL || fileName[0] == 0) {
    for (k = 0; k < N; k++) {
      parts[k].id = k;
      parts[k].x[0] = ((double)rand()) / RAND_MAX;
      parts[k].x[1] = ((double)rand()) / RAND_MAX;
      parts[k].x[2] = ((double)rand()) / RAND_MAX;
      parts[k].mass = ((double)rand()) / RAND_MAX;
      parts[k].a_legacy[0] = 0.0;
      parts[k].a_legacy[1] = 0.0;
      parts[k].a_legacy[2] = 0.0;
    }
    /* Otherwise, read them from a file. */
  } else {
    file = fopen(fileName, "r");
    if (file) {
      for (k = 0; k < N; k++) {
        if (fscanf(file, "%d", &parts[k].id) != 1)
          error("Failed to read ID of part %i.", k);
        if (fscanf(file, "%lf%lf%lf", &parts[k].x[0], &parts[k].x[1],
                   &parts[k].x[2]) !=
            3)
          error("Failed to read position of part %i.", k);
        if (fscanf(file, "%f", &parts[k].mass) != 1)
          error("Failed to read mass of part %i.", k);
      }
      fclose(file);
    }
  }

  /* Init the cells. */
  root = cell_get(&s);
  root->loc[0] = 0.0;
  root->loc[1] = 0.0;
  root->loc[2] = 0.0;
  root->h = 1.0;
  root->count = N;
  root->parts = parts;
  root->res_parts = parts_res;
  cell_split(root, &s);

  /* Iterate over the cells and get the average number of particles
     per leaf. */
  struct cell *c = root;
  int nr_leaves = 0;
  while (c != NULL && (c->sibling > 0 || c->firstchild > 0)) {
    if (!c->split) {
      nr_leaves++;
      c = (struct cell*) qsched_getresdata( &s, c->sibling );
    } else {
      c = (struct cell*) qsched_getresdata( &s, c->firstchild );
    }
  }
  message("There are %i leaves.", nr_leaves);
  message("Average number of parts per leaf is %f.", ((double)N) / nr_leaves);

}
   qsched_sync_resources(&s);

#ifdef EXACT
    if(s.rank == 0)
    {
        interact_exact(N, parts);
    }

    MPI_Barrier(s.comm);
#endif

if(s.rank == 0)
{
    create_tasks(&s, root, NULL);
    /* Compute the loweest depth of a leaf. */
    int depth = 1;
    int leaf_depth = 0xFFFFFFF;
    struct cell *temp = NULL;
    long long int next = root->firstchild;
    while(next > 0)
    {
        temp = (struct cell*) qsched_getresdata(&s, next);
        if(temp->split)
        {
            depth += 1;
            next = temp->firstchild;
        }else
        {
            if(depth < leaf_depth)
            {
                leaf_depth = depth;
            }
            struct cell *temp2;
            temp2 = qsched_getresdata(&s, temp->sibling);
            if(temp2->h > temp->h)
                depth -= 1;
            next = temp->sibling;
        }
    }
    message("leaf_depth = %i", leaf_depth);
    message("tasks before = %i", s.count);
    create_pcs(&s, root, NULL, 0, leaf_depth-1);    
    message("tasks after = %i", s.count);
}
printf("s.count = %i\n", s.count);
//Mark all the schedulers dirty so they actually synchronize.
s.flags |= qsched_flag_dirty;
qsched_run_MPI(&s, nr_threads, runner);
  // Print off a hello world message
    printf("Hello world from processor %s, rank = %i, count_ranks = %i\n",
           processor_name, s.rank, s.count_ranks);

if(s.rank == 0)
{
    file = fopen("particle_dump.out", "w");
  /*fprintf(file,
          "ID m x y z a_exact.x   a_exact.y    a_exact.z    a_legacy.x    "
          "a_legacy.y    a_legacy.z    a_new.x     a_new.y    a_new.z\n");*/
//    fprintf(file, "");
    fclose(file);
}

//Output the parts:
for(i = 0; i < s.count_ranks; i++)
{
    //If it is our turn to output parts.
    if(s.rank == i)
    {
        output_parts(&s);
    }
    MPI_Barrier(s.comm);
}

//Need to clean up everything.
//    free(parts);
    //TODO Clean up the cell-resource data.
    qsched_free(&s);

    // Finalize the MPI environment.
    MPI_Finalize();
}


/**
 * @brief Main function.
 */

int main(int argc, char *argv[]) {

  int c, nr_threads;
  int N = 1000, runs = 1;
  char fileName[100] = {0};

  /* Die on FP-exceptions. */
  feenableexcept(FE_DIVBYZERO | FE_INVALID | FE_OVERFLOW);

/* Get the number of threads. */
/*#pragma omp parallel shared(nr_threads)
  {
    if (omp_get_thread_num() == 0) nr_threads = omp_get_num_threads();
  }*/
    nr_threads = 1;

  /* Parse the options */
  while ((c = getopt(argc, argv, "n:r:t:f:c:i:")) != -1) switch (c) {
      case 'n':
        if (sscanf(optarg, "%d", &N) != 1)
          error("Error parsing number of particles.");
        break;
      case 'r':
        if (sscanf(optarg, "%d", &runs) != 1)
          error("Error parsing number of runs.");
        break;
      case 't':
        if (sscanf(optarg, "%d", &nr_threads) != 1)
          error("Error parsing number of threads.");
        omp_set_num_threads(nr_threads);
        message("omp_get_max_threads() = %i\n", omp_get_max_threads());
        message("omp_get_num_procs() = %i\n", omp_get_num_procs());
        break;
      case 'f':
        if (sscanf(optarg, "%s", &fileName[0]) != 1)
          error("Error parsing file name.");
        break;
      case '?':
        fprintf(stderr, "Usage: %s [-t nr_threads] [-n N] [-r runs] [-f file] "
                        "[-c Nparts] [-i Niterations] \n",
                argv[0]);
        fprintf(stderr, "Solves the N-body problem using a Barnes-Hut\n"
                        "tree code with N random particles read from a file in "
                        "[0,1]^3 using"
                        "nr_threads threads.\n"
                        "A test of the neighbouring cells interaction with "
                        "Nparts per cell is also run Niterations times.\n");
        exit(EXIT_FAILURE);
    }

  /* Tree node information */
  printf("Size of cell: %zu bytes.\n", sizeof(struct cell));

  /* Part information */
  printf("Size of part: %zu bytes.\n", sizeof(struct part));

  /* Dump arguments. */
  if (fileName[0] == 0) {
    message("Computing the N-body problem over %i random particles using %i "
            "threads (%i runs).",
            N, nr_threads, runs);
  } else {
    message("Computing the N-body problem over %i particles read from '%s' "
            "using %i threads (%i runs).",
            N, fileName, nr_threads, runs);
  }

  /* Run the BH test. */
  test_bh(N, nr_threads, runs, fileName);

  return 0;
}