/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
* Matthieu Schaller (schaller@strw.leidenuniv.nl)
* 2015 Peter W. Draper (p.w.draper@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include <config.h>

/* System includes. */
#include <float.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>

#ifdef HAVE_LIBNUMA
#include <numa.h>
#endif
/* This object's header. */
#include "engine.h"
/* Local headers. */
#include "fof.h"
#include "line_of_sight.h"
#include "mpiuse.h"
#include "part.h"
#include "pressure_floor.h"
#include "proxy.h"
#include "rt.h"
#include "star_formation.h"
#include "star_formation_logger.h"
#include "stars_io.h"
#include "statistics.h"
#include "version.h"
extern int engine_max_parts_per_ghost;
extern int engine_max_sparts_per_ghost;
extern int engine_max_parts_per_cooling;
/**
 * @brief dump diagnostic data on tasks, memuse, mpiuse, queues.
 *
 * @param e the #engine
 */
void engine_dump_diagnostic_data(struct engine *e) {

  /* Announce which step this dump belongs to, then write the active tasks. */
  message("Dumping engine tasks in step: %d", e->step);
  task_dump_active(e);

#ifdef SWIFT_MEMUSE_REPORTS
  /* Flush the memory-use log accumulated so far. */
  message("Dumping memory use report");
  memuse_log_dump_error(e->nodeID);
#endif

#if defined(SWIFT_MPIUSE_REPORTS) && defined(WITH_MPI)
  /* Flush the log of MPI interactions for this step. */
  mpiuse_log_dump_error(e->nodeID);
#endif

  /* Finish off with the current state of the scheduler queues. */
  scheduler_dump_queues(e);
}
/* Particle cache size. */
#define CACHE_SIZE 512
#ifdef SWIFT_DUMPER_THREAD
/**
* @brief dumper thread action, checks got the existence of the .dump file
* every 5 seconds and does the dump if found.
*
* @param p the #engine
*/
/**
 * @brief dumper thread action, checks for the existence of the .dump file
 * every 5 seconds and does the dump if found.
 *
 * Never returns: loops until the process exits.
 *
 * @param p the #engine (passed as void * by pthread_create)
 */
static void *engine_dumper_poll(void *p) {
  struct engine *e = (struct engine *)p;

#ifdef WITH_MPI
  /* Per-rank file name ".dump.<rank>". The buffer must hold ".dump." (6
   * chars) plus all digits of the rank and the NUL terminator; the previous
   * 10-byte buffer truncated for ranks >= 1000, making several ranks watch
   * (and unlink) the same wrong file. 40 bytes covers any int. */
  char dumpfile[40];
  snprintf(dumpfile, sizeof(dumpfile), ".dump.%d", e->nodeID);
#else
  const char *dumpfile = ".dump";
#endif

  while (1) {
    if (access(dumpfile, F_OK) == 0) {

      /* File is present, so dump the diagnostics. */
      engine_dump_diagnostic_data(e);

      /* Delete the file so we only dump once per request. */
      unlink(dumpfile);
      message("Dumping completed");
      fflush(stdout);
    }

    /* Take a breath. */
    sleep(5);
  }
  return NULL;
}
#endif /* SWIFT_DUMPER_THREAD */
#ifdef SWIFT_DUMPER_THREAD
/**
* @brief creates the dumper thread.
*
* This watches for the creation of a ".dump" file in the current directory
* and if found dumps the current state of the tasks and memory use (if also
* configured).
*
* @param e the #engine
*
*/
static void engine_dumper_init(struct engine *e) {
pthread_t dumper;
#ifdef WITH_MPI
char dumpfile[10];
snprintf(dumpfile, sizeof(dumpfile), ".dump.%d", e->nodeID);
#else
const char *dumpfile = ".dump";
#endif
/* Make sure the .dump file is not present, that is bad when starting up. */
struct stat buf;
if (stat(dumpfile, &buf) == 0) unlink(dumpfile);
/* Thread does not exit, so nothing to do but create it. */
pthread_create(&dumper, NULL, &engine_dumper_poll, e);
}
#endif /* SWIFT_DUMPER_THREAD */
/**
* @brief configure an engine with the given number of threads, queues
* and core affinity. Also initialises the scheduler and opens various
* output files, computes the next timestep and initialises the
* threadpool.
*
* Assumes the engine is correctly initialised i.e. is restored from a restart
* file or has been setup by engine_init(). When restarting any output log
* files are positioned so that further output is appended. Note that
* parameters are not read from the engine, just the parameter file, this
* allows values derived in this function to be changed between runs.
* When not restarting params should be the same as given to engine_init().
*
* @param restart true when restarting the application.
* @param fof true when starting a stand-alone FOF call.
* @param e The #engine.
* @param params The parsed parameter file.
* @param nr_nodes The number of MPI ranks.
* @param nodeID The MPI rank of this node.
* @param nr_task_threads The number of engine threads per MPI rank.
* @param nr_pool_threads The number of threadpool threads per MPI rank.
* @param with_aff use processor affinity, if supported.
* @param verbose Is this #engine talkative ?
* @param restart_file The name of our restart file.
* @param reparttype What type of repartition algorithm are we using.
*/
void engine_config(int restart, int fof, struct engine *e,
                   struct swift_params *params, int nr_nodes, int nodeID,
                   int nr_task_threads, int nr_pool_threads, int with_aff,
                   int verbose, const char *restart_dir,
                   const char *restart_file, struct repartition *reparttype) {

  /* Time the whole configuration phase; reported on rank 0 at the end. */
  struct clocks_time tic, toc;
  if (nodeID == 0) clocks_gettime(&tic);

  /* Store the values and initialise global fields. */
  e->nodeID = nodeID;
  e->nr_threads = nr_task_threads;
  e->nr_pool_threads = nr_pool_threads;
  e->nr_nodes = nr_nodes;
  e->proxy_ind = NULL;
  e->nr_proxies = 0;
  e->forcerebuild = 1;
  e->forcerepart = 0;
  e->restarting = restart;
  e->step_props = engine_step_prop_none;
  e->links = NULL;
  e->nr_links = 0;
  e->file_stats = NULL;
  e->file_timesteps = NULL;
  e->file_rt_subcycles = NULL;
  e->sfh_logger = NULL;
  e->verbose = verbose;
  e->wallclock_time = 0.f;
  e->restart_dump = 0;
  e->restart_dir = restart_dir;
  e->restart_file = restart_file;
  e->resubmit = 0;
  e->restart_next = 0;
  e->restart_dt = 0;
  e->run_fof = 0;

  /* Seed rand(). */
  srand(clocks_random_seed());

  /* Allow repartitioning to be changed between restarts. On restart this is
   * already allocated and freed on exit, so we need to copy over. */
#ifdef WITH_MPI
  if (restart) {
    /* Preserve the cell list owned by the restored struct: memcpy would
     * otherwise overwrite the pointer with the (empty) one from the new
     * parameter-derived struct. */
    int *celllist = e->reparttype->celllist;
    int ncelllist = e->reparttype->ncelllist;
    memcpy(e->reparttype, reparttype, sizeof(struct repartition));
    e->reparttype->celllist = celllist;
    e->reparttype->ncelllist = ncelllist;
  } else {
    e->reparttype = reparttype;
  }
#endif

  if (restart && fof) {
    error(
        "Can't configure the engine to be a stand-alone FOF and restarting "
        "from a check-point at the same time!");
  }

  /* Welcome message */
  if (e->nodeID == 0) message("Running simulation '%s'.", e->run_name);

  /* Check-pointing properties */
  e->restart_stop_steps =
      parser_get_opt_param_int(params, "Restarts:stop_steps", 100);
  e->restart_max_hours_runtime =
      parser_get_opt_param_float(params, "Restarts:max_run_time", FLT_MAX);
  e->resubmit_after_max_hours =
      parser_get_opt_param_int(params, "Restarts:resubmit_on_exit", 0);
  /* The resubmit command is only required when resubmission is enabled. */
  if (e->resubmit_after_max_hours)
    parser_get_param_string(params, "Restarts:resubmit_command",
                            e->resubmit_command);

  /* Get the number of queues */
  int nr_queues =
      parser_get_opt_param_int(params, "Scheduler:nr_queues", e->nr_threads);
  if (nr_queues <= 0) nr_queues = e->nr_threads;
  if (nr_queues != nr_task_threads)
    message("Number of task queues set to %d", nr_queues);
  e->s->nr_queues = nr_queues;

  /* Get the frequency of the dependency graph dumping */
  e->sched.frequency_dependency = parser_get_opt_param_int(
      params, "Scheduler:dependency_graph_frequency", 0);
  if (e->sched.frequency_dependency < 0) {
    error("Scheduler:dependency_graph_frequency should be >= 0");
  }

  /* Get cellID for extra dependency graph dumps of specific cell */
  e->sched.dependency_graph_cellID = parser_get_opt_param_longlong(
      params, "Scheduler:dependency_graph_cell", 0LL);

  /* Get the frequency of the task level dumping */
  e->sched.frequency_task_levels = parser_get_opt_param_int(
      params, "Scheduler:task_level_output_frequency", 0);
  if (e->sched.frequency_task_levels < 0) {
    error("Scheduler:task_level_output_frequency should be >= 0");
  }

#if defined(SWIFT_DEBUG_CHECKS)
  e->sched.deadlock_waiting_time_ms = parser_get_opt_param_float(
      params, "Scheduler:deadlock_waiting_time_s", -1.f);
  /* User provides parameter in s. We want it in ms. */
  e->sched.deadlock_waiting_time_ms *= 1000.f;
#endif

  /* Deal with affinity. For now, just figure out the number of cores. */
#if defined(HAVE_SETAFFINITY)
  const int nr_cores = sysconf(_SC_NPROCESSORS_ONLN);
  cpu_set_t *entry_affinity = engine_entry_affinity();
  const int nr_affinity_cores = CPU_COUNT(entry_affinity);

  if (nr_cores > CPU_SETSIZE) /* Unlikely, except on e.g. SGI UV. */
    error("must allocate dynamic cpu_set_t (too many cores per node)");

  if (verbose && with_aff) {
    /* Render the entry affinity mask as a '0'/'1' string for the log. */
    char *buf = (char *)malloc((nr_cores + 1) * sizeof(char));
    buf[nr_cores] = '\0';
    for (int j = 0; j < nr_cores; ++j) {
      /* Reversed bit order from convention, but same as e.g. Intel MPI's
       * I_MPI_PIN_DOMAIN explicit mask: left-to-right, LSB-to-MSB. */
      buf[j] = CPU_ISSET(j, entry_affinity) ? '1' : '0';
    }
    message("Affinity at entry: %s", buf);
    free(buf);
  }

  int *cpuid = NULL;
  cpu_set_t cpuset;

  if (with_aff) {
    cpuid = (int *)malloc(nr_affinity_cores * sizeof(int));

    /* Collect the ids of the cores enabled in the entry affinity mask:
     * cpuid[k] is the k-th set bit of the mask. */
    int skip = 0;
    for (int k = 0; k < nr_affinity_cores; k++) {
      int c;
      for (c = skip; c < CPU_SETSIZE && !CPU_ISSET(c, entry_affinity); ++c) {
        /* Nothing to do here */
      }
      cpuid[k] = c;
      skip = c + 1;
    }

#if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE)
    if ((e->policy & engine_policy_cputight) != engine_policy_cputight) {
      if (numa_available() >= 0) {
        if (nodeID == 0) message("prefer NUMA-distant CPUs");

        /* Get list of numa nodes of all available cores. */
        int *nodes = (int *)malloc(nr_affinity_cores * sizeof(int));
        int nnodes = 0;
        for (int i = 0; i < nr_affinity_cores; i++) {
          nodes[i] = numa_node_of_cpu(cpuid[i]);
          if (nodes[i] > nnodes) nnodes = nodes[i];
        }
        nnodes += 1;

        /* Count cores per node. */
        int *core_counts = (int *)malloc(nnodes * sizeof(int));
        for (int i = 0; i < nr_affinity_cores; i++) {
          core_counts[nodes[i]] = 0;
        }
        for (int i = 0; i < nr_affinity_cores; i++) {
          core_counts[nodes[i]] += 1;
        }

        /* Index cores within each node. */
        int *core_indices = (int *)malloc(nr_affinity_cores * sizeof(int));
        for (int i = nr_affinity_cores - 1; i >= 0; i--) {
          core_indices[i] = core_counts[nodes[i]];
          core_counts[nodes[i]] -= 1;
        }

        /* Now sort so that we pick adjacent cpuids from different nodes
         * by sorting internal node core indices. */
        /* Simple bubble sort of (cpuid, core_indices) pairs; the arrays are
         * tiny (one entry per core) so O(n^2) is fine here. */
        int done = 0;
        while (!done) {
          done = 1;
          for (int i = 1; i < nr_affinity_cores; i++) {
            if (core_indices[i] < core_indices[i - 1]) {
              int t = cpuid[i - 1];
              cpuid[i - 1] = cpuid[i];
              cpuid[i] = t;
              t = core_indices[i - 1];
              core_indices[i - 1] = core_indices[i];
              core_indices[i] = t;
              done = 0;
            }
          }
        }
        free(nodes);
        free(core_counts);
        free(core_indices);
      }
    }
#endif
  } else {
    if (nodeID == 0) message("no processor affinity used");
  } /* with_aff */

  /* Avoid (unexpected) interference between engine and runner threads. We can
   * do this once we've made at least one call to engine_entry_affinity and
   * maybe numa_node_of_cpu(sched_getcpu()), even if the engine isn't already
   * pinned. */
  if (with_aff) engine_unpin();
#endif

  if (with_aff && nodeID == 0) {
#ifdef HAVE_SETAFFINITY
#ifdef WITH_MPI
    printf("[%04i] %s engine_init: cpu map is [ ", nodeID,
           clocks_get_timesincestart());
#else
    printf("%s engine_init: cpu map is [ ", clocks_get_timesincestart());
#endif
    for (int i = 0; i < nr_affinity_cores; i++) printf("%i ", cpuid[i]);
    printf("].\n");
#endif
  }

  /* Are we doing stuff in parallel? */
  if (nr_nodes > 1) {
#ifndef WITH_MPI
    error("SWIFT was not compiled with MPI support.");
#else
    /* Make sure the corresponding policy is set and make space for the proxies
     */
    e->policy |= engine_policy_mpi;
    if ((e->proxies = (struct proxy *)calloc(engine_maxproxies,
                                             sizeof(struct proxy))) == NULL)
      error("Failed to allocate memory for proxies.");
    e->nr_proxies = 0;

    /* Use synchronous MPI sends and receives when redistributing. */
    e->syncredist =
        parser_get_opt_param_int(params, "DomainDecomposition:synchronous", 0);

    /* Collect the hostname of each rank into a file */
    /* NOTE(review): the fixed 256 in my_hostname must stay equal to
     * hostname_buffer_length, since that length is used as the per-rank
     * element size on both sides of the MPI_Gather below. */
    const int hostname_buffer_length = 256;
    char my_hostname[256] = {0};
    sprintf(my_hostname, "%s", hostname());
    char *hostnames = NULL;
    if (nodeID == 0) {
      hostnames =
          (char *)calloc(nr_nodes * hostname_buffer_length, sizeof(char));
      if (hostnames == NULL)
        error("Failed to allocate memory for hostname list");
    }
    MPI_Gather(my_hostname, hostname_buffer_length, MPI_BYTE, hostnames,
               hostname_buffer_length, MPI_BYTE, 0, MPI_COMM_WORLD);
    if (nodeID == 0) {
      FILE *ranklog = NULL;
      if (restart) {
        /* Append on restart so earlier rank/host mappings are kept. */
        ranklog = fopen("rank_hostname.log", "a");
        if (ranklog == NULL)
          error("Could not create file 'rank_hostname.log'.");
      } else {
        ranklog = fopen("rank_hostname.log", "w");
        if (ranklog == NULL)
          error("Could not open file 'rank_hostname.log' for writing.");
      }
      /* Write the header every restart-cycle. It does not hurt. */
      fprintf(ranklog, "# step rank hostname\n");
      for (int i = 0; i < nr_nodes; ++i)
        fprintf(ranklog, "%d %d %s\n", e->step, i,
                hostnames + hostname_buffer_length * i);
      fclose(ranklog);
      free(hostnames);
    }
#endif
  }

  /* Open some global files */
  /* Note: only rank 0 opens these, so everything in this scope (including
   * the SFH logger below) exists on rank 0 only. */
  if (!fof && e->nodeID == 0) {

    /* When restarting append to these files. */
    const char *mode;
    if (restart)
      mode = "a";
    else
      mode = "w";

    char energyfileName[200] = "";
    parser_get_opt_param_string(params, "Statistics:energy_file_name",
                                energyfileName,
                                engine_default_energy_file_name);
    /* Append the fixed extension to the user-provided base name. */
    sprintf(energyfileName + strlen(energyfileName), ".txt");
    e->file_stats = fopen(energyfileName, mode);
    if (e->file_stats == NULL)
      error("Could not open the file '%s' with mode '%s'.", energyfileName,
            mode);

    if (!restart)
      stats_write_file_header(e->file_stats, e->internal_units,
                              e->physical_constants);

    char timestepsfileName[200] = "";
    parser_get_opt_param_string(params, "Statistics:timestep_file_name",
                                timestepsfileName,
                                engine_default_timesteps_file_name);
    sprintf(timestepsfileName + strlen(timestepsfileName), ".txt");
    e->file_timesteps = fopen(timestepsfileName, mode);
    if (e->file_timesteps == NULL)
      error("Could not open the file '%s' with mode '%s'.", timestepsfileName,
            mode);

#ifndef RT_NONE
    char rtSubcyclesFileName[200] = "";
    parser_get_opt_param_string(params, "Statistics:rt_subcycles_file_name",
                                rtSubcyclesFileName,
                                engine_default_rt_subcycles_file_name);
    sprintf(rtSubcyclesFileName + strlen(rtSubcyclesFileName), ".txt");
    e->file_rt_subcycles = fopen(rtSubcyclesFileName, mode);
    if (e->file_rt_subcycles == NULL)
      error("Could not open the file '%s' with mode '%s'.", rtSubcyclesFileName,
            mode);
#endif

    /* Headers are only written on a fresh start; on restart we append to
     * the existing files and must not repeat them. */
    if (!restart) {
      fprintf(
          e->file_timesteps,
          "# Host: %s\n# Branch: %s\n# Revision: %s\n# Compiler: %s, "
          "Version: %s \n# "
          "Number of threads: %d\n# Number of MPI ranks: %d\n# Hydrodynamic "
          "scheme: %s\n# Hydrodynamic kernel: %s\n# No. of neighbours: %.2f "
          "+/- %.4f\n# Eta: %f\n# Config: %s\n# CFLAGS: %s\n",
          hostname(), git_branch(), git_revision(), compiler_name(),
          compiler_version(), e->nr_threads, e->nr_nodes, SPH_IMPLEMENTATION,
          kernel_name, e->hydro_properties->target_neighbours,
          e->hydro_properties->delta_neighbours,
          e->hydro_properties->eta_neighbours, configuration_options(),
          compilation_cflags());

      fprintf(
          e->file_timesteps,
          "# Step Properties: Rebuild=%d, Redistribute=%d, Repartition=%d, "
          "Statistics=%d, Snapshot=%d, Restarts=%d STF=%d, FOF=%d, mesh=%d\n",
          engine_step_prop_rebuild, engine_step_prop_redistribute,
          engine_step_prop_repartition, engine_step_prop_statistics,
          engine_step_prop_snapshot, engine_step_prop_restarts,
          engine_step_prop_stf, engine_step_prop_fof, engine_step_prop_mesh);

      fprintf(e->file_timesteps,
              "# %6s %14s %12s %12s %14s %9s %12s %12s %12s %12s %12s %16s "
              "[%s] %6s %12s [%s]\n",
              "Step", "Time", "Scale-factor", "Redshift", "Time-step",
              "Time-bins", "Updates", "g-Updates", "s-Updates", "Sink-Updates",
              "b-Updates", "Wall-clock time", clocks_getunit(), "Props",
              "Dead time", clocks_getunit());
      fflush(e->file_timesteps);

#ifndef RT_NONE
      fprintf(
          e->file_rt_subcycles,
          "# Host: %s\n# Branch: %s\n# Revision: %s\n# Compiler: %s, "
          "Version: %s \n# "
          "Number of threads: %d\n# Number of MPI ranks: %d\n# Hydrodynamic "
          "scheme: %s\n# Hydrodynamic kernel: %s\n# No. of neighbours: %.2f "
          "+/- %.4f\n# Eta: %f\n# Radiative Transfer Scheme: %s\n# Max Number "
          "RT sub-cycles: %d\n# Config: %s\n# CFLAGS: %s\n",
          hostname(), git_branch(), git_revision(), compiler_name(),
          compiler_version(), e->nr_threads, e->nr_nodes, SPH_IMPLEMENTATION,
          kernel_name, e->hydro_properties->target_neighbours,
          e->hydro_properties->delta_neighbours,
          e->hydro_properties->eta_neighbours, RT_IMPLEMENTATION,
          e->max_nr_rt_subcycles, configuration_options(),
          compilation_cflags());

      fprintf(
          e->file_rt_subcycles,
          "# Step Properties: Rebuild=%d, Redistribute=%d, Repartition=%d, "
          "Statistics=%d, Snapshot=%d, Restarts=%d STF=%d, FOF=%d, mesh=%d\n",
          engine_step_prop_rebuild, engine_step_prop_redistribute,
          engine_step_prop_repartition, engine_step_prop_statistics,
          engine_step_prop_snapshot, engine_step_prop_restarts,
          engine_step_prop_stf, engine_step_prop_fof, engine_step_prop_mesh);

      fprintf(e->file_rt_subcycles,
              "# Note: Sub-cycle=0 is performed during the regular SWIFT step, "
              "alongside hydro, gravity etc.\n");
      fprintf(e->file_rt_subcycles,
              "# For this reason, the wall-clock time and dead time is "
              "not available for it, and is written as -1.\n");
      fprintf(e->file_rt_subcycles,
              "# %6s %9s %14s %12s %12s %14s %9s %12s %16s [%s] %12s [%s]\n",
              "Step", "Sub-cycle", "Time", "Scale-factor", "Redshift",
              "Time-step", "Time-bins", "RT-Updates", "Wall-clock time",
              clocks_getunit(), "Dead time", clocks_getunit());
      fflush(e->file_rt_subcycles);
#endif  // compiled with RT
    }

    /* Initialize the SFH logger if running with star formation */
    if (e->policy & engine_policy_star_formation) {
      e->sfh_logger = fopen("SFR.txt", mode);
      if (e->sfh_logger == NULL)
        error("Could not open the file 'SFR.txt' with mode '%s'.", mode);
      if (!restart) {
        star_formation_logger_init_log_file(e->sfh_logger, e->internal_units,
                                            e->physical_constants);
        fflush(e->sfh_logger);
      }
    }
  }

  /* Print policy */
  engine_print_policy(e);

  if (!fof) {

    /* Print information about the hydro scheme */
    if (e->policy & engine_policy_hydro) {
      if (e->nodeID == 0) hydro_props_print(e->hydro_properties);
      if (e->nodeID == 0) pressure_floor_print(e->pressure_floor_props);
      if (e->nodeID == 0) entropy_floor_print(e->entropy_floor);
    }

    /* Print information about the gravity scheme */
    if (e->policy & engine_policy_self_gravity)
      if (e->nodeID == 0) gravity_props_print(e->gravity_properties);

    /* Print information about the stellar scheme */
    if (e->policy & engine_policy_stars)
      if (e->nodeID == 0) stars_props_print(e->stars_properties);

    /* Print information about the RT scheme */
    if (e->policy & engine_policy_rt) {
      rt_props_print(e->rt_props);
      if (e->nodeID == 0) {
        if (e->max_nr_rt_subcycles <= 1)
          message("WARNING: running without RT sub-cycling.");
        else {
          /* Make sure max_nr_rt_subcycles is an acceptable power of 2 */
          timebin_t power_subcycles = 0;
          while ((e->max_nr_rt_subcycles > (1 << power_subcycles)) &&
                 power_subcycles < num_time_bins)
            ++power_subcycles;
          if (power_subcycles == num_time_bins)
            error("TimeIntegration:max_nr_rt_subcycles=%d too big",
                  e->max_nr_rt_subcycles);
          if ((1 << power_subcycles) > e->max_nr_rt_subcycles)
            error("TimeIntegration:max_nr_rt_subcycles=%d not a power of 2",
                  e->max_nr_rt_subcycles);
          message("Running up to %d RT sub-cycles per hydro step.",
                  e->max_nr_rt_subcycles);
        }
      }
    }

    /* Check we have sensible time bounds */
    if (e->time_begin >= e->time_end)
      error(
          "Final simulation time (t_end = %e) must be larger than the start "
          "time (t_beg = %e)",
          e->time_end, e->time_begin);

    /* Check we have sensible time-step values */
    if (e->dt_min > e->dt_max)
      error(
          "Minimal time-step size (%e) must be smaller than maximal time-step "
          "size (%e)",
          e->dt_min, e->dt_max);

    /* Info about time-steps */
    if (e->nodeID == 0) {
      message("Absolute minimal timestep size: %e", e->time_base);

      /* Halve the run time repeatedly to find the largest time-line-aligned
       * step not exceeding the requested dt bounds. */
      float dt_min = e->time_end - e->time_begin;
      while (dt_min > e->dt_min) dt_min /= 2.f;
      message("Minimal timestep size (on time-line): %e", dt_min);

      float dt_max = e->time_end - e->time_begin;
      while (dt_max > e->dt_max) dt_max /= 2.f;
      message("Maximal timestep size (on time-line): %e", dt_max);
    }

    if (e->dt_min < e->time_base && e->nodeID == 0)
      error(
          "Minimal time-step size smaller than the absolute possible minimum "
          "dt=%e",
          e->time_base);

    if (!(e->policy & engine_policy_cosmology))
      if (e->dt_max > (e->time_end - e->time_begin) && e->nodeID == 0)
        error("Maximal time-step size larger than the simulation run time t=%e",
              e->time_end - e->time_begin);

    /* Read (or re-read the list of outputs */
    engine_init_output_lists(e, params, e->output_options);

    /* Check whether output quantities make sense */
    /* In the cosmological case the delta times are scale-factor ratios and
     * must be > 1; in the non-cosmological case they are plain time
     * intervals and must be positive. */
    if (e->policy & engine_policy_cosmology) {

      if (e->delta_time_snapshot <= 1.)
        error("Time between snapshots (%e) must be > 1.",
              e->delta_time_snapshot);

      if (e->delta_time_statistics <= 1.)
        error("Time between statistics (%e) must be > 1.",
              e->delta_time_statistics);

      if (e->a_first_snapshot < e->cosmology->a_begin)
        error(
            "Scale-factor of first snapshot (%e) must be after the simulation "
            "start a=%e.",
            e->a_first_snapshot, e->cosmology->a_begin);

      if (e->a_first_statistics < e->cosmology->a_begin)
        error(
            "Scale-factor of first stats output (%e) must be after the "
            "simulation start a=%e.",
            e->a_first_statistics, e->cosmology->a_begin);

      if (e->policy & engine_policy_structure_finding) {

        if (e->delta_time_stf == -1. && !e->snapshot_invoke_stf)
          error("A value for `StructureFinding:delta_time` must be specified");

        if (e->a_first_stf_output < e->cosmology->a_begin)
          error(
              "Scale-factor of first stf output (%e) must be after the "
              "simulation start a=%e.",
              e->a_first_stf_output, e->cosmology->a_begin);
      }

      if (e->policy & engine_policy_fof &&
          e->fof_properties->seed_black_holes_enabled) {

        if (e->delta_time_fof <= 1.)
          error("Time between FOF (%e) must be > 1.", e->delta_time_fof);

        if (e->a_first_fof_call < e->cosmology->a_begin)
          error(
              "Scale-factor of first fof call (%e) must be after the "
              "simulation start a=%e.",
              e->a_first_fof_call, e->cosmology->a_begin);
      }

    } else {

      if (e->delta_time_snapshot <= 0.)
        error("Time between snapshots (%e) must be positive.",
              e->delta_time_snapshot);

      if (e->delta_time_statistics <= 0.)
        error("Time between statistics (%e) must be positive.",
              e->delta_time_statistics);

      /* Find the time of the first output */
      if (e->time_first_snapshot < e->time_begin)
        error(
            "Time of first snapshot (%e) must be after the simulation start "
            "t=%e.",
            e->time_first_snapshot, e->time_begin);

      if (e->time_first_statistics < e->time_begin)
        error(
            "Time of first stats output (%e) must be after the simulation "
            "start t=%e.",
            e->time_first_statistics, e->time_begin);

      if (e->policy & engine_policy_structure_finding) {

        if (e->delta_time_stf == -1. && !e->snapshot_invoke_stf)
          error("A value for `StructureFinding:delta_time` must be specified");

        if (e->delta_time_stf <= 0. && e->delta_time_stf != -1.)
          error("Time between STF (%e) must be positive.", e->delta_time_stf);

        if (e->time_first_stf_output < e->time_begin)
          error(
              "Time of first STF (%e) must be after the simulation start t=%e.",
              e->time_first_stf_output, e->time_begin);
      }
    }

    /* Get the total mass */
    e->total_mass = 0.;
    for (size_t i = 0; i < e->s->nr_gparts; ++i)
      e->total_mass += e->s->gparts[i].mass;

/* Reduce the total mass */
#ifdef WITH_MPI
    MPI_Allreduce(MPI_IN_PLACE, &e->total_mass, 1, MPI_DOUBLE, MPI_SUM,
                  MPI_COMM_WORLD);
#endif

#if defined(WITH_CSDS)
    if ((e->policy & engine_policy_csds) && e->nodeID == 0)
      message(
          "WARNING: There is currently no way of predicting the output "
          "size, please use the CSDS carefully");
#endif

    /* Find the time of the first snapshot output */
    engine_compute_next_snapshot_time(e, restart);

    /* Find the time of the first statistics output */
    engine_compute_next_statistics_time(e);

    /* Find the time of the first line of sight output
     * and verify the outputs */
    if (e->policy & engine_policy_line_of_sight) {
      engine_compute_next_los_time(e);
      los_io_output_check(e);
    }

    /* Find the time of the first stf output */
    if (e->policy & engine_policy_structure_finding) {
      engine_compute_next_stf_time(e);
    }

    /* Find the time of the first stf output */
    if (e->policy & engine_policy_fof &&
        e->fof_properties->seed_black_holes_enabled) {
      engine_compute_next_fof_time(e);
    }

    /* Check that we are invoking VELOCIraptor only if we have it */
    if (e->snapshot_invoke_stf &&
        !(e->policy & engine_policy_structure_finding)) {
      error(
          "Invoking VELOCIraptor after snapshots but structure finding wasn't "
          "activated at runtime (Use --velociraptor).");
    }

    /* Whether restarts are enabled. Yes by default. Can be changed on restart.
     */
    e->restart_dump = parser_get_opt_param_int(params, "Restarts:enable", 1);

    /* Whether to save backup copies of the previous restart files. */
    e->restart_save = parser_get_opt_param_int(params, "Restarts:save", 1);

    /* Whether restarts should be dumped on exit. Not by default. Can be changed
     * on restart. */
    e->restart_onexit = parser_get_opt_param_int(params, "Restarts:onexit", 0);

    /* Read the number of Lustre OSTs to distribute the restart files over */
    e->restart_lustre_OST_count =
        parser_get_opt_param_int(params, "Restarts:lustre_OST_count", 0);

    /* Hours between restart dumps. Can be changed on restart. */
    float dhours =
        parser_get_opt_param_float(params, "Restarts:delta_hours", 5.0f);
    if (e->nodeID == 0) {
      if (e->restart_dump)
        message("Restarts will be dumped every %f hours", dhours);
      else
        message("WARNING: restarts will not be dumped");

      if (e->verbose && e->restart_onexit)
        message("Restarts will be dumped after the final step");
    }

    /* Internally we use ticks, so convert into a delta ticks. Assumes we can
     * convert from ticks into milliseconds. */
    e->restart_dt = clocks_to_ticks(dhours * 60.0 * 60.0 * 1000.0);

    /* The first dump will happen no sooner than restart_dt ticks in the
     * future. */
    e->restart_next = getticks() + e->restart_dt;
  }

/* Construct types for MPI communications */
#ifdef WITH_MPI
  part_create_mpi_types();
  multipole_create_mpi_types();
  stats_create_mpi_type();
  proxy_create_mpi_type();
  task_create_mpi_comms();
#ifdef WITH_FOF
  fof_create_mpi_types();
#endif /* WITH_FOF */
#endif /* WITH_MPI */

  if (!fof) {
    /* Initialise the collection group. */
    collectgroup_init();
  }

  /* Initialize the threadpool. */
  threadpool_init(&e->threadpool, nr_pool_threads);
  if (e->nodeID == 0)
    message("Using %d threads in the thread-pool", nr_pool_threads);

  /* Cells per thread buffer. */
  e->s->cells_sub =
      (struct cell **)calloc(nr_pool_threads + 1, sizeof(struct cell *));
  e->s->multipoles_sub = (struct gravity_tensors **)calloc(
      nr_pool_threads + 1, sizeof(struct gravity_tensors *));

  /* First of all, init the barrier and lock it. */
  /* Barriers are sized nr_threads + 1: the runners plus this (main) thread. */
  if (swift_barrier_init(&e->wait_barrier, NULL, e->nr_threads + 1) != 0 ||
      swift_barrier_init(&e->run_barrier, NULL, e->nr_threads + 1) != 0)
    error("Failed to initialize barrier.");

  /* Expected average for tasks per cell. If set to zero we use a heuristic
   * guess based on the numbers of cells and how many tasks per cell we expect.
   * On restart this number cannot be estimated (no cells yet), so we recover
   * from the end of the dumped run. Can be changed on restart. */
  e->tasks_per_cell =
      parser_get_opt_param_float(params, "Scheduler:tasks_per_cell", 0.0);
  e->tasks_per_cell_max = 0.0f;

  float maxtasks = 0;
  if (restart)
    maxtasks = e->restart_max_tasks;
  else
    maxtasks = engine_estimate_nr_tasks(e);

  /* Estimated number of links per tasks */
  e->links_per_tasks =
      parser_get_opt_param_float(params, "Scheduler:links_per_tasks", 25.);

  /* Init the scheduler. */
  scheduler_init(&e->sched, e->s, maxtasks, nr_queues,
                 (e->policy & scheduler_flag_steal), e->nodeID, &e->threadpool);

  /* Maximum size of MPI task messages, in KB, that should not be buffered,
   * that is sent using MPI_Issend, not MPI_Isend. 4Mb by default. Can be
   * changed on restart.
   */
  e->sched.mpi_message_limit =
      parser_get_opt_param_int(params, "Scheduler:mpi_message_limit", 4) * 1024;

  if (restart) {

    /* Overwrite the constants for the scheduler */
    /* These are file-scope globals shared with space.c; re-reading them here
     * lets a restarted run tune the task/cell geometry. */
    space_maxsize = parser_get_opt_param_int(params, "Scheduler:cell_max_size",
                                             space_maxsize);
    space_subsize_pair_hydro = parser_get_opt_param_int(
        params, "Scheduler:cell_sub_size_pair_hydro", space_subsize_pair_hydro);
    space_subsize_self_hydro = parser_get_opt_param_int(
        params, "Scheduler:cell_sub_size_self_hydro", space_subsize_self_hydro);
    space_subsize_pair_stars = parser_get_opt_param_int(
        params, "Scheduler:cell_sub_size_pair_stars", space_subsize_pair_stars);
    space_subsize_self_stars = parser_get_opt_param_int(
        params, "Scheduler:cell_sub_size_self_stars", space_subsize_self_stars);
    space_subsize_pair_grav = parser_get_opt_param_int(
        params, "Scheduler:cell_sub_size_pair_grav", space_subsize_pair_grav);
    space_subsize_self_grav = parser_get_opt_param_int(
        params, "Scheduler:cell_sub_size_self_grav", space_subsize_self_grav);
    space_splitsize = parser_get_opt_param_int(
        params, "Scheduler:cell_split_size", space_splitsize);
    space_subdepth_diff_grav =
        parser_get_opt_param_int(params, "Scheduler:cell_subdepth_diff_grav",
                                 space_subdepth_diff_grav_default);
    space_extra_parts = parser_get_opt_param_int(
        params, "Scheduler:cell_extra_parts", space_extra_parts_default);
    space_extra_sparts = parser_get_opt_param_int(
        params, "Scheduler:cell_extra_sparts", space_extra_sparts_default);
    space_extra_gparts = parser_get_opt_param_int(
        params, "Scheduler:cell_extra_gparts", space_extra_gparts_default);
    space_extra_bparts = parser_get_opt_param_int(
        params, "Scheduler:cell_extra_bparts", space_extra_bparts_default);
    space_extra_sinks = parser_get_opt_param_int(
        params, "Scheduler:cell_extra_sinks", space_extra_sinks_default);

    /* Do we want any spare particles for on the fly creation?
       This condition should be the same than in space.c */
    if (!(e->policy & engine_policy_star_formation ||
          e->policy & engine_policy_sinks) ||
        !swift_star_formation_model_creates_stars) {
      space_extra_sparts = 0;
      space_extra_gparts = 0;
      space_extra_sinks = 0;
    }

    engine_max_parts_per_ghost =
        parser_get_opt_param_int(params, "Scheduler:engine_max_parts_per_ghost",
                                 engine_max_parts_per_ghost);
    engine_max_sparts_per_ghost = parser_get_opt_param_int(
        params, "Scheduler:engine_max_sparts_per_ghost",
        engine_max_sparts_per_ghost);
    engine_max_parts_per_cooling = parser_get_opt_param_int(
        params, "Scheduler:engine_max_parts_per_cooling",
        engine_max_parts_per_cooling);
  }

  /* Allocate and init the threads. */
  if (swift_memalign("runners", (void **)&e->runners, SWIFT_CACHE_ALIGNMENT,
                     e->nr_threads * sizeof(struct runner)) != 0)
    error("Failed to allocate threads array.");

  for (int k = 0; k < e->nr_threads; k++) {
    e->runners[k].id = k;
    e->runners[k].e = e;
    if (pthread_create(&e->runners[k].thread, NULL, &runner_main,
                       &e->runners[k]) != 0)
      error("Failed to create runner thread.");

    /* Try to pin the runner to a given core */
    if (with_aff &&
        (e->policy & engine_policy_setaffinity) == engine_policy_setaffinity) {
#if defined(HAVE_SETAFFINITY)

      /* Set a reasonable queue ID. */
      /* Runners wrap round-robin over the affinity cores if there are more
       * threads than cores. */
      int coreid = k % nr_affinity_cores;
      e->runners[k].cpuid = cpuid[coreid];
      if (nr_queues < e->nr_threads)
        e->runners[k].qid = cpuid[coreid] * nr_queues / nr_affinity_cores;
      else
        e->runners[k].qid = k;

      /* Set the cpu mask to zero | e->id. */
      CPU_ZERO(&cpuset);
      CPU_SET(cpuid[coreid], &cpuset);

      /* Apply this mask to the runner's pthread. */
      if (pthread_setaffinity_np(e->runners[k].thread, sizeof(cpu_set_t),
                                 &cpuset) != 0)
        error("Failed to set thread affinity.");

#else
      error("SWIFT was not compiled with affinity enabled.");
#endif
    } else {
      e->runners[k].cpuid = k;
      e->runners[k].qid = k * nr_queues / e->nr_threads;
    }

    /* Allocate particle caches. */
    e->runners[k].ci_gravity_cache.count = 0;
    e->runners[k].cj_gravity_cache.count = 0;
    gravity_cache_init(&e->runners[k].ci_gravity_cache, space_splitsize);
    gravity_cache_init(&e->runners[k].cj_gravity_cache, space_splitsize);
#ifdef WITH_VECTORIZATION
    e->runners[k].ci_cache.count = 0;
    e->runners[k].cj_cache.count = 0;
    cache_init(&e->runners[k].ci_cache, CACHE_SIZE);
    cache_init(&e->runners[k].cj_cache, CACHE_SIZE);
#endif

    if (verbose) {
      if (with_aff)
        message("runner %i on cpuid=%i with qid=%i.", e->runners[k].id,
                e->runners[k].cpuid, e->runners[k].qid);
      else
        message("runner %i using qid=%i no cpuid.", e->runners[k].id,
                e->runners[k].qid);
    }
  }

#ifdef WITH_CSDS
  if ((e->policy & engine_policy_csds) && !restart) {
    /* Write the particle csds header */
    csds_write_file_header(e->csds);
  }
#endif

  /* Initialise the structure finder */
#ifdef HAVE_VELOCIRAPTOR
  if (e->policy & engine_policy_structure_finding) velociraptor_init(e);
#endif

/* Free the affinity stuff */
#if defined(HAVE_SETAFFINITY)
  if (with_aff) {
    free(cpuid);
  }
#endif

#ifdef SWIFT_DUMPER_THREAD
  /* Start the dumper thread.*/
  engine_dumper_init(e);
#endif

  /* Wait for the runner threads to be in place. */
  swift_barrier_wait(&e->wait_barrier);

  if (e->nodeID == 0) {
    clocks_gettime(&toc);
    message("took %.3f %s.", clocks_diff(&tic, &toc), clocks_getunit());
    fflush(stdout);
  }
}