diff --git a/src/engine.c b/src/engine.c index 7560771b2ffb4e05562a61b505b64a2a58cdc889..7967bdf01aa726655326998eb020a0ed0492ba31 100644 --- a/src/engine.c +++ b/src/engine.c @@ -52,6 +52,7 @@ #include "atomic.h" #include "cell.h" #include "clocks.h" +#include "cooling.h" #include "cycle.h" #include "debug.h" #include "error.h" @@ -64,10 +65,12 @@ #include "partition.h" #include "profiler.h" #include "proxy.h" +#include "restart.h" #include "runner.h" #include "serial_io.h" #include "single_io.h" #include "sort_part.h" +#include "sourceterms.h" #include "statistics.h" #include "timers.h" #include "tools.h" @@ -3726,10 +3729,11 @@ void engine_prepare(struct engine *e) { #ifdef SWIFT_DEBUG_CHECKS if (e->forcerepart || e->forcerebuild) { /* Check that all cells have been drifted to the current time. - * That can include cells that have not - * previously been active on this rank. */ - space_check_drift_point(e->s, e->ti_old, - e->policy & engine_policy_self_gravity); + * That can include cells that have not previously been active on this + * rank. Skip if haven't got any cells (yet). */ + if (e->s->cells_top != NULL) + space_check_drift_point(e->s, e->ti_old, + e->policy & engine_policy_self_gravity); } #endif @@ -3742,7 +3746,7 @@ void engine_prepare(struct engine *e) { /* Unskip active tasks and check for rebuild */ engine_unskip(e); - /* Re-rank the tasks every now and then. */ + /* Re-rank the tasks every now and then. XXX this never executes. */ if (e->tasks_age % engine_tasksreweight == 1) { scheduler_reweight(&e->sched, e->verbose); } @@ -4143,8 +4147,7 @@ void engine_first_init_particles(struct engine *e) { const ticks tic = getticks(); /* Set the particles in a state where they are ready for a run */ - space_first_init_parts(e->s, e->chemistry); - space_first_init_xparts(e->s, e->cooling_func); + space_first_init_parts(e->s, e->chemistry, e->cooling_func); space_first_init_gparts(e->s, e->gravity_properties); space_first_init_sparts(e->s); @@ -4283,7 +4286,7 @@ void engine_init_particles(struct engine *e, int flag_entropy_ICs, prev_id = &s->parts[k].id; } if (failed > 0) - message( + error( "Have %d particle pairs with the same locations.\n" "Cannot continue", failed); @@ -4441,7 +4444,7 @@ void engine_step(struct engine *e) { gravity_exact_force_check(e->s, e, 1e-1); #endif - /* Let's trigger a rebuild every-so-often for good measure */ + /* Let's trigger a non-SPH rebuild every-so-often for good measure */ if (!(e->policy & engine_policy_hydro) && // MATTHIEU improve this (e->policy & engine_policy_self_gravity) && e->step % 20 == 0) e->forcerebuild = 1; @@ -4454,9 +4457,8 @@ void engine_step(struct engine *e) { e->forcerebuild = e->collect_group1.forcerebuild; /* Save some statistics ? */ - if (e->time - e->timeLastStatistics >= e->deltaTimeStatistics) { + if (e->time - e->timeLastStatistics >= e->deltaTimeStatistics) e->save_stats = 1; - } /* Do we want a snapshot? */ if (e->ti_end_min >= e->ti_nextSnapshot && e->ti_nextSnapshot > 0) @@ -4464,8 +4466,9 @@ void engine_step(struct engine *e) { /* Drift everybody (i.e. what has not yet been drifted) */ /* to the current time */ - if (e->dump_snapshot || e->forcerebuild || e->forcerepart || e->save_stats) - engine_drift_all(e); + int drifted_all = + (e->dump_snapshot || e->forcerebuild || e->forcerepart || e->save_stats); + if (drifted_all) engine_drift_all(e); /* Write a snapshot ? */ if (e->dump_snapshot) { @@ -4505,6 +4508,48 @@ void engine_step(struct engine *e) { /* Time in ticks at the end of this step. */ e->toc_step = getticks(); #endif + + /* Final job is to create a restart file if needed. */ + engine_dump_restarts(e, drifted_all, e->restart_onexit && engine_is_done(e)); +} + +/** + * @brief dump restart files if it is time to do so and dumps are enabled. + * + * @param e the engine. + * @param drifted_all true if a drift_all has just been performed. + * @param force force a dump, if dumping is enabled. + */ +void engine_dump_restarts(struct engine *e, int drifted_all, int force) { + + if (e->restart_dump) { + ticks tic = getticks(); + + /* Dump when the time has arrived, or we are told to. */ + int dump = ((tic > e->restart_next) || force); + +#ifdef WITH_MPI + /* Synchronize this action from rank 0 (ticks may differ between + * machines). */ + MPI_Bcast(&dump, 1, MPI_INT, 0, MPI_COMM_WORLD); +#endif + if (dump) { + + /* Drift all particles first (may have just been done). */ + if (!drifted_all) engine_drift_all(e); + restart_write(e, e->restart_file); + + if (e->verbose) + message("Dumping restart files took %.3f %s", + clocks_from_ticks(getticks() - tic), clocks_getunit()); + + /* Time after which next dump will occur. */ + e->restart_next += e->restart_dt; + + /* Flag that we dumped the restarts */ + e->step_props |= engine_step_prop_restarts; + } + } } /** @@ -5038,7 +5083,7 @@ void engine_dump_snapshot(struct engine *e) { e->dump_snapshot = 0; clocks_gettime(&time2); - //if (e->verbose) + if (e->verbose) message("writing particle properties took %.3f %s.", (float)clocks_diff(&time1, &time2), clocks_getunit()); } @@ -5099,18 +5144,17 @@ void engine_unpin() { } /** - * @brief init an engine with the given number of threads, queues, and - * the given policy. + * @brief init an engine struct with the necessary properties for the + * simulation. + * + * Note do not use when restarting. Engine initialisation + * is completed by a call to engine_config(). * * @param e The #engine. * @param s The #space in which this #runner will run. * @param params The parsed parameter file. - * @param nr_nodes The number of MPI ranks. - * @param nodeID The MPI rank of this node. - * @param nr_threads The number of threads per MPI rank. * @param Ngas total number of gas particles in the simulation. * @param Ndm total number of gravity particles in the simulation. - * @param with_aff use processor affinity, if supported. * @param policy The queuing policy to use. * @param verbose Is this #engine talkative ? * @param reparttype What type of repartition algorithm are we using ? @@ -5123,41 +5167,28 @@ void engine_unpin() { * @param chemistry The chemistry information. * @param sourceterms The properties of the source terms function. */ -void engine_init(struct engine *e, struct space *s, - const struct swift_params *params, int nr_nodes, int nodeID, - int nr_threads, long long Ngas, long long Ndm, int with_aff, - int policy, int verbose, struct repartition *reparttype, - const struct unit_system *internal_units, - const struct phys_const *physical_constants, - const struct hydro_props *hydro, - const struct gravity_props *gravity, - const struct external_potential *potential, - const struct cooling_function_data *cooling_func, - const struct chemistry_data *chemistry, - struct sourceterms *sourceterms) { +void engine_init( + struct engine *e, struct space *s, const struct swift_params *params, + long long Ngas, long long Ndm, int policy, int verbose, + struct repartition *reparttype, const struct unit_system *internal_units, + const struct phys_const *physical_constants, + const struct hydro_props *hydro, const struct gravity_props *gravity, + const struct external_potential *potential, + const struct cooling_function_data *cooling_func, + const struct chemistry_data *chemistry, struct sourceterms *sourceterms) { /* Clean-up everything */ bzero(e, sizeof(struct engine)); - /* Store the values. */ + /* Store the all values in the fields of the engine. */ e->s = s; - e->nr_threads = nr_threads; e->policy = policy; e->step = 0; - e->nr_nodes = nr_nodes; - e->nodeID = nodeID; e->total_nr_parts = Ngas; e->total_nr_gparts = Ndm; e->proxy_ind = NULL; e->nr_proxies = 0; - e->forcerebuild = 1; - e->forcerepart = 0; e->reparttype = reparttype; - e->dump_snapshot = 0; - e->save_stats = 0; - e->step_props = engine_step_prop_none; - e->links = NULL; - e->nr_links = 0; e->timeBegin = parser_get_param_double(params, "TimeIntegration:time_begin"); e->timeEnd = parser_get_param_double(params, "TimeIntegration:time_end"); e->timeOld = e->timeBegin; @@ -5179,10 +5210,9 @@ void engine_init(struct engine *e, struct space *s, parser_get_opt_param_int(params, "Snapshots:compression", 0); e->snapshotUnits = malloc(sizeof(struct unit_system)); units_init_default(e->snapshotUnits, params, "Snapshots", internal_units); + e->snapshotOutputCount = 0; e->dt_min = parser_get_param_double(params, "TimeIntegration:dt_min"); e->dt_max = parser_get_param_double(params, "TimeIntegration:dt_max"); - e->file_stats = NULL; - e->file_timesteps = NULL; e->deltaTimeStatistics = parser_get_param_double(params, "Statistics:delta_time"); e->timeLastStatistics = 0; @@ -5201,18 +5231,74 @@ void engine_init(struct engine *e, struct space *s, e->cputime_last_step = 0; e->last_repartition = 0; #endif - engine_rank = nodeID; /* Make the space link back to the engine. */ s->e = e; + /* Setup the timestep */ + e->timeBase = (e->timeEnd - e->timeBegin) / max_nr_timesteps; + e->timeBase_inv = 1.0 / e->timeBase; + e->ti_current = 0; +} + +/** + * @brief configure an engine with the given number of threads, queues + * and core affinity. Also initialises the scheduler and opens various + * output files, computes the next timestep and initialises the + * threadpool. + * + * Assumes the engine is correctly initialised i.e. is restored from a restart + * file or has been setup by engine_init(). When restarting any output log + * files are positioned so that further output is appended. Note that + * parameters are not read from the engine, just the parameter file, this + * allows values derived in this function to be changed between runs. + * When not restarting params should be the same as given to engine_init(). + * + * @param restart true when restarting the application. + * @param e The #engine. + * @param params The parsed parameter file. + * @param nr_nodes The number of MPI ranks. + * @param nodeID The MPI rank of this node. + * @param nr_threads The number of threads per MPI rank. + * @param with_aff use processor affinity, if supported. + * @param verbose Is this #engine talkative ? + * @param restart_file The name of our restart file. + */ +void engine_config(int restart, struct engine *e, + const struct swift_params *params, int nr_nodes, int nodeID, + int nr_threads, int with_aff, int verbose, + const char *restart_file) { + + /* Store the values and initialise global fields. */ + e->nodeID = nodeID; + e->nr_threads = nr_threads; + e->nr_nodes = nr_nodes; + e->proxy_ind = NULL; + e->nr_proxies = 0; + e->forcerebuild = 1; + e->forcerepart = 0; + e->dump_snapshot = 0; + e->save_stats = 0; + e->step_props = engine_step_prop_none; + e->links = NULL; + e->nr_links = 0; + e->file_stats = NULL; + e->file_timesteps = NULL; + e->verbose = verbose; + e->wallclock_time = 0.f; + e->restart_dump = 0; + e->restart_file = restart_file; + e->restart_next = 0; + e->restart_dt = 0; + engine_rank = nodeID; + /* Get the number of queues */ int nr_queues = parser_get_opt_param_int(params, "Scheduler:nr_queues", nr_threads); if (nr_queues <= 0) nr_queues = e->nr_threads; if (nr_queues != nr_threads) message("Number of task queues set to %d", nr_queues); - s->nr_queues = nr_queues; + e->s->nr_queues = nr_queues; /* Deal with affinity. For now, just figure out the number of cores. */ #if defined(HAVE_SETAFFINITY) @@ -5250,7 +5336,7 @@ void engine_init(struct engine *e, struct space *s, } #if defined(HAVE_LIBNUMA) && defined(_GNU_SOURCE) - if ((policy & engine_policy_cputight) != engine_policy_cputight) { + if ((e->policy & engine_policy_cputight) != engine_policy_cputight) { if (numa_available() >= 0) { if (nodeID == 0) message("prefer NUMA-distant CPUs"); @@ -5346,19 +5432,31 @@ void engine_init(struct engine *e, struct space *s, /* Open some files */ if (e->nodeID == 0) { + + /* When restarting append to these files. */ + char *mode; + if (restart) + mode = "a"; + else + mode = "w"; + char energyfileName[200] = ""; parser_get_opt_param_string(params, "Statistics:energy_file_name", energyfileName, engine_default_energy_file_name); sprintf(energyfileName + strlen(energyfileName), ".txt"); - e->file_stats = fopen(energyfileName, "w"); - fprintf(e->file_stats, - "#%14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s " - "%14s %14s %14s %14s %14s %14s\n", - "Time", "Mass", "E_tot", "E_kin", "E_int", "E_pot", "E_pot_self", - "E_pot_ext", "E_radcool", "Entropy", "p_x", "p_y", "p_z", "ang_x", - "ang_y", "ang_z", "com_x", "com_y", "com_z"); - fflush(e->file_stats); + e->file_stats = fopen(energyfileName, mode); + + if (!restart) { + fprintf( + e->file_stats, + "#%14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s %14s " + "%14s %14s %14s %14s %14s %14s\n", + "Time", "Mass", "E_tot", "E_kin", "E_int", "E_pot", "E_pot_self", + "E_pot_ext", "E_radcool", "Entropy", "p_x", "p_y", "p_z", "ang_x", + "ang_y", "ang_z", "com_x", "com_y", "com_z"); + fflush(e->file_stats); + } char timestepsfileName[200] = ""; parser_get_opt_param_string(params, "Statistics:timestep_file_name", @@ -5367,30 +5465,35 @@ void engine_init(struct engine *e, struct space *s, sprintf(timestepsfileName + strlen(timestepsfileName), "_%d.txt", nr_nodes * nr_threads); - e->file_timesteps = fopen(timestepsfileName, "w"); - fprintf(e->file_timesteps, - "# Host: %s\n# Branch: %s\n# Revision: %s\n# Compiler: %s, " - "Version: %s \n# " - "Number of threads: %d\n# Number of MPI ranks: %d\n# Hydrodynamic " - "scheme: %s\n# Hydrodynamic kernel: %s\n# No. of neighbours: %.2f " - "+/- %.4f\n# Eta: %f\n", - hostname(), git_branch(), git_revision(), compiler_name(), - compiler_version(), e->nr_threads, e->nr_nodes, SPH_IMPLEMENTATION, - kernel_name, e->hydro_properties->target_neighbours, - e->hydro_properties->delta_neighbours, - e->hydro_properties->eta_neighbours); - - fprintf(e->file_timesteps, - "# Step Properties: Rebuild=%d, Redistribute=%d, Repartition=%d, " - "Statistics=%d, Snapshot=%d\n", - engine_step_prop_rebuild, engine_step_prop_redistribute, - engine_step_prop_repartition, engine_step_prop_statistics, - engine_step_prop_snapshot); - - fprintf(e->file_timesteps, "# %6s %14s %14s %12s %12s %12s %16s [%s] %6s\n", - "Step", "Time", "Time-step", "Updates", "g-Updates", "s-Updates", - "Wall-clock time", clocks_getunit(), "Props"); - fflush(e->file_timesteps); + e->file_timesteps = fopen(timestepsfileName, mode); + + if (!restart) { + fprintf( + e->file_timesteps, + "# Host: %s\n# Branch: %s\n# Revision: %s\n# Compiler: %s, " + "Version: %s \n# " + "Number of threads: %d\n# Number of MPI ranks: %d\n# Hydrodynamic " + "scheme: %s\n# Hydrodynamic kernel: %s\n# No. of neighbours: %.2f " + "+/- %.4f\n# Eta: %f\n", + hostname(), git_branch(), git_revision(), compiler_name(), + compiler_version(), e->nr_threads, e->nr_nodes, SPH_IMPLEMENTATION, + kernel_name, e->hydro_properties->target_neighbours, + e->hydro_properties->delta_neighbours, + e->hydro_properties->eta_neighbours); + + fprintf(e->file_timesteps, + "# Step Properties: Rebuild=%d, Redistribute=%d, Repartition=%d, " + "Statistics=%d, Snapshot=%d, Restarts=%d\n", + engine_step_prop_rebuild, engine_step_prop_redistribute, + engine_step_prop_repartition, engine_step_prop_statistics, + engine_step_prop_snapshot, engine_step_prop_restarts); + + fprintf(e->file_timesteps, + "# %6s %14s %14s %12s %12s %12s %16s [%s] %6s\n", "Step", "Time", + "Time-step", "Updates", "g-Updates", "s-Updates", + "Wall-clock time", clocks_getunit(), "Props"); + fflush(e->file_timesteps); + } } /* Print policy */ @@ -5419,9 +5522,11 @@ void engine_init(struct engine *e, struct space *s, e->dt_min, e->dt_max); /* Deal with timestep */ - e->timeBase = (e->timeEnd - e->timeBegin) / max_nr_timesteps; - e->timeBase_inv = 1.0 / e->timeBase; - e->ti_current = 0; + if (!restart) { + e->timeBase = (e->timeEnd - e->timeBegin) / max_nr_timesteps; + e->timeBase_inv = 1.0 / e->timeBase; + e->ti_current = 0; + } /* Info about time-steps */ if (e->nodeID == 0) { @@ -5461,6 +5566,34 @@ void engine_init(struct engine *e, struct space *s, /* Find the time of the first output */ engine_compute_next_snapshot_time(e); + /* Whether restarts are enabled. Yes by default. Can be changed on restart. */ + e->restart_dump = parser_get_opt_param_int(params, "Restarts:enable", 1); + + /* Whether restarts should be dumped on exit. Not by default. Can be changed + * on restart. */ + e->restart_onexit = parser_get_opt_param_int(params, "Restarts:onexit", 0); + + /* Hours between restart dumps. Can be changed on restart. */ + float dhours = + parser_get_opt_param_float(params, "Restarts:delta_hours", 6.0); + if (e->nodeID == 0) { + if (e->restart_dump) + message("Restarts will be dumped every %f hours", dhours); + else + message("WARNING: restarts will not be dumped"); + + if (e->verbose && e->restart_onexit) + message("Restarts will be dumped after the final step"); + } + + /* Internally we use ticks, so convert into a delta ticks. Assumes we can + * convert from ticks into milliseconds. */ + e->restart_dt = clocks_to_ticks(dhours * 60.0 * 60.0 * 1000.0); + + /* The first dump will happen no sooner than restart_dt ticks in the + * future. */ + e->restart_next = getticks() + e->restart_dt; + /* Construct types for MPI communications */ #ifdef WITH_MPI part_create_mpi_types(); @@ -5480,16 +5613,24 @@ void engine_init(struct engine *e, struct space *s, /* Expected average for tasks per cell. If set to zero we use a heuristic * guess based on the numbers of cells and how many tasks per cell we expect. + * On restart this number cannot be estimated (no cells yet), so we recover + * from the end of the dumped run. Can be changed on restart. */ e->tasks_per_cell = parser_get_opt_param_int(params, "Scheduler:tasks_per_cell", 0); + int maxtasks = 0; + if (restart) + maxtasks = e->restart_max_tasks; + else + maxtasks = engine_estimate_nr_tasks(e); /* Init the scheduler. */ - scheduler_init(&e->sched, e->s, engine_estimate_nr_tasks(e), nr_queues, - (policy & scheduler_flag_steal), e->nodeID, &e->threadpool); + scheduler_init(&e->sched, e->s, maxtasks, nr_queues, + (e->policy & scheduler_flag_steal), e->nodeID, &e->threadpool); /* Maximum size of MPI task messages, in KB, that should not be buffered, - * that is sent using MPI_Issend, not MPI_Isend. 4Mb by default. + * that is sent using MPI_Issend, not MPI_Isend. 4Mb by default. Can be + * changed on restart. */ e->sched.mpi_message_limit = parser_get_opt_param_int(params, "Scheduler:mpi_message_limit", 4) * 1024; @@ -5646,3 +5787,113 @@ void engine_clean(struct engine *e) { space_clean(e->s); threadpool_clean(&e->threadpool); } + +/** + * @brief Write the engine struct and its contents to the given FILE as a + * stream of bytes. + * + * @param e the engine + * @param stream the file stream + */ +void engine_struct_dump(struct engine *e, FILE *stream) { + + /* Dump the engine. Save the current tasks_per_cell estimate. */ + e->restart_max_tasks = engine_estimate_nr_tasks(e); + restart_write_blocks(e, sizeof(struct engine), 1, stream, "engine", + "engine struct"); + + /* And all the engine pointed data, these use their own dump functions. */ + space_struct_dump(e->s, stream); + units_struct_dump(e->internal_units, stream); + units_struct_dump(e->snapshotUnits, stream); + +#ifdef WITH_MPI + /* Save the partition for restoration. */ + partition_store_celllist(e->s, e->reparttype); + partition_struct_dump(e->reparttype, stream); +#endif + + phys_const_struct_dump(e->physical_constants, stream); + hydro_props_struct_dump(e->hydro_properties, stream); + gravity_props_struct_dump(e->gravity_properties, stream); + potential_struct_dump(e->external_potential, stream); + cooling_struct_dump(e->cooling_func, stream); + sourceterms_struct_dump(e->sourceterms, stream); + parser_struct_dump(e->parameter_file, stream); +} + +/** + * @brief Re-create an engine struct and its contents from the given FILE + * stream. + * + * @param e the engine + * @param stream the file stream + */ +void engine_struct_restore(struct engine *e, FILE *stream) { + + /* Read the engine. */ + restart_read_blocks(e, sizeof(struct engine), 1, stream, NULL, + "engine struct"); + + /* Re-initializations as necessary for our struct and its members. */ + e->sched.tasks = NULL; + e->sched.tasks_ind = NULL; + e->sched.tid_active = NULL; + e->sched.size = 0; + + /* Now for the other pointers, these use their own restore functions. */ + /* Note all this memory leaks, but is used once. */ + struct space *s = malloc(sizeof(struct space)); + space_struct_restore(s, stream); + e->s = s; + s->e = e; + + struct unit_system *us = malloc(sizeof(struct unit_system)); + units_struct_restore(us, stream); + e->internal_units = us; + + us = malloc(sizeof(struct unit_system)); + units_struct_restore(us, stream); + e->snapshotUnits = us; + +#ifdef WITH_MPI + struct repartition *reparttype = malloc(sizeof(struct repartition)); + partition_struct_restore(reparttype, stream); + e->reparttype = reparttype; +#endif + + struct phys_const *physical_constants = malloc(sizeof(struct phys_const)); + phys_const_struct_restore(physical_constants, stream); + e->physical_constants = physical_constants; + + struct hydro_props *hydro_properties = malloc(sizeof(struct hydro_props)); + hydro_props_struct_restore(hydro_properties, stream); + e->hydro_properties = hydro_properties; + + struct gravity_props *gravity_properties = + malloc(sizeof(struct gravity_props)); + gravity_props_struct_restore(gravity_properties, stream); + e->gravity_properties = gravity_properties; + + struct external_potential *external_potential = + malloc(sizeof(struct external_potential)); + potential_struct_restore(external_potential, stream); + e->external_potential = external_potential; + + struct cooling_function_data *cooling_func = + malloc(sizeof(struct cooling_function_data)); + cooling_struct_restore(cooling_func, stream); + e->cooling_func = cooling_func; + + struct sourceterms *sourceterms = malloc(sizeof(struct sourceterms)); + sourceterms_struct_restore(sourceterms, stream); + e->sourceterms = sourceterms; + + struct swift_params *parameter_file = malloc(sizeof(struct swift_params)); + parser_struct_restore(parameter_file, stream); + e->parameter_file = parameter_file; + + /* Want to force a rebuild before using this engine. Wait to repartition.*/ + e->forcerebuild = 1; + e->forcerepart = 0; +} diff --git a/src/parallel_io.c b/src/parallel_io.c index 057f0220b26b049fff7138f6181b8f53ff5bc711..90aeddd6af7e3a65a3e50d5cbc8c12e9c0c93fa0 100644 --- a/src/parallel_io.c +++ b/src/parallel_io.c @@ -253,9 +253,8 @@ void readArray(hid_t grp, struct io_props props, size_t N, long long N_total, * @param snapshot_units The #unit_system used in the snapshots. */ void prepareArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile, - char* partTypeGroupName, struct io_props props, - long long N_total, - const struct unit_system* snapshot_units) { + char* partTypeGroupName, struct io_props props, + long long N_total, const struct unit_system* snapshot_units) { /* Create data space */ const hid_t h_space = H5Screate(H5S_SIMPLE); @@ -282,7 +281,6 @@ void prepareArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile, /* Make sure the chunks are not larger than the dataset */ if (chunk_shape[0] > N_total) chunk_shape[0] = N_total; - /* Change shape of data space */ hid_t h_err = H5Sset_extent_simple(h_space, rank, shape, NULL); if (h_err < 0) @@ -300,10 +298,10 @@ void prepareArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile, /* } */ /* Create dataset */ - const hid_t h_data = H5Dcreate(grp, props.name, io_hdf5_type(props.type), - h_space, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); - if (h_data < 0) - error("Error while creating dataspace '%s'.", props.name); + const hid_t h_data = + H5Dcreate(grp, props.name, io_hdf5_type(props.type), h_space, H5P_DEFAULT, + H5P_DEFAULT, H5P_DEFAULT); + if (h_data < 0) error("Error while creating dataspace '%s'.", props.name); /* Write unit conversion factors for this data set */ char buffer[FIELD_BUFFER_SIZE]; @@ -319,7 +317,7 @@ void prepareArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile, /* Add a line to the XMF */ xmf_write_line(xmfFile, fileName, partTypeGroupName, props.name, N_total, - props.dimension, props.type); + props.dimension, props.type); /* Close everything */ H5Pclose(h_plist_id); @@ -327,7 +325,6 @@ void prepareArray(struct engine* e, hid_t grp, char* fileName, FILE* xmfFile, H5Sclose(h_space); } - void writeArray_chunk(struct engine* e, hid_t h_data, hid_t h_plist_id, const struct io_props props, size_t N, long long offset, const struct unit_system* internal_units, @@ -437,7 +434,6 @@ void writeArray_chunk(struct engine* e, hid_t h_data, hid_t h_plist_id, H5Sclose(h_filespace); } - /** * @brief Writes a data array in given HDF5 group. * @@ -469,8 +465,7 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, /* Open dataset */ const hid_t h_data = H5Dopen(grp, props.name, H5P_DEFAULT); - if (h_data < 0) - error("Error while opening dataset '%s'.", props.name); + if (h_data < 0) error("Error while opening dataset '%s'.", props.name); /* Given the limitations of ROM-IO we will need to write the data in chunk of HDF5_PARALLEL_IO_MAX_BYTES bytes per node until all the nodes are done. */ @@ -509,7 +504,7 @@ void writeArray(struct engine* e, hid_t grp, char* fileName, /* Close everything */ H5Dclose(h_data); - //H5Pclose(h_plist_id); +// H5Pclose(h_plist_id); #ifdef IO_SPEED_MEASUREMENT MPI_Barrier(MPI_COMM_WORLD); @@ -803,15 +798,15 @@ void read_ic_parallel(char* fileName, const struct unit_system* internal_units, H5Fclose(h_file); } -void prepare_file(struct engine* e, const char* baseName, - int outputCount, long long N_total[6], - const struct unit_system* internal_units, - const struct unit_system* snapshot_units) { +void prepare_file(struct engine* e, const char* baseName, int outputCount, + long long N_total[6], + const struct unit_system* internal_units, + const struct unit_system* snapshot_units) { struct part* parts = e->s->parts; struct gpart* gparts = e->s->gparts; struct spart* sparts = e->s->sparts; - + FILE* xmfFile = 0; int periodic = e->s->periodic; int numFiles = 1; @@ -829,14 +824,12 @@ void prepare_file(struct engine* e, const char* baseName, /* Open HDF5 file with the chosen parameters */ hid_t h_file = H5Fcreate(fileName, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT); - if (h_file < 0) - error("Error while opening file '%s'.", fileName); + if (h_file < 0) error("Error while opening file '%s'.", fileName); /* Write the part of the XMF file corresponding to this * specific output */ xmf_write_outputheader(xmfFile, fileName, e->time); - /* Open header to write simulation properties */ /* message("Writing runtime parameters..."); */ hid_t h_grp = @@ -922,10 +915,10 @@ void prepare_file(struct engine* e, const char* baseName, if (h_grp < 0) error("Error while creating parameters group"); parser_write_params_to_hdf5(e->parameter_file, h_grp); H5Gclose(h_grp); - + /* Print the system of Units used in the spashot */ io_write_unit_system(h_file, snapshot_units, "Units"); - + /* Print the system of Units used internally */ io_write_unit_system(h_file, internal_units, "InternalCodeUnits"); @@ -934,12 +927,12 @@ void prepare_file(struct engine* e, const char* baseName, /* Don't do anything if no particle of this kind */ if (N_total[ptype] == 0) continue; - + /* Add the global information for that particle type to * the XMF meta-file */ xmf_write_groupheader(xmfFile, fileName, N_total[ptype], - (enum part_type)ptype); - + (enum part_type)ptype); + /* Create the particle group in the file */ char partTypeGroupName[PARTICLE_GROUP_BUFFER_SIZE]; snprintf(partTypeGroupName, PARTICLE_GROUP_BUFFER_SIZE, "/PartType%d", @@ -975,7 +968,7 @@ void prepare_file(struct engine* e, const char* baseName, /* Prepare everything */ for (int i = 0; i < num_fields; ++i) prepareArray(e, h_grp, fileName, xmfFile, partTypeGroupName, list[i], - N_total[ptype], snapshot_units); + N_total[ptype], snapshot_units); /* Close particle group */ H5Gclose(h_grp); @@ -1043,16 +1036,17 @@ void write_output_parallel(struct engine* e, const char* baseName, * broadcast from there */ MPI_Bcast(&N_total, 6, MPI_LONG_LONG_INT, mpi_size - 1, comm); - /* Now everybody konws its offset and the total number of - * particles of each type */ +/* Now everybody konws its offset and the total number of + * particles of each type */ #ifdef IO_SPEED_MEASUREMENT ticks tic = getticks(); #endif /* Rank 0 prepares the file */ - if(mpi_rank == 0) - prepare_file(e, baseName, outputCount, N_total, internal_units, snapshot_units); + if (mpi_rank == 0) + prepare_file(e, baseName, outputCount, N_total, internal_units, + snapshot_units); MPI_Barrier(MPI_COMM_WORLD); @@ -1218,9 +1212,9 @@ void write_output_parallel(struct engine* e, const char* baseName, /* Write everything */ for (int i = 0; i < num_fields; ++i) - writeArray(e, h_grp, fileName, partTypeGroupName, list[i], - Nparticles, N_total[ptype], mpi_rank, offset[ptype], - internal_units, snapshot_units); + writeArray(e, h_grp, fileName, partTypeGroupName, list[i], Nparticles, + N_total[ptype], mpi_rank, offset[ptype], internal_units, + snapshot_units); /* Free temporary array */ if (dmparts) {