Compare revisions: swiftsim
Commits on Source (24)
......@@ -61,7 +61,7 @@ include_HEADERS += velociraptor_struct.h velociraptor_io.h random.h memuse.h mpi
include_HEADERS += black_holes.h black_holes_io.h black_holes_properties.h black_holes_struct.h
include_HEADERS += feedback.h feedback_struct.h feedback_properties.h
include_HEADERS += space_unique_id.h line_of_sight.h io_compression.h
include_HEADERS += logger_history.h
include_HEADERS += logger_history.h mpicache.h
# source files for EAGLE cooling
QLA_COOLING_SOURCES =
......@@ -126,7 +126,7 @@ AM_SOURCES += gravity_properties.c gravity.c multipole.c
AM_SOURCES += collectgroup.c hydro_space.c equation_of_state.c io_compression.c
AM_SOURCES += chemistry.c cosmology.c mesh_gravity.c velociraptor_interface.c
AM_SOURCES += output_list.c velociraptor_dummy.c logger_io.c memuse.c mpiuse.c memuse_rnodes.c fof.c
AM_SOURCES += hashmap.c pressure_floor.c logger_history.c
AM_SOURCES += hashmap.c pressure_floor.c logger_history.c mpicache.c
AM_SOURCES += $(QLA_COOLING_SOURCES)
AM_SOURCES += $(EAGLE_COOLING_SOURCES) $(EAGLE_FEEDBACK_SOURCES)
AM_SOURCES += $(GRACKLE_COOLING_SOURCES) $(GEAR_FEEDBACK_SOURCES)
......
......@@ -28,6 +28,7 @@
/* Standard includes */
#include <stdint.h>
#include <unistd.h>
#define atomic_add(v, i) __sync_fetch_and_add(v, i)
#define atomic_sub(v, i) __sync_fetch_and_sub(v, i)
......@@ -189,6 +190,27 @@ __attribute__((always_inline)) INLINE static void atomic_max_ll(
} while (test_val != old_val);
}
/**
* @brief Atomic max operation on size_t.
*
* This is a text-book implementation based on an atomic CAS.
*
* @param address The address to update.
* @param y The value to update the address with.
*/
__attribute__((always_inline)) INLINE static void atomic_max_st(
volatile size_t *const address, const size_t y) {
size_t test_val, old_val, new_val;
old_val = *address;
do {
test_val = old_val;
new_val = max(old_val, y);
old_val = atomic_cas(address, test_val, new_val);
} while (test_val != old_val);
}
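/* The same CAS retry loop can be exercised standalone; a minimal sketch
 * using the GCC __sync builtins directly (hypothetical example, not part
 * of this patch): */

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

static volatile size_t global_max = 0;

/* CAS loop: retry until our maximum survives concurrent updates. */
static void atomic_max_sketch(volatile size_t *address, size_t y) {
  size_t old_val = *address;
  while (old_val < y) {
    /* Returns the value seen at *address just before the swap attempt. */
    const size_t seen = __sync_val_compare_and_swap(address, old_val, y);
    if (seen == old_val) break; /* Our write landed. */
    old_val = seen;             /* Lost the race, try again. */
  }
}

static void *worker(void *arg) {
  atomic_max_sketch(&global_max, (size_t)(intptr_t)arg);
  return NULL;
}

int main(void) {
  pthread_t threads[4];
  for (intptr_t k = 0; k < 4; k++)
    pthread_create(&threads[k], NULL, worker, (void *)(k * 10));
  for (int k = 0; k < 4; k++) pthread_join(threads[k], NULL);
  printf("max = %zu\n", global_max); /* Prints 30. */
  return 0;
}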
/**
* @brief Atomic max operation on floats.
*
......
......@@ -95,9 +95,11 @@ void engine_addtasks_send_gravity(struct engine *e, struct cell *ci,
t_grav = scheduler_addtask(s, task_type_send, task_subtype_gpart,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_grav);
t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend_gpart,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_ti);
/* The sends should unlock the down pass. */
scheduler_addunlock(s, t_grav, ci->grav.super->grav.down);
......@@ -169,20 +171,25 @@ void engine_addtasks_send_hydro(struct engine *e, struct cell *ci,
t_xv = scheduler_addtask(s, task_type_send, task_subtype_xv, ci->mpi.tag,
0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_xv);
t_rho = scheduler_addtask(s, task_type_send, task_subtype_rho,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_rho);
#ifdef EXTRA_HYDRO_LOOP
t_gradient = scheduler_addtask(s, task_type_send, task_subtype_gradient,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_gradient);
#endif
t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend_part,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_ti);
if (with_limiter) {
t_limiter = scheduler_addtask(s, task_type_send, task_subtype_limiter,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_limiter);
}
#ifdef EXTRA_HYDRO_LOOP
......@@ -280,6 +287,7 @@ void engine_addtasks_send_stars(struct engine *e, struct cell *ci,
#endif
t_sf_counts = scheduler_addtask(s, task_type_send, task_subtype_sf_counts,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_sf_counts);
scheduler_addunlock(s, ci->hydro.star_formation, t_sf_counts);
}
......@@ -300,9 +308,11 @@ void engine_addtasks_send_stars(struct engine *e, struct cell *ci,
/* Create the tasks and their dependencies? */
t_feedback = scheduler_addtask(s, task_type_send, task_subtype_spart,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_feedback);
t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend_spart,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_ti);
/* The send_stars task should unlock the super_cell's kick task. */
scheduler_addunlock(s, t_feedback, ci->hydro.super->stars.stars_out);
......@@ -381,19 +391,24 @@ void engine_addtasks_send_black_holes(struct engine *e, struct cell *ci,
/* Create the tasks and their dependencies? */
t_rho = scheduler_addtask(s, task_type_send, task_subtype_bpart_rho,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_rho);
t_bh_merger = scheduler_addtask(
s, task_type_send, task_subtype_bpart_merger, ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_bh_merger);
t_gas_swallow = scheduler_addtask(
s, task_type_send, task_subtype_part_swallow, ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_gas_swallow);
t_feedback =
scheduler_addtask(s, task_type_send, task_subtype_bpart_feedback,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_feedback);
t_ti = scheduler_addtask(s, task_type_send, task_subtype_tend_bpart,
ci->mpi.tag, 0, ci, cj);
scheduler_cache_mpitask(s->send_mpicache, cj->nodeID, t_ti);
/* The send_black_holes task should unlock the super_cell's BH exit point
* task. */
......@@ -478,19 +493,24 @@ void engine_addtasks_recv_hydro(struct engine *e, struct cell *c,
/* Create the tasks. */
t_xv = scheduler_addtask(s, task_type_recv, task_subtype_xv, c->mpi.tag, 0,
c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_xv);
t_rho = scheduler_addtask(s, task_type_recv, task_subtype_rho, c->mpi.tag,
0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_rho);
#ifdef EXTRA_HYDRO_LOOP
t_gradient = scheduler_addtask(s, task_type_recv, task_subtype_gradient,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_gradient);
#endif
t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_part,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_ti);
if (with_limiter) {
t_limiter = scheduler_addtask(s, task_type_recv, task_subtype_limiter,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_limiter);
}
}
......@@ -592,6 +612,7 @@ void engine_addtasks_recv_stars(struct engine *e, struct cell *c,
#endif
t_sf_counts = scheduler_addtask(s, task_type_recv, task_subtype_sf_counts,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_sf_counts);
}
/* Have we reached a level where there are any stars tasks ? */
......@@ -605,9 +626,11 @@ void engine_addtasks_recv_stars(struct engine *e, struct cell *c,
/* Create the tasks. */
t_feedback = scheduler_addtask(s, task_type_recv, task_subtype_spart,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_feedback);
t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_spart,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_ti);
if (with_star_formation && c->hydro.count > 0) {
......@@ -687,18 +710,23 @@ void engine_addtasks_recv_black_holes(struct engine *e, struct cell *c,
/* Create the tasks. */
t_rho = scheduler_addtask(s, task_type_recv, task_subtype_bpart_rho,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_rho);
t_bh_merger = scheduler_addtask(
s, task_type_recv, task_subtype_bpart_merger, c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_bh_merger);
t_gas_swallow = scheduler_addtask(
s, task_type_recv, task_subtype_part_swallow, c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_gas_swallow);
t_feedback = scheduler_addtask(
s, task_type_recv, task_subtype_bpart_feedback, c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_feedback);
t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_bpart,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_ti);
}
if (t_rho != NULL) {
......@@ -780,9 +808,11 @@ void engine_addtasks_recv_gravity(struct engine *e, struct cell *c,
/* Create the tasks. */
t_grav = scheduler_addtask(s, task_type_recv, task_subtype_gpart,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_grav);
t_ti = scheduler_addtask(s, task_type_recv, task_subtype_tend_gpart,
c->mpi.tag, 0, c, NULL);
scheduler_cache_mpitask(s->recv_mpicache, c->nodeID, t_ti);
}
/* If we have tasks, link them. */
......@@ -3512,6 +3542,11 @@ void engine_maketasks(struct engine *e) {
/* Re-set the scheduler. */
scheduler_reset(sched, engine_estimate_nr_tasks(e));
#ifdef WITH_MPI
/* Initialise for one-sided MPI. */
scheduler_osmpi_init(sched);
#endif
ticks tic2 = getticks();
/* Construct the first hydro loop over neighbours */
......@@ -3771,6 +3806,11 @@ void engine_maketasks(struct engine *e) {
/* Set the tasks age. */
e->tasks_age = 0;
#ifdef WITH_MPI
/* Initialise windows and offsets for one-sided MPI. */
scheduler_osmpi_init_buffers(sched);
#endif
if (e->verbose)
message("took %.3f %s (including reweight).",
clocks_from_ticks(getticks() - tic), clocks_getunit());
......
......@@ -94,7 +94,7 @@ extern int engine_rank;
int len = 1024; \
char buf[len]; \
MPI_Error_string(res, buf, &len); \
fprintf(stderr, "%s\n\n", buf); \
fprintf(stderr, "[%04i] %s\n\n", engine_rank, buf); \
memdump(engine_rank); \
MPI_Abort(MPI_COMM_WORLD, -1); \
})
......
/* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/**
* @file mpicache.c
* @brief Routines to cache MPI task messages.
*/
/* Config parameters. */
#include "../config.h"
/* Standard includes. */
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
/* Local defines. */
#include "mpicache.h"
#include "scheduler.h"
/* Local includes. */
#include "atomic.h"
#include "clocks.h"
#include "error.h"
#include "memuse_rnodes.h"
#include "task.h"
/* Index of 2D array. */
#define INDEX2(nx, x, y) (nx * y + x)
/* Size of the cache entry buffer increment. */
#define CACHE_INC_SIZE 5000
/* Bit shift to accommodate all the bits of the maximum subtype. */
int mpicache_subtype_shift = 0;
/* Comparator function to sort by node, subtype, tag order. */
#ifdef WITH_MPI
static int mpicache_entry_cmp(const void *a, const void *b) {
const struct mpicache_entry *e1 = (const struct mpicache_entry *)a;
const struct mpicache_entry *e2 = (const struct mpicache_entry *)b;
int comp = e1->node - e2->node;
if (comp < 0) return -1;
if (comp > 0) return 1;
/* Same node, check subtype. */
comp = e1->task->subtype - e2->task->subtype;
if (comp < 0) return -1;
if (comp > 0) return 1;
/* Same subtype, check tag. */
return e1->task->flags - e2->task->flags;
}
#endif
/**
* @brief Reallocate the cache entries to make more space available.
*
* @param cache the #mpicache.
* @param ind the index of the first entry that no longer fits (0 on the
*            initial allocation).
*/
static void mpicache_reallocate(struct mpicache *cache, int ind) {
if (ind == 0) {
/* Need to perform initialization. */
if ((cache->entries = (struct mpicache_entry *)malloc(
CACHE_INC_SIZE * sizeof(struct mpicache_entry))) == NULL) {
error("Failed to allocate MPI cache.");
}
/* Last action, releases threads. */
cache->entries_size = CACHE_INC_SIZE;
} else {
struct mpicache_entry *new_entries;
if ((new_entries = (struct mpicache_entry *)malloc(
sizeof(struct mpicache_entry) *
(cache->entries_size + CACHE_INC_SIZE))) == NULL) {
error("Failed to re-allocate MPI cache.");
}
/* Wait for all writes to the old buffer to complete. */
while (cache->entries_done < cache->entries_size)
;
/* Copy to new buffer. */
memcpy(new_entries, cache->entries,
sizeof(struct mpicache_entry) * cache->entries_size);
free(cache->entries);
cache->entries = new_entries;
/* Last action, releases waiting threads. */
atomic_add(&cache->entries_size, CACHE_INC_SIZE);
}
}
/**
* @brief Initialize an mpicache for use.
*
* @return the new mpicache
*/
struct mpicache *mpicache_init() {
/* Initialise the bitshifts needed to create the compact key. */
if (mpicache_subtype_shift == 0) {
mpicache_subtype_shift =
(sizeof(int) * CHAR_BIT) - __builtin_clz(task_subtype_count);
}
/* Create an initial cache. */
struct mpicache *cache =
(struct mpicache *)calloc(1, sizeof(struct mpicache));
return cache;
}
/**
* @brief Add a new MPI task to the cache.
*
* @param cache the #mpicache
* @param node the MPI rank on the far side of the exchange.
* @param t the #task struct.
*/
void mpicache_add(struct mpicache *cache, int node, struct task *t) {
/* Append this to the cache. */
size_t ind = atomic_inc(&(cache->nr_entries));
if (ind == cache->entries_size) mpicache_reallocate(cache, ind);
/* Wait if needed while the reallocation occurs. */
while (ind > cache->entries_size)
;
/* Derive the size in bytes, may not be set yet. */
#ifdef WITH_MPI
t->size = scheduler_mpi_size(t);
#endif
/* And store. */
cache->entries[ind].node = node;
cache->entries[ind].task = t;
atomic_inc(&(cache->entries_done));
return;
}
/**
* @brief Free any data associated with the cache.
*
* @param cache the #mpicache
*/
void mpicache_destroy(struct mpicache *cache) {
free(cache->window_node_offsets);
free(cache->entries);
free(cache);
}
/**
* @brief Apply the cache.
*
* On exit the offset field of each cached MPI task matches its position in
* the MPI window used for its subtype, and the expected window sizes are
* stored in the cache.
*
* @param cache the #mpicache.
* @param nr_ranks number of MPI ranks.
* @param prefix prefix used in log messages (e.g. "send" or "recv").
*/
void mpicache_apply(struct mpicache *cache, int nr_ranks, const char *prefix) {
#ifdef WITH_MPI
/* Do nothing if we have nothing. */
if (cache->nr_entries == 0) {
return;
}
/* The first job is to sort the entries into node|subtype|tag order. Within
 * each node section the subtype|tag sequence must be the same on the send
 * and recv sides; that is necessary so that the offsets match. */
qsort(cache->entries, cache->nr_entries, sizeof(struct mpicache_entry),
mpicache_entry_cmp);
/* Lists of offsets for each node in the subtype window. Keep this square
* for convenience. */
cache->window_node_offsets =
(size_t *)malloc(task_subtype_count * nr_ranks * sizeof(size_t));
for (int k = 0; k < task_subtype_count * nr_ranks; k++)
cache->window_node_offsets[k] = LLONG_MAX;
size_t node_offset[task_subtype_count] = {0};
size_t task_offset[task_subtype_count] = {0};
for (size_t k = 0; k < cache->nr_entries; k++) {
struct task *task = cache->entries[k].task;
int node = cache->entries[k].node;
int subtype = task->subtype;
if (cache->window_node_offsets[INDEX2(task_subtype_count, subtype, node)] ==
LLONG_MAX) {
/* Offset for this node and subtype not seen, so this is the first. */
node_offset[subtype] += task_offset[subtype];
cache->window_node_offsets[INDEX2(task_subtype_count, subtype, node)] =
node_offset[subtype];
// message("%s node %d offset for subtype %d = %zu", prefix, node,
// subtype, node_offset[subtype]);
/* First task of this subtype for this node, so restart the task offsets. */
task_offset[subtype] = 0;
}
/* Window sizes are in bytes. */
cache->window_sizes[subtype] +=
task->size + scheduler_osmpi_tobytes(scheduler_osmpi_header_size);
/* Offsets are in blocks. */
task->offset = task_offset[subtype];
task_offset[subtype] +=
scheduler_osmpi_toblocks(task->size) + scheduler_osmpi_header_size;
// message("%s %d applied task %d subtype %d at %zd tag %lld node %d "
// "size %zu blocks %zu node offsets %zu %zu",
// prefix, task->ci->cellID, task->type, subtype, task->offset,
// task->flags, node, task->size,
// scheduler_osmpi_toblocks(task->size) +
// scheduler_osmpi_header_size,
// cache->window_node_offsets[INDEX2(task_subtype_count, subtype,
// node)], cache->window_node_offsets[INDEX2(task_subtype_count,
// subtype, node)]+task->offset);
}
#endif /* WITH_MPI */
}
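/* A worked example of the layout above, assuming 8-byte blocks and the
 * 4-block header: for one subtype, two messages of 96 and 64 bytes from
 * node 0 followed by one of 40 bytes from node 1 give node 0 the window
 * offset 0 with task offsets 0 and 16 blocks (96/8 + 4), node 1 the
 * window offset 28 blocks (16 + 64/8 + 4) with task offset 0, and a total
 * window size of (96+32) + (64+32) + (40+32) = 296 bytes. */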
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2020 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#ifndef SWIFT_MPICACHE_H
#define SWIFT_MPICACHE_H
/* Config parameters. */
#include "../config.h"
/* Local includes. */
#include "cycle.h"
#include "task.h"
/* Includes. */
#include <stdlib.h>
/* A single cache entry. */
struct mpicache_entry {
int node;
struct task *task;
};
/* Shift needed for subtype bits. */
extern int mpicache_subtype_shift;
/* A cache. */
struct mpicache {
volatile size_t entries_size;
volatile size_t nr_entries;
volatile size_t entries_done;
struct mpicache_entry *volatile entries;
size_t window_sizes[task_subtype_count];
size_t *window_node_offsets;
};
/* API. */
struct mpicache *mpicache_init(void);
void mpicache_add(struct mpicache *cache, int node, struct task *t);
void mpicache_destroy(struct mpicache *cache);
void mpicache_apply(struct mpicache *cache, int nr_ranks, const char *prefix);
#endif /* SWIFT_MPICACHE_H */
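/* A usage sketch of this API (hypothetical helper; in the patch the
 * scheduler drives this from engine_maketasks() via scheduler_osmpi_init()
 * and scheduler_osmpi_init_buffers()): */

#include "mpicache.h"
#include "task.h"

/* Cache every MPI task while the task graph is built, then apply once all
 * tasks exist. */
static void cache_lifecycle_sketch(struct task *tasks, const int *nodes,
                                   int ntasks, int nr_ranks) {
  struct mpicache *cache = mpicache_init();

  /* Record each task with the rank on the far side of its exchange. */
  for (int k = 0; k < ntasks; k++) mpicache_add(cache, nodes[k], &tasks[k]);

  /* Sort into node|subtype|tag order, set t->offset on every task and
   * accumulate the per-subtype window sizes. */
  mpicache_apply(cache, nr_ranks, "sketch");

  /* ... allocate windows from cache->window_sizes and exchange the node
   * offsets, then, once the windows are freed ... */
  mpicache_destroy(cache);
}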
......@@ -327,10 +327,12 @@ void mpiuse_log_dump(const char *filename, ticks stepticks) {
/* Now check any still active logs, these are errors all should match. */
if (mpiuse_current != 0) {
message("Some MPI requests have not been completed");
message("Some MPI requests have not been completed, %zd bytes in flight",
mpiuse_current);
for (size_t k = 0; k < log_count; k++) {
if (mpiuse_log[k].active)
message("%s/%s: %d->%d: %zd/%d)", taskID_names[mpiuse_log[k].type],
message("%s/%s: %d->%d: size: %zd tag: %d",
taskID_names[mpiuse_log[k].type],
subtaskID_names[mpiuse_log[k].subtype], engine_rank,
mpiuse_log[k].otherrank, mpiuse_log[k].size, mpiuse_log[k].tag);
}
......
......@@ -156,17 +156,6 @@ enum lossy_compression_schemes output_options_get_field_compression(
output_options->select_output, field, compression_level,
lossy_compression_schemes_names[compression_level_current_default]);
#ifdef SWIFT_DEBUG_CHECKS
int should_write =
strcmp(lossy_compression_schemes_names[compression_do_not_write],
compression_level);
message(
"Check for whether %s should be written returned %s from a provided "
"value of \"%s\"",
field, should_write ? "True" : "False", compression_level);
#endif
return compression_scheme_from_name(compression_level);
}
......@@ -212,21 +201,6 @@ enum lossy_compression_schemes output_options_get_ptype_default_compression(
"compression must be set on a field-by-field basis.",
snapshot_type, part_type);
#ifdef SWIFT_DEBUG_CHECKS
/* Check whether we could translate the level string to a known entry. */
if (level_index >= compression_level_count)
error(
"Could not resolve compression level \"%s\" as default compression "
"level of particle type %s in snapshot type %s.",
compression_level, part_type_names[part_type], snapshot_type);
message(
"Determined default compression level of %s in snapshot type %s "
"as \"%s\", corresponding to level code %d",
part_type_names[part_type], snapshot_type, compression_level,
level_index);
#endif
return (enum lossy_compression_schemes)level_index;
}
......@@ -245,25 +219,6 @@ int output_options_get_num_fields_to_write(
int selection_id =
parser_get_section_id(output_options->select_output, selection_name);
#ifdef SWIFT_DEBUG_CHECKS
/* The only situation where we might legitimately not find the selection
* name is if it is the default. Everything else means trouble. */
if (strcmp(selection_name, select_output_header_default_name) &&
selection_id < 0)
error(
"Output selection '%s' could not be located in output_options "
"structure. Please investigate.",
selection_name);
/* While we're at it, make sure the selection ID is not impossibly high */
if (selection_id >= output_options->select_output->sectionCount)
error(
"Output selection '%s' was apparently located in index %d of the "
"output_options structure, but this only has %d sections.",
selection_name, selection_id,
output_options->select_output->sectionCount);
#endif
/* Special treatment for absent `Default` section */
if (selection_id < 0)
selection_id = output_options->select_output->sectionCount;
......
......@@ -225,8 +225,8 @@ void queue_init(struct queue *q, struct task *tasks) {
* @param s The #scheduler.
* @param prev The previous #task extracted from this #queue.
* @param blocking Block until access to the queue is granted.
*/
struct task *queue_gettask(struct queue *q, const struct task *prev,
int blocking) {
struct task *queue_gettask(struct scheduler *s, struct queue *q,
const struct task *prev, int blocking) {
swift_lock_type *qlock = &q->lock;
struct task *res = NULL;
......@@ -257,7 +257,7 @@ struct task *queue_gettask(struct queue *q, const struct task *prev,
for (ind = 0; ind < old_qcount; ind++) {
/* Try to lock the next task. */
if (task_lock(&qtasks[entries[ind].tid])) break;
if (task_lock(s, &qtasks[entries[ind].tid])) break;
/* Should we de-prioritize this task? */
if ((1ULL << qtasks[entries[ind].tid].type) &
......@@ -330,9 +330,14 @@ void queue_dump(int nodeID, int index, FILE *file, struct queue *q) {
/* Loop over the queue entries. */
for (int k = 0; k < q->count; k++) {
struct task *t = &q->tasks[q->entries[k].tid];
#ifdef WITH_MPI
fprintf(file, "%d %d %d %s %s %.2f %lld %zd\n", nodeID, index, k,
taskID_names[t->type], subtaskID_names[t->subtype], t->weight,
t->flags, t->size);
#else
fprintf(file, "%d %d %d %s %s %.2f\n", nodeID, index, k,
taskID_names[t->type], subtaskID_names[t->subtype], t->weight);
#endif
}
/* Release the task lock. */
......
......@@ -78,8 +78,8 @@ struct queue {
} __attribute__((aligned(queue_struct_align)));
/* Function prototypes. */
struct task *queue_gettask(struct queue *q, const struct task *prev,
int blocking);
struct task *queue_gettask(struct scheduler *s, struct queue *q,
const struct task *prev, int blocking);
void queue_init(struct queue *q, struct task *tasks);
void queue_insert(struct queue *q, struct task *t);
void queue_clean(struct queue *q);
......
......@@ -163,7 +163,6 @@ void *runner_main(void *data) {
#ifdef SWIFT_DEBUG_TASKS
/* Mark the thread we run on */
t->rid = r->cpuid;
/* And recover the pair direction */
if (t->type == task_type_pair || t->type == task_type_sub_pair) {
struct cell *ci_temp = ci;
......
......@@ -57,6 +57,10 @@
#include "timers.h"
#include "version.h"
/* Index into 2 and 3D arrays. */
#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
#define INDEX2(nx, x, y) (nx * y + x)
/**
* @brief Re-set the list of active tasks.
*/
......@@ -1433,7 +1437,7 @@ void scheduler_ranktasks(struct scheduler *s) {
}
/**
* @brief (Re)allocate the task arrays.
* @brief (Re)allocate the task arrays and reset one-sided MPI.
*
* @param s The #scheduler.
* @param size The maximum number of tasks in the #scheduler.
......@@ -1884,6 +1888,8 @@ void scheduler_start(struct scheduler *s) {
scheduler_enqueue_mapper(s->tid_active, s->active_count, s);
}
// scheduler_dump_queues(s->space->e);
/* Clear the list of active tasks. */
s->active_count = 0;
......@@ -1920,9 +1926,6 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
/* Otherwise, look for a suitable queue. */
else {
#ifdef WITH_MPI
int err = MPI_SUCCESS;
#endif
/* Find the previous owner for each task type, and do
any pre-processing needed. */
......@@ -1961,103 +1964,46 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
case task_type_recv:
#ifdef WITH_MPI
{
size_t size = 0; /* Size in bytes. */
size_t count = 0; /* Number of elements to receive */
MPI_Datatype type = MPI_BYTE; /* Type of the elements */
void *buff = NULL; /* Buffer to accept elements */
if (t->subtype == task_subtype_tend_part) {
count = size =
t->ci->mpi.pcell_size * sizeof(struct pcell_step_hydro);
buff = t->buff = malloc(count);
} else if (t->subtype == task_subtype_tend_gpart) {
count = size = t->ci->mpi.pcell_size * sizeof(struct pcell_step_grav);
buff = t->buff = malloc(count);
} else if (t->subtype == task_subtype_tend_spart) {
count = size =
t->ci->mpi.pcell_size * sizeof(struct pcell_step_stars);
buff = t->buff = malloc(count);
} else if (t->subtype == task_subtype_tend_bpart) {
count = size =
t->ci->mpi.pcell_size * sizeof(struct pcell_step_black_holes);
buff = t->buff = malloc(count);
} else if (t->subtype == task_subtype_part_swallow) {
count = size =
t->ci->hydro.count * sizeof(struct black_holes_part_data);
buff = t->buff = malloc(count);
} else if (t->subtype == task_subtype_bpart_merger) {
count = size =
sizeof(struct black_holes_bpart_data) * t->ci->black_holes.count;
buff = t->buff = malloc(count);
/* Size of message in bytes. */
t->size = scheduler_mpi_size(t);
/* Allocate memory for tasks that receive temporary data for
* unpacking. */
if (t->subtype == task_subtype_tend_part ||
t->subtype == task_subtype_tend_gpart ||
t->subtype == task_subtype_tend_spart ||
t->subtype == task_subtype_tend_bpart ||
t->subtype == task_subtype_part_swallow ||
t->subtype == task_subtype_bpart_merger ||
t->subtype == task_subtype_sf_counts ||
t->subtype == task_subtype_multipole) {
t->buff = malloc(t->size);
} else if (t->subtype == task_subtype_xv ||
t->subtype == task_subtype_rho ||
t->subtype == task_subtype_gradient ||
t->subtype == task_subtype_limiter) {
count = t->ci->hydro.count;
size = count * sizeof(struct part);
type = part_mpi_type;
buff = t->ci->hydro.parts;
t->buff = t->ci->hydro.parts;
} else if (t->subtype == task_subtype_gpart) {
count = t->ci->grav.count;
size = count * sizeof(struct gpart);
type = gpart_mpi_type;
buff = t->ci->grav.parts;
t->buff = t->ci->grav.parts;
} else if (t->subtype == task_subtype_spart) {
count = t->ci->stars.count;
size = count * sizeof(struct spart);
type = spart_mpi_type;
buff = t->ci->stars.parts;
t->buff = t->ci->stars.parts;
} else if (t->subtype == task_subtype_bpart_rho ||
t->subtype == task_subtype_bpart_swallow ||
t->subtype == task_subtype_bpart_feedback) {
count = t->ci->black_holes.count;
size = count * sizeof(struct bpart);
type = bpart_mpi_type;
buff = t->ci->black_holes.parts;
} else if (t->subtype == task_subtype_multipole) {
count = t->ci->mpi.pcell_size;
size = count * sizeof(struct gravity_tensors);
type = multipole_mpi_type;
buff = t->buff = malloc(size);
} else if (t->subtype == task_subtype_sf_counts) {
count = size = t->ci->mpi.pcell_size * sizeof(struct pcell_sf);
buff = t->buff = malloc(count);
t->buff = t->ci->black_holes.parts;
} else {
error("Unknown communication sub-type");
}
err = MPI_Irecv(buff, count, type, t->ci->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to emit irecv for particle data.");
}
/* And log, if logging enabled. */
mpiuse_log_allocation(t->type, t->subtype, &t->req, 1, size,
mpiuse_log_allocation(t->type, t->subtype, &t->buff, 1, t->size,
t->ci->nodeID, t->flags);
qid = 1 % s->nr_queues;
......@@ -2069,51 +2015,38 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
case task_type_send:
#ifdef WITH_MPI
{
size_t size = 0; /* Size in bytes. */
size_t count = 0; /* Number of elements to send */
MPI_Datatype type = MPI_BYTE; /* Type of the elements */
void *buff = NULL; /* Buffer to send */
/* Set the size of the message in bytes and point t->buff at the message. */
t->size = scheduler_mpi_size(t);
if (t->subtype == task_subtype_tend_part) {
size = count =
t->ci->mpi.pcell_size * sizeof(struct pcell_step_hydro);
buff = t->buff = malloc(size);
cell_pack_end_step_hydro(t->ci, (struct pcell_step_hydro *)buff);
t->buff = malloc(t->size);
cell_pack_end_step_hydro(t->ci, (struct pcell_step_hydro *)t->buff);
} else if (t->subtype == task_subtype_tend_gpart) {
size = count = t->ci->mpi.pcell_size * sizeof(struct pcell_step_grav);
buff = t->buff = malloc(size);
cell_pack_end_step_grav(t->ci, (struct pcell_step_grav *)buff);
t->buff = malloc(t->size);
cell_pack_end_step_grav(t->ci, (struct pcell_step_grav *)t->buff);
} else if (t->subtype == task_subtype_tend_spart) {
size = count =
t->ci->mpi.pcell_size * sizeof(struct pcell_step_stars);
buff = t->buff = malloc(size);
cell_pack_end_step_stars(t->ci, (struct pcell_step_stars *)buff);
t->buff = malloc(t->size);
cell_pack_end_step_stars(t->ci, (struct pcell_step_stars *)t->buff);
} else if (t->subtype == task_subtype_tend_bpart) {
size = count =
t->ci->mpi.pcell_size * sizeof(struct pcell_step_black_holes);
buff = t->buff = malloc(size);
cell_pack_end_step_black_holes(t->ci,
(struct pcell_step_black_holes *)buff);
t->buff = malloc(t->size);
cell_pack_end_step_black_holes(
t->ci, (struct pcell_step_black_holes *)t->buff);
} else if (t->subtype == task_subtype_part_swallow) {
size = count =
t->ci->hydro.count * sizeof(struct black_holes_part_data);
buff = t->buff = malloc(size);
cell_pack_part_swallow(t->ci, (struct black_holes_part_data *)buff);
t->buff = malloc(t->size);
cell_pack_part_swallow(t->ci,
(struct black_holes_part_data *)t->buff);
} else if (t->subtype == task_subtype_bpart_merger) {
size = count =
sizeof(struct black_holes_bpart_data) * t->ci->black_holes.count;
buff = t->buff = malloc(size);
t->buff = malloc(t->size);
cell_pack_bpart_swallow(t->ci,
(struct black_holes_bpart_data *)t->buff);
......@@ -2122,66 +2055,38 @@ void scheduler_enqueue(struct scheduler *s, struct task *t) {
t->subtype == task_subtype_gradient ||
t->subtype == task_subtype_limiter) {
count = t->ci->hydro.count;
size = count * sizeof(struct part);
type = part_mpi_type;
buff = t->ci->hydro.parts;
t->buff = t->ci->hydro.parts;
} else if (t->subtype == task_subtype_gpart) {
count = t->ci->grav.count;
size = count * sizeof(struct gpart);
type = gpart_mpi_type;
buff = t->ci->grav.parts;
t->buff = t->ci->grav.parts;
} else if (t->subtype == task_subtype_spart) {
count = t->ci->stars.count;
size = count * sizeof(struct spart);
type = spart_mpi_type;
buff = t->ci->stars.parts;
t->buff = t->ci->stars.parts;
} else if (t->subtype == task_subtype_bpart_rho ||
t->subtype == task_subtype_bpart_swallow ||
t->subtype == task_subtype_bpart_feedback) {
count = t->ci->black_holes.count;
size = count * sizeof(struct bpart);
type = bpart_mpi_type;
buff = t->ci->black_holes.parts;
t->buff = t->ci->black_holes.parts;
} else if (t->subtype == task_subtype_multipole) {
count = t->ci->mpi.pcell_size;
size = count * sizeof(struct gravity_tensors);
type = multipole_mpi_type;
buff = t->buff = malloc(size);
cell_pack_multipoles(t->ci, (struct gravity_tensors *)buff);
t->buff = malloc(t->size);
cell_pack_multipoles(t->ci, (struct gravity_tensors *)t->buff);
} else if (t->subtype == task_subtype_sf_counts) {
size = count = t->ci->mpi.pcell_size * sizeof(struct pcell_sf);
buff = t->buff = malloc(size);
t->buff = malloc(t->size);
cell_pack_sf_counts(t->ci, (struct pcell_sf *)t->buff);
} else {
error("Unknown communication sub-type");
}
if (size > s->mpi_message_limit) {
err = MPI_Isend(buff, count, type, t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
} else {
err = MPI_Issend(buff, count, type, t->cj->nodeID, t->flags,
subtaskMPI_comms[t->subtype], &t->req);
}
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to emit isend for particle data.");
}
/* And log, if logging enabled. */
mpiuse_log_allocation(t->type, t->subtype, &t->req, 1, size,
mpiuse_log_allocation(t->type, t->subtype, &t->buff, 1, t->size,
t->cj->nodeID, t->flags);
qid = 0;
......@@ -2318,7 +2223,7 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
/* Try to get a task from the suggested queue. */
if (s->queues[qid].count > 0 || s->queues[qid].count_incoming > 0) {
TIMER_TIC
res = queue_gettask(&s->queues[qid], prev, 0);
res = queue_gettask(s, &s->queues[qid], prev, 0);
TIMER_TOC(timer_qget);
if (res != NULL) break;
}
......@@ -2333,7 +2238,7 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
for (int k = 0; k < scheduler_maxsteal && count > 0; k++) {
const int ind = rand_r(&seed) % count;
TIMER_TIC
res = queue_gettask(&s->queues[qids[ind]], prev, 0);
res = queue_gettask(s, &s->queues[qids[ind]], prev, 0);
TIMER_TOC(timer_qsteal);
if (res != NULL)
break;
......@@ -2352,7 +2257,7 @@ struct task *scheduler_gettask(struct scheduler *s, int qid,
#endif
{
pthread_mutex_lock(&s->sleep_mutex);
res = queue_gettask(&s->queues[qid], prev, 1);
res = queue_gettask(s, &s->queues[qid], prev, 1);
if (res == NULL && s->waiting > 0) {
pthread_cond_wait(&s->sleep_cond, &s->sleep_mutex);
}
......@@ -2425,6 +2330,17 @@ void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
s->tasks_ind = NULL;
pthread_key_create(&s->local_seed_pointer, NULL);
scheduler_reset(s, nr_tasks);
#ifdef WITH_MPI
/* No of ranks in the system. */
int res = MPI_Comm_size(MPI_COMM_WORLD, &s->nr_ranks);
if (res != MPI_SUCCESS)
mpi_error(res, "Failed to determine number of MPI ranks");
/* Clear the MPI one sided windows. */
for (int k = 0; k < task_subtype_count; k++)
s->osmpi_windows[k] = MPI_WIN_NULL;
#endif
}
/**
......@@ -2456,6 +2372,9 @@ void scheduler_print_tasks(const struct scheduler *s, const char *fileName) {
*/
void scheduler_clean(struct scheduler *s) {
scheduler_free_tasks(s);
#ifdef WITH_MPI
scheduler_osmpi_free(s);
#endif
swift_free("unlocks", s->unlocks);
swift_free("unlock_ind", s->unlock_ind);
for (int i = 0; i < s->nr_queues; ++i) queue_clean(&s->queues[i]);
......@@ -2547,15 +2466,18 @@ void scheduler_dump_queues(struct engine *e) {
struct scheduler *s = &e->sched;
char dumpfile[35];
static int index = 0;
#ifdef WITH_MPI
/* Open a file per rank and write the header. Use per rank to avoid MPI
* calls that can interact with other blocking ones. */
snprintf(dumpfile, sizeof(dumpfile), "queue_dump_MPI-step%d.dat_%d", e->step,
e->nodeID);
snprintf(dumpfile, sizeof(dumpfile), "queue_dump_MPI-step%d.dat_%d_%d",
e->step, e->nodeID, index);
#else
snprintf(dumpfile, sizeof(dumpfile), "queue_dump-step%d.dat", e->step);
snprintf(dumpfile, sizeof(dumpfile), "queue_dump-step%d.dat_%d", e->step,
index);
#endif
index++;
FILE *file_thread = fopen(dumpfile, "w");
fprintf(file_thread, "# rank queue index type subtype weight\n");
......@@ -2631,3 +2553,165 @@ void scheduler_report_task_times(const struct scheduler *s,
message("took %.3f %s.", clocks_from_ticks(getticks() - tic),
clocks_getunit());
}
/**
* @brief Determine the size of an MPI message based on the subtype.
*
* @param t the MPI #task.
* @return the message size in bytes.
*/
size_t scheduler_mpi_size(struct task *t) {
size_t size = 0;
#ifdef WITH_MPI
if (t->subtype == task_subtype_tend_part) {
size = t->ci->mpi.pcell_size * sizeof(struct pcell_step_hydro);
} else if (t->subtype == task_subtype_tend_gpart) {
size = t->ci->mpi.pcell_size * sizeof(struct pcell_step_grav);
} else if (t->subtype == task_subtype_tend_spart) {
size = t->ci->mpi.pcell_size * sizeof(struct pcell_step_stars);
} else if (t->subtype == task_subtype_tend_bpart) {
size = t->ci->mpi.pcell_size * sizeof(struct pcell_step_black_holes);
} else if (t->subtype == task_subtype_part_swallow) {
size = t->ci->hydro.count * sizeof(struct black_holes_part_data);
} else if (t->subtype == task_subtype_bpart_merger) {
size = sizeof(struct black_holes_bpart_data) * t->ci->black_holes.count;
} else if (t->subtype == task_subtype_xv || t->subtype == task_subtype_rho ||
t->subtype == task_subtype_gradient ||
t->subtype == task_subtype_limiter) {
size = t->ci->hydro.count * sizeof(struct part);
} else if (t->subtype == task_subtype_gpart) {
size = t->ci->grav.count * sizeof(struct gpart);
} else if (t->subtype == task_subtype_spart) {
size = t->ci->stars.count * sizeof(struct spart);
} else if (t->subtype == task_subtype_bpart_rho ||
t->subtype == task_subtype_bpart_swallow ||
t->subtype == task_subtype_bpart_feedback) {
size = t->ci->black_holes.count * sizeof(struct bpart);
} else if (t->subtype == task_subtype_multipole) {
size = t->ci->mpi.pcell_size * sizeof(struct gravity_tensors);
} else if (t->subtype == task_subtype_sf_counts) {
size = t->ci->mpi.pcell_size * sizeof(struct pcell_sf);
} else {
error("Unknown communication sub-type");
}
#endif
return size;
}
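/* For example, an xv, rho, gradient or limiter exchange for a cell holding
 * 128 particles is 128 * sizeof(struct part) bytes; both sides derive the
 * size from the same cell counts, so the windows and offsets computed from
 * these sizes agree across ranks. */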
/**
* @brief Initialise the caches for handling one-sided MPI.
*
* @param s the #scheduler
*/
void scheduler_osmpi_init(struct scheduler *s) {
#ifdef WITH_MPI
/* Free any previous caches and windows. */
scheduler_osmpi_free(s);
/* And get new caches. Windows and buffers can only be initialised later,
 * once all the tasks have been made. */
s->send_mpicache = mpicache_init();
s->recv_mpicache = mpicache_init();
#endif
}
/**
* @brief Allocate the windows buffers for one-sided MPI exchanges.
*
* @param s the #scheduler
*/
void scheduler_osmpi_init_buffers(struct scheduler *s) {
#ifdef WITH_MPI
/* First apply the caches to the tasks, so we get the window sizes and the
* remote offsets set in the tasks. */
mpicache_apply(s->send_mpicache, s->nr_ranks, "send");
mpicache_apply(s->recv_mpicache, s->nr_ranks, "recv");
/* We have one window per subtype, which is large enough for all exchanges. */
for (int k = 0; k < task_subtype_count; k++) {
s->osmpi_sizes[k] = s->recv_mpicache->window_sizes[k];
/* Note windows can have zero size, but must exist on both sides of any
* exchange and this is effectively a collective operation across the
* communicator. */
int err = MPI_Win_allocate(s->osmpi_sizes[k], scheduler_osmpi_bytesinblock,
MPI_INFO_NULL, subtaskMPI_comms[k],
&s->osmpi_ptrs[k], &s->osmpi_windows[k]);
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to create osmpi window for subtype: %d", k);
}
/* Assert a shared lock with all the other processes on this window. */
err = MPI_Win_lock_all(MPI_MODE_NOCHECK, s->osmpi_windows[k]);
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to lock osmpi window");
}
}
/* Now we need to build an array that allows all nodes to find their offsets
* into the windows of any other node per subtype and exchange this with
* all the other nodes. */
s->global_offsets = calloc(task_subtype_count * s->nr_ranks * s->nr_ranks,
scheduler_osmpi_bytesinblock);
for (int k = 0; k < task_subtype_count; k++) {
for (int j = 0; j < s->nr_ranks; j++) {
size_t offset =
s->recv_mpicache
->window_node_offsets[INDEX2(task_subtype_count, k, j)];
if (offset != LLONG_MAX) {
s->global_offsets[INDEX3(task_subtype_count, s->nr_ranks, k, j,
engine_rank)] = offset;
}
}
}
MPI_Allreduce(MPI_IN_PLACE, s->global_offsets,
task_subtype_count * s->nr_ranks * s->nr_ranks,
scheduler_osmpi_mpi_blocktype, MPI_SUM, MPI_COMM_WORLD);
#endif
}
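/* Each rank only fills the slots of the table that describe its own
 * windows (the engine_rank plane); the rest of the table is zero from
 * calloc, so summing with MPI_Allreduce and MPI_IN_PLACE leaves every rank
 * holding the complete table. A sender then looks up its offset into an
 * owner's window as global_offsets[subtype][sender][owner]. */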
/**
* @brief Free any buffers used for one-sided MPI exchanges.
*
* @param s the #scheduler
*/
void scheduler_osmpi_free(struct scheduler *s) {
#ifdef WITH_MPI
/* Free the MPI caches. */
if (s->send_mpicache != NULL) mpicache_destroy(s->send_mpicache);
s->send_mpicache = NULL;
if (s->recv_mpicache != NULL) mpicache_destroy(s->recv_mpicache);
s->recv_mpicache = NULL;
/* Free the windows, locks and associated buffers. */
for (int k = 0; k < task_subtype_count; k++) {
if (s->osmpi_windows[k] != MPI_WIN_NULL) {
MPI_Win_unlock_all(s->osmpi_windows[k]);
MPI_Win_free(&s->osmpi_windows[k]);
s->osmpi_windows[k] = MPI_WIN_NULL;
}
s->osmpi_ptrs[k] = NULL;
}
/* Free the offsets cache. */
if (s->global_offsets != NULL) free(s->global_offsets);
s->global_offsets = NULL;
#endif
}
......@@ -26,7 +26,7 @@
/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif
#endif /* WITH_MPI */
/* Some standard headers. */
#include <pthread.h>
......@@ -35,6 +35,7 @@
#include "cell.h"
#include "inline.h"
#include "lock.h"
#include "mpicache.h"
#include "queue.h"
#include "task.h"
#include "threadpool.h"
......@@ -54,6 +55,32 @@
#define scheduler_flag_none 0
#define scheduler_flag_steal (1 << 1)
/* Constants for one-sided (os) MPI work. Flags for controlling access. */
#define scheduler_osmpi_locked -2
#define scheduler_osmpi_unlocked -3
/* Size of a block of memory. Need to send MPI messages aligned to this size. */
#define scheduler_osmpi_blocktype size_t
#define scheduler_osmpi_mpi_blocktype MPI_AINT
#define scheduler_osmpi_bytesinblock sizeof(size_t)
/* Size of the message header control block in blocks: the lock flag, size,
 * tag and originating node (for consistency checks).
 * XXX make some of these debugging checks. */
#define scheduler_osmpi_header_size 4
/* Number of threads we can use for sending. */
#define scheduler_osmpi_max_sends 1
/* Convert a byte count into a number of blocks, rounds up. */
#define scheduler_osmpi_toblocks(nr_bytes) \
((nr_bytes + (scheduler_osmpi_bytesinblock - 1)) / \
scheduler_osmpi_bytesinblock)
/* Convert a block count into a number of bytes. */
#define scheduler_osmpi_tobytes(nr_blocks) \
(nr_blocks * scheduler_osmpi_bytesinblock)
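/* A quick sanity check of the rounding (standalone sketch; macros restated
 * with shortened names, assuming an 8-byte size_t): */

#include <assert.h>
#include <stddef.h>

#define BYTES_IN_BLOCK sizeof(size_t) /* 8 on LP64. */
#define TO_BLOCKS(nr_bytes) \
  (((nr_bytes) + (BYTES_IN_BLOCK - 1)) / BYTES_IN_BLOCK)
#define TO_BYTES(nr_blocks) ((nr_blocks)*BYTES_IN_BLOCK)

int main(void) {
  assert(TO_BLOCKS(96) == 12);  /* Already aligned. */
  assert(TO_BLOCKS(100) == 13); /* Rounds up, never down. */
  assert(TO_BYTES(TO_BLOCKS(100)) == 104); /* Round trip pads to 8 bytes. */
  return 0;
}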
/* Data of a scheduler. */
struct scheduler {
/* Scheduler flags. */
......@@ -111,6 +138,24 @@ struct scheduler {
/* Total ticks spent running the tasks */
ticks total_ticks;
#ifdef WITH_MPI
/* MPI windows for one-sided messages. */
MPI_Win osmpi_windows[task_subtype_count];
size_t osmpi_sizes[task_subtype_count];
volatile scheduler_osmpi_blocktype *osmpi_ptrs[task_subtype_count];
/* Number of MPI ranks in the system. */
int nr_ranks;
/* Caches for capturing the MPI tasks and working out the window sizes and
* offsets. */
struct mpicache *send_mpicache;
struct mpicache *recv_mpicache;
/* Array of offsets for each node into another nodes subtypes. */
scheduler_osmpi_blocktype *global_offsets;
#endif
};
/* Inlined functions (for speed). */
......@@ -178,6 +223,24 @@ scheduler_activate_recv(struct scheduler *s, struct link *link,
return l;
}
/* Forward declaration. */
size_t scheduler_mpi_size(struct task *t);
/**
* @brief Add an MPI task to a cache and set the message size.
*
* @param cache the #mpicache, usually one of those in the scheduler.
* @param node the MPI rank on the far side of the exchange.
* @param t the MPI #task.
*/
__attribute__((always_inline)) INLINE static void scheduler_cache_mpitask(
struct mpicache *cache, int node, struct task *t) {
#ifdef WITH_MPI
mpicache_add(cache, node, t);
t->size = scheduler_mpi_size(t);
#endif
}
/* Function prototypes. */
void scheduler_clear_active(struct scheduler *s);
void scheduler_init(struct scheduler *s, struct space *space, int nr_tasks,
......@@ -209,4 +272,7 @@ void scheduler_dump_queues(struct engine *e);
void scheduler_report_task_times(const struct scheduler *s,
const int nr_threads);
void scheduler_osmpi_init(struct scheduler *s);
void scheduler_osmpi_init_buffers(struct scheduler *s);
void scheduler_osmpi_free(struct scheduler *s);
#endif /* SWIFT_SCHEDULER_H */
......@@ -48,6 +48,9 @@
#include "lock.h"
#include "mpiuse.h"
/* Index into 3D array. */
#define INDEX3(nx, ny, x, y, z) (nx * ny * z + nx * y + x)
/* Task type names. */
const char *taskID_names[task_type_count] = {
"none",
......@@ -623,43 +626,175 @@ void task_unlock(struct task *t) {
*
* @param s the #scheduler (needed to drive the one-sided exchanges).
* @param t the #task.
*/
int task_lock(struct task *t) {
int task_lock(struct scheduler *s, struct task *t) {
const enum task_types type = t->type;
const enum task_subtypes subtype = t->subtype;
struct cell *ci = t->ci, *cj = t->cj;
#ifdef WITH_MPI
int res = 0, err = 0;
MPI_Status stat;
#endif
switch (type) {
/* Communication task? */
case task_type_recv:
case task_type_send:
#ifdef WITH_MPI
/* Check the status of the MPI request. */
if ((err = MPI_Test(&t->req, &res, &stat)) != MPI_SUCCESS) {
char buff[MPI_MAX_ERROR_STRING];
int len;
MPI_Error_string(err, buff, &len);
error(
"Failed to test request on send/recv task (type=%s/%s tag=%lld, "
"%s).",
taskID_names[t->type], subtaskID_names[t->subtype], t->flags, buff);
{
/* The send buffer holds the message data plus room for the header.
 * XXX horrible extra data copy, can we separate headers? */
scheduler_osmpi_blocktype datasize =
scheduler_osmpi_toblocks(t->size) + scheduler_osmpi_header_size;
scheduler_osmpi_blocktype *dataptr =
calloc(datasize, scheduler_osmpi_bytesinblock);
/* First element is marked as LOCKED, so only we can update. */
dataptr[0] = scheduler_osmpi_locked;
dataptr[1] = t->size;
dataptr[2] = t->flags;
dataptr[3] = engine_rank;
memcpy(&dataptr[scheduler_osmpi_header_size], t->buff, t->size);
if (s->space->e->verbose)
message(
"Sending message to %d from %d subtype %d tag %zu size %zu"
" (cf %lld %zu) offset %zu",
cj->nodeID, ci->nodeID, subtype, dataptr[2], dataptr[1], t->flags,
t->size, t->offset);
/* And send to the destination rank. Need offset for this message into
* the remote window. */
size_t index = INDEX3(task_subtype_count, s->nr_ranks, subtype,
engine_rank, cj->nodeID);
size_t offset = s->global_offsets[index] + t->offset;
// if (t->subtype == task_subtype_xv) {
// message("sending our offsets %zu %zu %zu to %d "
// "subtype %d tag %lld size %zu",
// s->global_offsets[index], t->offset, offset,
// cj->nodeID, subtype, t->flags, t->size);
//}
int err = MPI_Accumulate(dataptr, datasize, scheduler_osmpi_mpi_blocktype,
cj->nodeID, offset, datasize,
scheduler_osmpi_mpi_blocktype, MPI_REPLACE,
s->osmpi_windows[subtype]);
if (err != MPI_SUCCESS) {
mpi_error(err, "Failed to send particle data.");
}
if (s->space->e->verbose) {
message(
"Sent message to %d subtype %d tag %zu size %zu offset %zu"
" (cf %lld %zu)",
cj->nodeID, subtype, dataptr[2], dataptr[1], offset, t->flags,
t->size);
}
/* Now we change the first element to unlocked so that the remote end
* can find out that the data has arrived. */
scheduler_osmpi_blocktype newval[1];
scheduler_osmpi_blocktype oldval[1];
newval[0] = scheduler_osmpi_unlocked;
oldval[0] = 0;
err = MPI_Compare_and_swap(&newval[0], dataptr, &oldval[0],
scheduler_osmpi_mpi_blocktype, cj->nodeID,
offset, s->osmpi_windows[subtype]);
// dataptr[0] = scheduler_osmpi_unlocked;
// err = MPI_Accumulate(dataptr, 1, scheduler_osmpi_mpi_blocktype,
// cj->nodeID, offset, 1,
// scheduler_osmpi_mpi_blocktype, MPI_REPLACE,
// s->osmpi_windows[subtype]);
// if (err != MPI_SUCCESS) {
// mpi_error(err, "Compare and swap send error for particle data.");
//}
// XXX try to live without this.
err = MPI_Win_flush(cj->nodeID, s->osmpi_windows[subtype]);
if (err != MPI_SUCCESS) mpi_error(err, "MPI_Win_flush failed");
/* Release data. XXX not yet, we need to wait, perhaps a local poll?*/
free(dataptr); // XXX reuse this surely...
if (s->space->e->verbose)
message("Sent and ack message to %d subtype %d tag %lld size %zu",
cj->nodeID, subtype, t->flags, t->size);
/* And log deactivation, if logging enabled. */
if (res) {
mpiuse_log_allocation(t->type, t->subtype, &t->req, 0, 0, 0, 0);
mpiuse_log_allocation(type, subtype, &t->buff, 0, 0, 0, 0);
return 1;
}
#endif
break;
case task_type_recv:
#ifdef WITH_MPI
{
/* Check for a message waiting for this subtype from our expected
* node. */
size_t index = INDEX3(task_subtype_count, s->nr_ranks, subtype,
ci->nodeID, engine_rank);
size_t offset = s->global_offsets[index] + t->offset;
// if (t->subtype == task_subtype_xv) {
// message("%d %d task recv: goff[%d,%d,%d] = offsets total %zu node %zu
// task %zu tag %lld",
// engine_rank, ci->cellID, subtype, engine_rank, ci->nodeID,
// offset, s->global_offsets[index], t->offset, t->flags);
//}
volatile scheduler_osmpi_blocktype *dataptr =
&s->osmpi_ptrs[subtype][offset];
if (dataptr[0] == (scheduler_osmpi_blocktype)scheduler_osmpi_unlocked) {
/* Message from our remote and subtype waiting, does it match our tag
* and size? */
if (t->flags == (int)dataptr[2] && t->size == dataptr[1] &&
ci->nodeID == (int)dataptr[3]) {
if (s->space->e->verbose)
message(
"Accepted from %d subtype %d tag %lld size %zu"
" offset %zu",
ci->nodeID, subtype, t->flags, t->size, offset);
/* And log deactivation, if logging enabled. */
mpiuse_log_allocation(type, subtype, &t->buff, 0, 0, 0, 0);
/* Ready to process. So copy to local buffers. */
memcpy(t->buff, (void *)&dataptr[scheduler_osmpi_header_size],
t->size);
/* Ready for next recv (iff reused). */
dataptr[0] = scheduler_osmpi_locked;
dataptr[1] = scheduler_osmpi_locked;
dataptr[2] = scheduler_osmpi_locked;
dataptr[3] = scheduler_osmpi_locked;
return 1;
} else {
message(
"missed recv at our offsets %zu %zu %zu from %d "
"subtype %d tag %lld size %zu "
"see tag %zu size %zu from %zu",
s->global_offsets[index], t->offset, offset, ci->nodeID, subtype,
t->flags, t->size, dataptr[2], dataptr[1], dataptr[3]);
}
}
return res;
/* Need to allow for some MPI progression, since we otherwise make no MPI
 * calls here (by design the receive is a passive target, so only the
 * sender should make calls that move data). */
int flag = 0;
int ret = MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
&flag, MPI_STATUS_IGNORE);
if (ret != MPI_SUCCESS) mpi_error(ret, "MPI_Iprobe failed");
return 0;
}
#else
error("SWIFT was not compiled with MPI support.");
#endif
break;
case task_type_kick1:
case task_type_kick2:
......@@ -1027,7 +1162,7 @@ void task_create_mpi_comms(void) {
}
}
/**
* @brief Create global communicators for each of the subtasks.
* @brief Free global communicators for each of the subtasks.
*/
void task_free_mpi_comms(void) {
for (int i = 0; i < task_subtype_count; i++) {
......
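/* The send/recv handshake in task_lock() above boils down to: push the
 * header plus payload into the receiver's window with MPI_Accumulate, flip
 * the lock block with MPI_Compare_and_swap, and have the receiver poll its
 * own window memory. A standalone two-rank sketch of that protocol
 * (hypothetical constants; assumes the unified memory model and ignores
 * MPI progress, which the real code drives via MPI_Iprobe): */

#include <mpi.h>
#include <stdio.h>

#define LOCKED ((MPI_Aint)-2)
#define UNLOCKED ((MPI_Aint)-3)
#define NBLOCKS 8 /* 1 flag block + 7 payload blocks. */

int main(int argc, char *argv[]) {
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  /* Windows must exist on both sides, even if only rank 1 receives. */
  MPI_Aint *base;
  MPI_Win win;
  MPI_Win_allocate(NBLOCKS * sizeof(MPI_Aint), sizeof(MPI_Aint),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &base, &win);
  for (int k = 0; k < NBLOCKS; k++) base[k] = 0;
  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  MPI_Barrier(MPI_COMM_WORLD); /* Receiver's window is now initialised. */

  if (rank == 0) {
    /* Sender: payload behind a LOCKED flag block, pushed as one update. */
    MPI_Aint msg[NBLOCKS] = {LOCKED, 42};
    MPI_Accumulate(msg, NBLOCKS, MPI_AINT, 1, 0, NBLOCKS, MPI_AINT,
                   MPI_REPLACE, win);
    /* Flip the flag only once the payload is in place; accumulate
     * operations to the same target location are ordered. */
    MPI_Aint newval = UNLOCKED, oldval = 0;
    MPI_Compare_and_swap(&newval, msg, &oldval, MPI_AINT, 1, 0, win);
    MPI_Win_flush(1, win);
  } else if (rank == 1) {
    /* Receiver: passive target, poll local window memory for the flag. */
    volatile MPI_Aint *flag = base;
    while (*flag != UNLOCKED) {
      /* Spin; real code must also let MPI progress, e.g. via MPI_Iprobe. */
    }
    printf("payload = %ld\n", (long)base[1]); /* Prints 42. */
  }

  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Win_unlock_all(win);
  MPI_Win_free(&win);
  MPI_Finalize();
  return 0;
}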
......@@ -33,6 +33,8 @@
/* Forward declarations to avoid circular inclusion dependencies. */
struct cell;
struct engine;
struct scheduler;
struct task;
#define task_align 128
......@@ -222,8 +224,11 @@ struct task {
/*! Buffer for this task's communications */
void *buff;
/*! MPI request corresponding to this task */
MPI_Request req;
/*! Size of the buffer. */
size_t size;
/*! Offset of the buffer in remote window. */
size_t offset;
#endif
......@@ -275,7 +280,7 @@ struct task {
/* Function prototypes. */
void task_unlock(struct task *t);
float task_overlap(const struct task *ta, const struct task *tb);
int task_lock(struct task *t);
int task_lock(struct scheduler *s, struct task *t);
void task_do_rewait(struct task *t);
void task_print(const struct task *t);
void task_dump_all(struct engine *e, int step);
......
......@@ -23,9 +23,11 @@
#include "../config.h"
/* Local headers. */
#include "error.h"
#include "inline.h"
#include "intrinsics.h"
#include <limits.h>
#include <math.h>
#include <stdint.h>
......@@ -59,6 +61,11 @@ __attribute__((const)) static INLINE integertime_t
get_integer_timestep(timebin_t bin) {
if (bin <= 0) return 0;
#ifdef SWIFT_DEBUG_CHECKS
if ((size_t)bin > (sizeof(integertime_t) * CHAR_BIT))
error("Time bin %zu exceeds limit of %zd", (size_t)bin,
sizeof(integertime_t) * CHAR_BIT);
#endif
return 1LL << (bin + 1);
}
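/* For example, bin 5 gives an integer time-step of 1LL << 6 = 64 internal
 * ticks; the check above guards the shift against bins too wide for a
 * 64-bit integertime_t. */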
......
......@@ -102,6 +102,12 @@ for f in infiles:
recvs[key].append(line[:-1])
# Now output. Note we could have unmatched recv keys, we don't check for that.
print "# send_stic send_etic send_dtic send_step send_rank send_otherrank " + \
"send_type send_itype send_subtype send_isubtype send_activation " + \
"send_tag send_size send_sum " + \
"recv_stic recv_etic recv_dtic recv_step recv_rank recv_otherrank " + \
"recv_type recv_itype recv_subtype recv_isubtype recv_activation " + \
"recv_tag recv_size recv_sum "
for key in sends:
if key in recvs:
if len(sends[key]) == 1 and len(recvs[key]) == 1:
......