Skip to content
Snippets Groups Projects
proxy.c 12.68 KiB
/*******************************************************************************
 * This file is part of SWIFT.
 * Coypright (c) 2013 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 * 
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <math.h>
#include <float.h>
#include <limits.h>
#include <omp.h>
#include <sched.h>

/* MPI headers. */
#ifdef WITH_MPI
    #include <mpi.h>
#endif

/* Local headers. */
#include "const.h"
#include "cycle.h"
#include "atomic.h"
#include "timers.h"
#include "const.h"
#include "vector.h"
#include "lock.h"
#include "space.h"
#include "cell.h"
#include "task.h"
#include "part.h"
#include "debug.h"
#include "proxy.h"
#include "error.h"


/**
 * @brief Exchange cells with a remote node.
 *
 * @paam p The #proxy.
 */
 
void proxy_cells_exch1 ( struct proxy *p ) {

#ifdef WITH_MPI

    int k, ind;
    
    /* Get the number of pcells we will need to send. */
    p->size_pcells_out = 0;
    for ( k = 0 ; k < p->nr_cells_out ; k++ )
        p->size_pcells_out += p->cells_out[k]->pcell_size;
        
    /* Send the number of pcells. */
    if ( MPI_Isend( &p->size_pcells_out , 1 , MPI_INT , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_cells_count_out ) != MPI_SUCCESS )
        error( "Failed to isend nr of pcells." );
    MPI_Request_free( &p->req_cells_count_out );
    // message( "isent pcell count (%i) from node %i to node %i." , p->size_pcells_out , p->mynodeID , p->nodeID ); fflush(stdout);
    
    /* Allocate and fill the pcell buffer. */
    if ( p->pcells_out != NULL )
        free( p->pcells_out );
    if ( ( p->pcells_out = malloc( sizeof(struct pcell) * p->size_pcells_out ) ) == NULL )
        error( "Failed to allocate pcell_out buffer." );
    for ( ind = 0 , k = 0 ; k < p->nr_cells_out ; k++ ) {
        memcpy( &p->pcells_out[ind] , p->cells_out[k]->pcell , sizeof(struct pcell) * p->cells_out[k]->pcell_size );
        ind += p->cells_out[k]->pcell_size;
        }
    
    /* Send the pcell buffer. */
    if ( MPI_Isend( p->pcells_out , sizeof(struct pcell)*p->size_pcells_out , MPI_BYTE , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_cells , MPI_COMM_WORLD , &p->req_cells_out ) != MPI_SUCCESS )
        error( "Failed to pcell_out buffer." );
    MPI_Request_free( &p->req_cells_out );
    // message( "isent pcells (%i) from node %i to node %i." , p->size_pcells_out , p->mynodeID , p->nodeID ); fflush(stdout);

    /* Receive the number of pcells. */
    if ( MPI_Irecv( &p->size_pcells_in , 1 , MPI_INT , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_cells_count_in ) != MPI_SUCCESS )
        error( "Failed to irecv nr of pcells." );
    // message( "irecv pcells count on node %i from node %i." , p->mynodeID , p->nodeID ); fflush(stdout);
    
#else
    error( "SWIFT was not compiled with MPI support." );
#endif

    }


void proxy_cells_exch2 ( struct proxy *p ) {

#ifdef WITH_MPI

    /* Re-allocate the pcell_in buffer. */
    if ( p->pcells_in != NULL )
        free( p->pcells_in );
    if ( ( p->pcells_in = (struct pcell *)malloc( sizeof(struct pcell) * p->size_pcells_in ) ) == NULL )
        error( "Failed to allocate pcell_in buffer." );
        
    /* Receive the particle buffers. */
    if ( MPI_Irecv( p->pcells_in , sizeof(struct pcell)*p->size_pcells_in , MPI_BYTE , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_cells , MPI_COMM_WORLD , &p->req_cells_in ) != MPI_SUCCESS )
        error( "Failed to irecv part data." );
    // message( "irecv pcells (%i) on node %i from node %i." , p->size_pcells_in , p->mynodeID , p->nodeID ); fflush(stdout);

#else
    error( "SWIFT was not compiled with MPI support." );
#endif

    }


/**
 * @brief Add a cell to the given proxy's input list.
 *
 * @param p The #proxy.
 * @param c The #cell.
 */

void proxy_addcell_in ( struct proxy *p , struct cell *c ) {

    int k;
    struct cell **temp;
    
    /* Check if the cell is already registered with the proxy. */
    for ( k = 0 ; k < p->nr_cells_in ; k++ )
        if ( p->cells_in[k] == c )
            return;
            
    /* Do we need to grow the number of in cells? */
    if ( p->nr_cells_in == p->size_cells_in ) {
        p->size_cells_in *= proxy_buffgrow;
        if ( ( temp = malloc( sizeof(struct cell *) * p->size_cells_in ) ) == NULL )
            error( "Failed to allocate ingoing cell list." );
        memcpy( temp , p->cells_in , sizeof(struct cell *) * p->nr_cells_in );
        free( p->cells_in );
        p->cells_in = temp;
        }
        
    /* Add the cell. */
    p->cells_in[ p->nr_cells_in ] = c;
    p->nr_cells_in += 1;

    }


/**
 * @brief Add a cell to the given proxy's output list.
 *
 * @param p The #proxy.
 * @param c The #cell.
 */

void proxy_addcell_out ( struct proxy *p , struct cell *c ) {

    int k;
    struct cell **temp;
    
    /* Check if the cell is already registered with the proxy. */
    for ( k = 0 ; k < p->nr_cells_out ; k++ )
        if ( p->cells_out[k] == c )
            return;
            
    /* Do we need to grow the number of out cells? */
    if ( p->nr_cells_out == p->size_cells_out ) {
        p->size_cells_out *= proxy_buffgrow;
        if ( ( temp = malloc( sizeof(struct cell *) * p->size_cells_out ) ) == NULL )
            error( "Failed to allocate outgoing cell list." );
        memcpy( temp , p->cells_out , sizeof(struct cell *) * p->nr_cells_out );
        free( p->cells_out );
        p->cells_out = temp;
        }
        
    /* Add the cell. */
    p->cells_out[ p->nr_cells_out ] = c;
    p->nr_cells_out += 1;

    }


/**
 * @brief Exchange particles with a remote node.
 *
 * @paam p The #proxy.
 */
 
void proxy_parts_exch1 ( struct proxy *p ) {

#ifdef WITH_MPI

    /* Send the number of particles. */
    if ( MPI_Isend( &p->nr_parts_out , 1 , MPI_INT , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_parts_count_out ) != MPI_SUCCESS )
        error( "Failed to isend nr of parts." );
    MPI_Request_free( &p->req_parts_count_out );
    // message( "isent particle count (%i) from node %i to node %i." , p->nr_parts_out , p->mynodeID , p->nodeID ); fflush(stdout);
    
    /* Send the particle buffers. */
    if ( MPI_Isend( p->parts_out , sizeof(struct part)*p->nr_parts_out , MPI_BYTE , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_parts , MPI_COMM_WORLD , &p->req_parts_out ) != MPI_SUCCESS ||
         MPI_Isend( p->xparts_out , sizeof(struct xpart)*p->nr_parts_out , MPI_BYTE , p->nodeID , p->mynodeID*proxy_tag_shift + proxy_tag_xparts , MPI_COMM_WORLD , &p->req_xparts_out ) != MPI_SUCCESS )
        error( "Failed to isend part data." );
    MPI_Request_free( &p->req_parts_out );
    MPI_Request_free( &p->req_xparts_out );
    // message( "isent particle data (%i) to node %i." , p->nr_parts_out , p->nodeID ); fflush(stdout);
    /* for ( int k = 0 ; k < p->nr_parts_out ; k++ )
        message( "sending particle %lli, x=[%.3e %.3e %.3e], h=%.3e, to node %i." ,
            p->parts_out[k].id , p->parts_out[k].x[0] , p->parts_out[k].x[1] , p->parts_out[k].x[2] ,
            p->parts_out[k].h , p->nodeID ); */

    /* Receive the number of particles. */
    if ( MPI_Irecv( &p->nr_parts_in , 1 , MPI_INT , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_count , MPI_COMM_WORLD , &p->req_parts_count_in ) != MPI_SUCCESS )
        error( "Failed to irecv nr of parts." );
    // message( "irecv particle count on node %i from node %i." , p->mynodeID , p->nodeID ); fflush(stdout);
    
#else
    error( "SWIFT was not compiled with MPI support." );
#endif

    }


void proxy_parts_exch2 ( struct proxy *p ) {

#ifdef WITH_MPI

    /* Is there enough space in the buffer? */
    if ( p->nr_parts_in > p->size_parts_in ) {
        do {
            p->size_parts_in *= proxy_buffgrow;
            } while ( p->nr_parts_in > p->size_parts_in );
        free( p->parts_in ); free( p->xparts_in );
        if ( ( p->parts_in = (struct part *)malloc( sizeof(struct part) * p->size_parts_in ) ) == NULL ||
             ( p->xparts_in = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_in ) ) == NULL )
            error( "Failed to re-allocate parts_in buffers." );
        }
        
    /* Receive the particle buffers. */
    if ( MPI_Irecv( p->parts_in , sizeof(struct part)*p->nr_parts_in , MPI_BYTE , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_parts , MPI_COMM_WORLD , &p->req_parts_in ) != MPI_SUCCESS ||
         MPI_Irecv( p->xparts_in , sizeof(struct xpart)*p->nr_parts_in , MPI_BYTE , p->nodeID , p->nodeID*proxy_tag_shift + proxy_tag_xparts , MPI_COMM_WORLD , &p->req_xparts_in ) != MPI_SUCCESS )
        error( "Failed to irecv part data." );
    // message( "irecv particle data (%i) from node %i." , p->nr_parts_in , p->nodeID ); fflush(stdout);

#else
    error( "SWIFT was not compiled with MPI support." );
#endif

    }


/**
 * @brief Load parts onto a proxy for exchange.
 *
 * @param p The #proxy.
 * @param parts Pointer to an array of #part to send.
 * @param xparts Pointer to an array of #xpart to send.
 * @param N The number of parts.
 */
 
void proxy_parts_load ( struct proxy *p , struct part *parts , struct xpart *xparts , int N ) {

    /* Is there enough space in the buffer? */
    if ( p->nr_parts_out + N > p->size_parts_out ) {
        do {
            p->size_parts_out *= proxy_buffgrow;
            } while ( p->nr_parts_out + N > p->size_parts_out );
        struct part *tp;
        struct xpart *txp;
        if ( ( tp = (struct part *)malloc( sizeof(struct part) * p->size_parts_out ) ) == NULL ||
             ( txp = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_out ) ) == NULL )
            error( "Failed to re-allocate parts_out buffers." );
        memcpy( tp , p->parts_out , sizeof(struct part) * p->nr_parts_out );
        memcpy( txp , p->xparts_out , sizeof(struct part) * p->nr_parts_out );
        free( p->parts_out ); free( p->xparts_out );
        p->parts_out = tp; p->xparts_out = txp;
        }
        
    /* Copy the parts and xparts data to the buffer. */
    memcpy( &p->parts_out[ p->nr_parts_out ] , parts , sizeof(struct part) * N );
    memcpy( &p->xparts_out[ p->nr_parts_out ] , xparts , sizeof(struct xpart) * N );
    
    /* Increase the counters. */
    p->nr_parts_out += N;

    }


/**
 * @brief Initialize the given proxy.
 *
 * @param p The #proxy.
 * @param nodeID The node with which this proxy will communicate.
 */
 
void proxy_init ( struct proxy *p , int mynodeID , int nodeID ) {

    /* Set the nodeID. */
    p->mynodeID = mynodeID;
    p->nodeID = nodeID;
    
    /* Allocate the cell send and receive buffers, if needed. */
    if ( p->cells_in == NULL ) {
        p->size_cells_in = proxy_buffinit;
        if ( ( p->cells_in = (struct cell **)malloc( sizeof(void *) * p->size_cells_in ) ) == NULL )
            error( "Failed to allocate cells_in buffer." );
        }
    p->nr_cells_in = 0;
    if ( p->cells_out == NULL ) {
        p->size_cells_out = proxy_buffinit;
        if ( ( p->cells_out = (struct cell **)malloc( sizeof(void *) * p->size_cells_out ) ) == NULL )
            error( "Failed to allocate cells_out buffer." );
        }
    p->nr_cells_out = 0;

    /* Allocate the part send and receive buffers, if needed. */
    if ( p->parts_in == NULL ) {
        p->size_parts_in = proxy_buffinit;
        if ( ( p->parts_in = (struct part *)malloc( sizeof(struct part) * p->size_parts_in ) ) == NULL ||
             ( p->xparts_in = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_in ) ) == NULL )
            error( "Failed to allocate parts_in buffers." );
        }
    p->nr_parts_in = 0;
    if ( p->parts_out == NULL ) {
        p->size_parts_out = proxy_buffinit;
        if ( ( p->parts_out = (struct part *)malloc( sizeof(struct part) * p->size_parts_out ) ) == NULL ||
             ( p->xparts_out = (struct xpart *)malloc( sizeof(struct xpart) * p->size_parts_out ) ) == NULL )
            error( "Failed to allocate parts_out buffers." );
        }
    p->nr_parts_out = 0;

    }