diff --git a/src/csds_logfile.c b/src/csds_logfile.c index f36939115d2ab635af1e9d4c7de522bba7844ab4..55c596ded5bb3b726a72536c92c9fae686490631 100644 --- a/src/csds_logfile.c +++ b/src/csds_logfile.c @@ -134,7 +134,7 @@ void csds_logfile_check_record_consistency(struct csds_logfile *log) { (current_time - init_time) * (100. - current) / current; /* Print the bar */ - tools_print_progress(current, remaining_time, "Reversing the offsets"); + tools_print_progress(current, remaining_time, "Checking offsets"); /* Compute the next update of the progress bar */ next_percentage += delta_percentage; diff --git a/src/csds_reader_generate_index.c b/src/csds_reader_generate_index.c index 181655d122a890beef477c6949bab029408723e3..72c2ebdfe7678d3b7f6a2b561aa354cb886d922b 100644 --- a/src/csds_reader_generate_index.c +++ b/src/csds_reader_generate_index.c @@ -453,6 +453,127 @@ size_t csds_reader_get_initial_state(const struct csds_reader *reader, return offset_max; } +/** + * @brief Structure for #csds_reader_update_particles_to_next_index_mapper + */ +struct update_particle_data { + + /* The reader */ + const struct csds_reader *reader; + + /* The time offset to reach */ + size_t time_offset; + + /* Current percentage */ + float percentage; + + /* Total number of particles */ + size_t number_particles; + + /* Lock for printing */ + swift_lock_type print_lock; + + /* Time when starting the update. */ + int init_time; +}; + +/** + * @brief Mapper function of #csds_reader_update_particles_to_next_index. + * + * This function loops over all the known particles and updates their offset + * until reaching the required one. + * + * @param map_data The #index_data to process + * @param num_elements The number of elements in map_data. + * @param extra_data The extra parameters (#update_particle_data) + */ +void csds_reader_update_particles_to_next_index_mapper(void *map_data, + int num_elements, + void *extra_data) { + + /* Get a few pointers */ + struct index_data *current_state = (struct index_data *)map_data; + struct update_particle_data *data = (struct update_particle_data *)extra_data; + const struct csds_reader *reader = data->reader; + const struct csds_logfile *log = &reader->log; + const struct header *h = &log->header; + + /* Loop over the particles */ + for (int i = 0; i < num_elements; i++) { + size_t current_offset = current_state[i].offset; + + /* Skip the flagged particles. */ + if (current_offset == PARTICLE_REMOVED) continue; + + /* Get the full mask */ + size_t full_mask = 0; + size_t h_offset = 0; + csds_loader_io_read_mask(h, log->log.map + current_offset, &full_mask, + &h_offset); + + /* Remove the special mask */ + full_mask = full_mask & !SPECIAL_FLAGS_MASK; + + /* Find the last offset before the current time */ + size_t last_full_offset = current_offset; + current_offset += h_offset; + while (1) { + /* Get the mask */ + size_t mask = 0; + h_offset = 0; + csds_loader_io_read_mask(h, log->log.map + current_offset, &mask, + &h_offset); + + /* update the offset */ + current_offset += h_offset; + if (current_offset > data->time_offset) { + break; + } + + /* The particle should not have a special flag + due to the previous loop */ + if (mask & SPECIAL_FLAGS_MASK) { + error("Found a special flag when updating the particles."); + } + + /* Update the last full offset */ + if (full_mask == mask) { + last_full_offset = current_offset; + } + + /* Are we at the end of the file? */ + if (h_offset == 0) { + break; + } + } + + /* Update the offset */ + current_state[i].offset = last_full_offset; + } + + if (reader->verbose) { + /* Update the counter */ + atomic_add_f(&data->percentage, num_elements / (float)data->number_particles); + + /* Update the message */ + if (lock_trylock(&data->print_lock)) { + /* Get the current state */ + float percent = 0; + __atomic_load(&data->percentage, &percent, __ATOMIC_RELAXED); + percent *= 100.f; + + /* Compute the remaining time */ + const int current_time = + clocks_diff_ticks(getticks(), clocks_start_ticks) / 1000.0; + const int remaining_time = + (current_time - data->init_time) * (100. - percent) / percent; + + /* Print the message */ + tools_print_progress(percent, remaining_time, "Updating offsets"); + } + } +} + /** * @brief Update the state of the simulation until the next index file along * with the history. @@ -481,6 +602,11 @@ size_t csds_reader_update_state_to_next_index( size_t offset = init_offset; int step = 0; if (reader->verbose) printf("\n"); + + /* Record the initial time */ + const int init_time = + clocks_diff_ticks(getticks(), clocks_start_ticks) / 1000.0; + while (offset < time_record.offset) { /* Print status */ @@ -488,10 +614,20 @@ size_t csds_reader_update_state_to_next_index( step += 1; if (step % 100 == 0) { step = 0; + + /* Get the percentage */ float percent = offset - init_offset; percent /= time_record.offset - init_offset; - printf("\rLooking for new or removed particles %.1f%%", - 100.f * percent); + percent *= 100.f; + + /* Get the remaining time */ + const int current_time = + clocks_diff_ticks(getticks(), clocks_start_ticks) / 1000.0; + const int remaining_time = + (current_time - init_time) * (100. - percent) / percent; + + tools_print_progress(percent, remaining_time, + "Looking for new or removed particles"); } } size_t mask = 0; @@ -553,79 +689,47 @@ size_t csds_reader_update_state_to_next_index( printf("\n"); } - /* Update the offsets of current_state - * No need to update the others as they contain - * data about when particles are removed/created*/ - size_t count = 0; - size_t total = 0; - for (int type = 0; type < swift_type_count; type++) { - total += current_state[type].size; - } - for (int type = 0; type < swift_type_count; type++) { - for (size_t i = 0; i < current_state[type].size; i++) { - size_t current_offset = current_state[type].data[i].offset; - - /* Print the update */ - if (reader->verbose) { - count += 1; - if (count % 100 == 0) { - printf("\rUpdating particles %.1f %%", 100.f * (float)count / total); - } - } - - /* Skip the flagged particles. */ - if (current_offset == PARTICLE_REMOVED) continue; + /* Create the threadpool */ + struct threadpool tp; + threadpool_init(&tp, reader->params.number_threads); - /* Get the full mask */ - size_t full_mask = 0; - size_t h_offset = 0; - csds_loader_io_read_mask(h, log->log.map + current_offset, &full_mask, - &h_offset); + /* Create the input */ + struct update_particle_data extra_data; + extra_data.reader = reader; + extra_data.time_offset = time_record.offset; + extra_data.init_time = + clocks_diff_ticks(getticks(), clocks_start_ticks) / 1000.0; + extra_data.percentage = 0.f; + extra_data.number_particles = 0; - /* Remove the special mask */ - full_mask = full_mask & !SPECIAL_FLAGS_MASK; + /* Initialize variables */ + if (lock_init(&extra_data.print_lock) != 0) + error("Failed to initialize the lock"); - /* Find the last offset before the current time */ - size_t last_full_offset = current_offset; - current_offset += h_offset; - while (1) { - /* Get the mask */ - size_t mask = 0; - h_offset = 0; - csds_loader_io_read_mask(h, log->log.map + current_offset, &mask, - &h_offset); - - /* update the offset */ - current_offset += h_offset; - if (current_offset > time_record.offset) { - break; - } - - /* The particle should not have a special flag - due to the previous loop */ - if (mask & SPECIAL_FLAGS_MASK) { - error("Found a special flag when updating the particles."); - } - - /* Update the last full offset */ - if (full_mask == mask) { - last_full_offset = current_offset; - } - - /* Are we at the end of the file? */ - if (h_offset == 0) { - break; - } - } + for (int type = 0; type < swift_type_count; type++) { + extra_data.number_particles += current_state[type].size; + } - /* Update the offset */ - current_state[type].data[i].offset = last_full_offset; - } + /* Update the offsets of current_state + * No need to update the others as they contain + * data about when particles are removed/created*/ + for (int type = 0; type < swift_type_count; type++) { + /* Update the offsets */ + threadpool_map(&tp, csds_reader_update_particles_to_next_index_mapper, + current_state[type].data, current_state[type].size, + sizeof(struct index_data), threadpool_auto_chunk_size, + &extra_data); } /* Cleanup the output */ if (reader->verbose) printf("\n"); + /* Free the memory */ + threadpool_clean(&tp); + if (lock_destroy(&extra_data.print_lock) != 0) { + error("Failed to destroy the lock"); + } + return offset; } @@ -687,7 +791,9 @@ void csds_reader_generate_index_files(const struct csds_reader *reader, const double dt = (t_max - t_min) / (number_index - 1); for (int file_number = 1; file_number < number_index; file_number++) { - /* Get the corresponding time record */ + /* Get the corresponding time record. + * The index files are only here to speedup the code, + * no need to have the exact time. */ const double current_approximate_time = t_min + file_number * dt; const size_t index_time = time_array_get_index_from_time(&log->times, current_approximate_time);