diff --git a/src/Makefile.am b/src/Makefile.am
index cc5269181a37fedc6b2d8db66e0bd4c2842a3887..e041246dd3f64c84045387e84f14274ec555cae5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -34,7 +34,7 @@ include_HEADERS += mapped_tracking_element.hpp
 # Source files for the reader
 AM_SOURCES = header.cpp mapped_file.cpp time_array.cpp tools.cpp reader.cpp
 AM_SOURCES += logfile.cpp index_file.cpp parameters.cpp reader_generate_index.cpp
-AM_SOURCES += cosmology.cpp single_field.cpp openmp.cpp
+AM_SOURCES += cosmology.cpp single_field.cpp
 AM_SOURCES += parser.cpp particle_type.cpp
 
 if HAVEPYTHON
diff --git a/src/cache.hpp b/src/cache.hpp
index fb5f70deddb00c52299cbfd3e384cd34c76b174d..650d9a1321dc8873b070a908c7bdbbcdfd6955a3 100644
--- a/src/cache.hpp
+++ b/src/cache.hpp
@@ -153,6 +153,8 @@ class Cache {
     return mPrev.fields[i].GetField();
   }
 
+  void SetCurrentIndex(size_t i) { mCurrentIndex = i; }
+
   /* Size getters */
   INLINE size_t Size() const { return mSize; }
   INLINE size_t Capacity() const { return mPrev.time.capacity(); }
@@ -271,13 +273,17 @@ inline int64_t Cache::Save(int64_t index, double time_before,
 }
 
 inline void Cache::RemoveParticle(size_t index) {
+  // TODO make it thread safe.
+  /* The tricky part is to consider a swap between two
+   * particles that should be removed */
   /* Decrease the counters */
-  size_t size;
-#pragma omp atomic capture
-  {
-    size = mSize;
-    mSize--;
+  mSize--;
+  const size_t size = mSize;
+
+  /* Ensure that the particle was not already removed */
+  if (index >= size) {
+    return;
   }
 
   /* Swap with the last particle */
@@ -295,6 +301,14 @@ inline void Cache::RemoveParticle(size_t index) {
   mPrev.time[index] = mPrev.time[size];
   mNext.time[index] = mNext.time[size];
   mNext.offset[index] = mNext.offset[size];
+
+#ifdef CSDS_DEBUG_CHECKS
+  /* Ensure that something will go wrong if we try to access
+   * the removed particle */
+  mPrev.time[size] = -1;
+  mNext.time[size] = -2;
+  mNext.offset[size] = 0;
+#endif
 }
 
 #ifdef CSDS_DEBUG_CHECKS
@@ -307,6 +321,9 @@ inline void Cache::Check() {
     if (t1 < t0) {
       csds_error("Error within the cache");
     }
+    if (mNext.offset[part] == 0) {
+      csds_error("Found a wrong offset in the cache");
+    }
   }
 
   /* Check if the particles are unique */
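Aside on the new RemoveParticle: it now uses a plain swap-with-last ("swap and pop") removal and, as the TODO above says, is not yet thread safe. A minimal standalone sketch of the pattern and its pitfall, using a plain std::vector instead of the CSDS arrays (the MiniCache name and members are illustrative, not the CSDS types):

// Standalone sketch of the "swap with the last element" removal used in
// Cache::RemoveParticle. Names and the std::vector storage are illustrative.
#include <cstddef>
#include <vector>

struct MiniCache {
  std::vector<double> time;    // one entry per cached particle
  std::vector<size_t> offset;  // offset of the next record for that particle

  size_t Size() const { return time.size(); }

  /* Remove a particle by overwriting it with the last one and shrinking the
   * arrays. Not thread safe: two concurrent calls may pick the same "last"
   * slot, or one call may remove the slot another call is about to swap in. */
  void Remove(size_t index) {
    const size_t last = Size() - 1;
    if (index > last) return;  // already gone: a previous removal shrank the array
    time[index] = time[last];
    offset[index] = offset[last];
    time.pop_back();
    offset.pop_back();
  }
};

The early return on an out-of-range index is what allows a caller to skip indices that have already been swapped past the end; this is also why the removal loop in reader.cpp below processes the collected indices in descending order.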
diff --git a/src/index_file.hpp b/src/index_file.hpp
index 4d397722de854acf149297b5db40441f1e09327d..84fd5ebb97de5d9ed2d968529f90d8a5bafc85d6 100644
--- a/src/index_file.hpp
+++ b/src/index_file.hpp
@@ -111,8 +111,89 @@ class IndexFile : public MappedFile {
   uint64_t GetNumberRemovedParticles(enum part_type type) const {
     return mNPartsRemoved[type];
   }
+  uint64_t GetCurrentNumberCreatedParticles(enum part_type type,
+                                            size_t offset) const {
-  // TODO make the two comparators protected
+    /* Get the history of created particles. */
+    struct index_data *data = GetCreatedHistory(type);
+    uint64_t nparts = GetNumberCreatedParticles(type);
+
+    /* Do we have any new particles? */
+    if (GetNumberCreatedParticles(type) == 0 || offset < data[0].offset) {
+      return 0;
+    }
+
+    /* Find the correct record */
+    struct index_data value = {.id = 0, .offset = offset};
+    struct index_data *it =
+        std::lower_bound(data, data + nparts, value, CompareIndexDataOffset);
+    size_t ret = it - data;
+
+    /* lower_bound: Returns an iterator pointing to the first
+                    element in the range [first,last) which does
+                    not compare less than val.
+       Therefore we need to decrease it in order to have the last value before
+       val
+    */
+    if (offset != it->offset) {
+      ret--;
+    }
+
+#ifdef CSDS_DEBUG_CHECKS
+    /* Check if the binary search is correct. */
+    if (data[ret].offset > offset) {
+      csds_error("Failed to obtain the number of new particles.");
+    }
+    if (ret != nparts - 1 && data[ret + 1].offset < offset) {
+      csds_error("Failed to obtain the number of new particles.");
+    }
+#endif
+
+    /* We need the count, not the index. */
+    return ret + 1;
+  }
+
+  uint64_t GetCurrentNumberRemovedParticles(enum part_type type,
+                                            size_t offset) const {
+
+    /* Get the history of removed particles. */
+    struct index_data *data = GetRemovedHistory(type);
+    const uint64_t nparts = GetNumberRemovedParticles(type);
+
+    /* Do we have any removed particles? */
+    if (GetNumberRemovedParticles(type) == 0 || offset < data[0].offset) {
+      return 0;
+    }
+
+    /* Get the last removed particle */
+    struct index_data value = {.id = 0, .offset = offset};
+    struct index_data *it =
+        std::lower_bound(data, data + nparts, value, CompareIndexDataOffset);
+    size_t ret = it - data;
+
+    /* lower_bound: Returns an iterator pointing to the first
+                    element in the range [first,last) which does
+                    not compare less than val.
+       Therefore we need to decrease it in order to have the last value before
+       val
+    */
+    if (offset != it->offset) {
+      ret--;
+    }
+
+    /* Check if the binary search is correct. */
+    if (data[ret].offset > offset) {
+      csds_error("Failed to obtain the number of removed particles.");
+    }
+    if (ret != nparts - 1 && data[ret + 1].offset < offset) {
+      csds_error("Failed to obtain the number of removed particles.");
+    }
+
+    /* We need the count, not the index. */
+    return ret + 1;
+  }
+
+ protected:
   static bool CompareIndexDataOffset(const struct index_data &d1,
                                      const struct index_data &d2) {
     return d1.offset < d2.offset;
@@ -122,7 +203,6 @@ class IndexFile : public MappedFile {
     return d1.id < d2.id;
   }
 
- protected:
   /** @brief Sort the file according to the IDs */
   void SortFile(const std::string filename, int verbose);
   /** @brief Write within the index file that the arrays are sorted. */
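Both new helpers answer the same question: how many history records have an offset at or before the requested one. A self-contained sketch of that counting logic on a toy sorted history (index_data, the comparator and csds_error are simplified stand-ins for the CSDS versions, and the end() guard is an extra safety not present in the method above):

// Counting history records with offset <= threshold via std::lower_bound.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct index_data {
  int64_t id;
  size_t offset;
};

static bool CompareOffset(const index_data &a, const index_data &b) {
  return a.offset < b.offset;
}

/* Returns how many records have offset <= threshold (0 if none). */
size_t CountUpTo(const std::vector<index_data> &history, size_t threshold) {
  if (history.empty() || threshold < history.front().offset) return 0;
  const index_data value = {0, threshold};
  auto it = std::lower_bound(history.begin(), history.end(), value, CompareOffset);
  size_t ret = it - history.begin();
  /* lower_bound points at the first record with offset >= threshold, so step
   * back when there is no exact match to get the last record before it. */
  if (it == history.end() || it->offset != threshold) ret--;
  return ret + 1;  // convert the index of the last matching record into a count
}

int main() {
  std::vector<index_data> h = {{1, 10}, {2, 25}, {3, 40}};
  assert(CountUpTo(h, 5) == 0);
  assert(CountUpTo(h, 25) == 2);
  assert(CountUpTo(h, 30) == 2);
  assert(CountUpTo(h, 100) == 3);
  return 0;
}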
diff --git a/src/openmp.cpp b/src/openmp.cpp
deleted file mode 100644
index 8bfe54812b55cdd092b2f8e8d5c1928b140a89bc..0000000000000000000000000000000000000000
--- a/src/openmp.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-/*******************************************************************************
- * This file is part of CSDS.
- * Copyright (c) 2021 Loic Hausammann (loic.hausammann@epfl.ch)
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Lesser General Public License as published
- * by the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- ******************************************************************************/
-#include "openmp.hpp"
-
-#include "../config.hpp"
-
-size_t Slices::sAvailableSlice;
-size_t Slices::sSliceSize;
-std::vector<size_t> Slices::sFreeIndices;
-int Slices::sCurrentIndexFreeIndices;
diff --git a/src/openmp.hpp b/src/openmp.hpp
index c31af69314689c16b11f44839cc0d146e69e5a40..7d08c73033b401da2a9e2cca2a7f50d6a30dd814 100644
--- a/src/openmp.hpp
+++ b/src/openmp.hpp
@@ -66,81 +66,4 @@ INLINE static bool csds_in_parallel(void) {
 #endif
 }
 
-/**
- * @brief This class is a thread safe system of slices implemented
- * for parallel loops.
- * This should be a private variable.
- */
-class Slices {
- public:
-  Slices(size_t max_size)
-      : mMaxSize(max_size), mCurrentSlice(0), mCurrentIndex(sSliceSize) {
-    if (sFreeIndices.empty()) {
-      sFreeIndices.reserve(csds_get_max_threads() * sSliceSize);
-    }
-    sAvailableSlice = 0;
-    sSliceSize = std::min<int>(max_size / csds_get_max_threads(), 200);
-  }
-
-  ~Slices() {
-    size_t current = mCurrentSlice + mCurrentIndex;
-    while (current < mMaxSize) {
-#pragma omp critical
-      sFreeIndices.push_back(current);
-      mCurrentIndex++;
-      current = mCurrentSlice + mCurrentIndex;
-    }
-  };
-
-  size_t GetFromFreeIndices() {
-    int index = 0;
-#pragma omp atomic capture
-    {
-      index = sCurrentIndexFreeIndices;
-      sCurrentIndexFreeIndices++;
-    }
-    while (index < (int)sFreeIndices.size()) {
-    };
-    return sFreeIndices[index];
-  }
-
-  size_t GetCurrentIndex() {
-    /* Simple case, we return the index */
-    if (mCurrentIndex < sSliceSize) {
-      size_t out = mCurrentSlice + mCurrentIndex;
-      mCurrentIndex++;
-      if (out < mMaxSize)
-        return out;
-      else
-        return GetFromFreeIndices();
-    }
-
-    /* Need a new slice */
-#pragma omp atomic capture
-    {
-      mCurrentSlice = sAvailableSlice;
-      sAvailableSlice += sSliceSize;
-    }
-    mCurrentIndex = 0;
-
-    /* The slice is still valid */
-    if (mCurrentSlice < mMaxSize) {
-      return mCurrentSlice;
-    }
-
-    /* The slice is not valid, thus reach
-     * sFreeIndices */
-    return GetFromFreeIndices();
-  }
-
- protected:
-  static size_t sAvailableSlice;
-  static size_t sSliceSize;
-  static std::vector<size_t> sFreeIndices;
-  static int sCurrentIndexFreeIndices;
-  size_t mMaxSize;
-  size_t mCurrentSlice;
-  size_t mCurrentIndex;
-};
-
 #endif  // CSDS_OPENMP_H
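The Slices machinery removed above is no longer needed because reader.cpp (below) now reserves output slots from a single shared counter with an OpenMP atomic capture. A minimal sketch of that reservation pattern on toy data (not the CSDS code; compile with -fopenmp):

// Reserving unique output slots from multiple OpenMP threads with
// "atomic capture", the pattern that replaces the Slices helper.
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  const size_t n_input = 1000;
  std::vector<int> output(n_input, -1);
  size_t output_index = 0;  // shared counter: next free slot in the output

#pragma omp parallel for
  for (size_t i = 0; i < n_input; i++) {
    if (i % 3 == 0) continue;  // pretend some particles are removed

    size_t slot;
#pragma omp atomic capture
    {
      slot = output_index;  // read the current value...
      output_index++;       // ...and advance it in one atomic step
    }
    output[slot] = (int)i;  // every kept particle gets a unique slot
  }

  std::printf("kept %zu of %zu particles\n", output_index, n_input);
  return 0;
}

Compared to the Slices class, this trades per-thread slice bookkeeping for one atomic per kept particle, which keeps the output densely packed without any free-index recycling.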
diff --git a/src/reader.cpp b/src/reader.cpp
index 4679c865b1bac16d15a9b79326918e26a41596a7..811bae79e54592a7a2805c5d2201b0e9feb7314b 100644
--- a/src/reader.cpp
+++ b/src/reader.cpp
@@ -208,42 +208,9 @@ size_t Reader::CountNumberNewParticles(enum part_type part_type) {
 
   const size_t threshold = mTime.time_offset;
 
-  /* Get the history of created particles. */
-  struct index_data *data = mIndex.index_next->GetCreatedHistory(part_type);
-
-  /* Do we have any new particle? */
-  if (mIndex.index_next->GetNumberCreatedParticles(part_type) == 0 ||
-      threshold < data[0].offset) {
-    return 0;
-  }
-
-  /* Get the last created particle */
-  // TODO move inside index file
-  uint64_t nparts = mIndex.index_next->GetNumberCreatedParticles(part_type);
-  struct index_data value = {.id = 0, .offset = threshold};
-  struct index_data *it = std::lower_bound(data, data + nparts, value,
-                                           IndexFile::CompareIndexDataOffset);
-  size_t ret = it - data;
-
-  /* lower_bound: Returns an iterator pointing to the first
-                  element in the range [first,last) which does
-                  not compare less than val.
-     Therefore we need to decrease it in order to have the last value before val
-   */
-  if (threshold != it->offset) {
-    ret--;
-  }
-
-  /* Check if the binary search is correct. */
-  if (data[ret].offset > threshold) {
-    csds_error("Failed to obtain the number of new particles.");
-  }
-  if (ret != nparts - 1 && data[ret + 1].offset < threshold) {
-    csds_error("Failed to obtain the number of removed particles");
-  }
-
-  /* We need the count, not the index. */
-  return ret + 1;
+  return mIndex.index_next->GetCurrentNumberCreatedParticles(part_type,
+                                                             threshold);
 }
 
 size_t Reader::CountNumberRemovedParticles(enum part_type part_type) {
@@ -252,42 +219,8 @@
 
   const size_t threshold = mTime.time_offset;
 
-  /* Get the history of created particles. */
-  struct index_data *data = mIndex.index_next->GetRemovedHistory(part_type);
-
-  /* Do we have any new particle? */
-  if (mIndex.index_next->GetNumberRemovedParticles(part_type) == 0 ||
-      threshold < data[0].offset) {
-    return 0;
-  }
-
-  /* Get the last created particle */
-  const uint64_t nparts =
-      mIndex.index_next->GetNumberRemovedParticles(part_type);
-  struct index_data value = {.id = 0, .offset = threshold};
-  struct index_data *it = std::lower_bound(data, data + nparts, value,
-                                           IndexFile::CompareIndexDataOffset);
-  size_t ret = it - data;
-
-  /* lower_bound: Returns an iterator pointing to the first
-                  element in the range [first,last) which does
-                  not compare less than val.
-     Therefore we need to decrease it in order to have the last value before val
-   */
-  if (threshold != it->offset) {
-    ret--;
-  }
-
-  /* Check if the binary search is correct. */
-  if (data[ret].offset > threshold) {
-    csds_error("Failed to obtain the number of removed particles.");
-  }
-  if (ret != nparts - 1 && data[ret + 1].offset < threshold) {
-    csds_error("Failed to obtain the number of removed particles");
-  }
-
-  /* We need the count, not the index. */
-  return ret + 1;
+  return mIndex.index_next->GetCurrentNumberRemovedParticles(part_type,
+                                                             threshold);
 }
 
 void Reader::GetNumberParticles(
@@ -310,34 +243,24 @@
       }
       /* Here we simply need to count the number of particles
         removed between the two last time records */
-      struct index_data *removed =
-          mIndex.index_prev->GetRemovedHistory((part_type)i);
       size_t size_max =
           mIndex.index_prev->GetNumberRemovedParticles((part_type)i);
-
-      struct index_data value = {.id = 0, .offset = mTime.time_offset};
-      struct index_data *it =
-          std::upper_bound(removed, removed + size_max, value,
-                           IndexFile::CompareIndexDataOffset);
-      n_parts[i] = size_max - (it - removed);
+      size_t removed = mIndex.index_prev->GetCurrentNumberRemovedParticles(
+          (part_type)i, mTime.time_offset);
+      n_parts[i] = size_max - removed;
 
       /* In between the last two records, we also store the particles
          active in the last step, therefore we need to count the particles
         created */
-      struct index_data *created =
-          mIndex.index_prev->GetCreatedHistory((part_type)i);
       size_max = mIndex.index_prev->GetNumberCreatedParticles((part_type)i);
-
-      it = std::upper_bound(created, created + size_max, value,
-                            IndexFile::CompareIndexDataOffset);
-      n_parts[i] -= size_max - (it - created);
+      size_t created = mIndex.index_prev->GetCurrentNumberCreatedParticles(
+          (part_type)i, mTime.time_offset);
+      n_parts[i] -= size_max - created;
     }
     /* Normal case */
     else {
       /* Count the number of particles present in the last index file. */
       const uint64_t count_prev =
-          final_index
-              ? mIndex.index_prev->GetNumberRemovedParticles((part_type)i)
-              : mIndex.index_prev->GetNumberParticles((part_type)i);
+          mIndex.index_prev->GetNumberParticles((part_type)i);
 
       /* Count the number of particles that have been created since
          last index. */
       const uint64_t count_new = CountNumberNewParticles((enum part_type)i);
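The call sites above used std::upper_bound while the new IndexFile helpers use std::lower_bound plus a step back; assuming the history offsets are unique (each record sits at its own file offset), the two give the same count of records with offset at or below the threshold. A small self-contained check of that equivalence on toy data (illustrative only):

// Assuming unique offsets, "upper_bound - begin" (old call sites) and the
// lower_bound-plus-adjustment count (new IndexFile helpers) agree.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  const std::vector<size_t> offsets = {10, 25, 40, 60};  // sorted history

  for (size_t threshold = 0; threshold < 80; threshold++) {
    /* Old style: number of records with offset <= threshold. */
    const size_t count_upper =
        std::upper_bound(offsets.begin(), offsets.end(), threshold) -
        offsets.begin();

    /* New style: lower_bound, step back if no exact match, then +1. */
    size_t count_lower = 0;
    if (!offsets.empty() && threshold >= offsets.front()) {
      auto it = std::lower_bound(offsets.begin(), offsets.end(), threshold);
      size_t ret = it - offsets.begin();
      if (it == offsets.end() || *it != threshold) ret--;
      count_lower = ret + 1;
    }

    assert(count_upper == count_lower);
  }
  return 0;
}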
@@ -908,30 +831,41 @@ void Reader::UpdateParticlesSingleType(
         mCache[type].GetFieldIndex(output[field].GetField());
   }
 
-  /* Update the particles */
+  /* Declare a few variables for the loop */
   const size_t initial_size_cache = mCache[type].Size();
+  size_t number_particle_removed = 0;
-  std::vector<size_t> particle_removed;
-  particle_removed.resize(0.05 * initial_size_cache);
-  const size_t NOT_SET = initial_size_cache + 1 + prev_npart;
-  Slices slices(n_part[type]);
-
-#pragma omp parallel for firstprivate(slices)
-  for (size_t part = 0; part < initial_size_cache; part++) {
-    size_t current_output_index = NOT_SET;
+  std::vector<size_t> particle_removed(0.05 * initial_size_cache);
+
+  size_t output_index = prev_npart;
+
+  message(initial_size_cache);
+
+  /* Update the particles */
+#pragma omp parallel for
+  for (size_t cache_index = 0; cache_index < initial_size_cache;
+       cache_index++) {
+    size_t current_output_index = 0;
     bool removed = false;
+
+    /* Do all the fields */
     for (size_t field = 0; field < output.size(); field++) {
-      /* Get the output */
+      /* Generate a temporary output */
       const size_t field_size = output[field].GetField().GetFieldSize();
-      char current_output[field_size];
+      char temporary_output[field_size];
 
       /* Update the particle */
       removed =
-          UpdateSingleParticle(part, type, time, offset_time,
-                               field_index_in_cache[field], current_output);
+          UpdateSingleParticle(cache_index, type, time, offset_time,
+                               field_index_in_cache[field], temporary_output);
 
       /* Move to the next particle if this one is removed */
       if (removed) {
+#ifdef CSDS_DEBUG_CHECKS
+        if (field != 0) {
+          csds_error("Should not happen");
+        }
+#endif
        /* Increase the array size if needed */
        if (number_particle_removed == particle_removed.capacity()) {
 #pragma omp critical
@@ -946,29 +880,53 @@
            current = number_particle_removed;
            number_particle_removed++;
          }
-        particle_removed[current] = part;
+        particle_removed[current] = cache_index;
         break;
       }
 
       /* Get the output index */
-      if (current_output_index == NOT_SET) {
-        current_output_index = prev_npart + slices.GetCurrentIndex();
+      if (field == 0) {
+#pragma omp atomic capture
+        {
+          current_output_index = output_index;
+          output_index++;
+        }
       }
+#ifdef CSDS_DEBUG_CHECKS
+      /* Ensure that we get a correct index */
+      if (current_output_index >= prev_npart + initial_size_cache) {
+        csds_error("Something wrong happened with the output index");
+      }
+#endif
+
       /* Copy back into the output */
-      memcpy(output[field][current_output_index], current_output, field_size);
+      memcpy(output[field][current_output_index], temporary_output, field_size);
     }
   }
 
   /* Remove the particles from the cache */
   // TODO this can be merged with the addition of particles
-#pragma omp parallel
+  particle_removed.resize(number_particle_removed);
+  std::sort(particle_removed.begin(), particle_removed.end());
   for (size_t i = 0; i < number_particle_removed; i++) {
-    mCache[type].RemoveParticle(particle_removed[i]);
+    // TODO thread safe
+    size_t ind = number_particle_removed - 1 - i;
+    ind = particle_removed[ind];
+    mCache[type].RemoveParticle(ind);
   }
 
+#ifdef CSDS_DEBUG_CHECKS
+  /* Check if we have the correct next offsets */
+  for (size_t i = 0; i < mCache[type].Size(); i++) {
+    if (mCache[type].GetOffsetAfter(i) < offset_time) {
+      csds_error("Error within the cache");
+    }
+  }
+#endif
+
   /* Now look for the new particles */
-  size_t index_output = initial_size_cache - number_particle_removed;
+  mCache[type].SetCurrentIndex(mCache[type].Size());
 
   /* Check if we need to update the cache with the previous index file */
   const TimeArray &times = mLog->GetTimeArray();
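The removal loop above sorts the collected cache indices and walks them from highest to lowest because RemoveParticle swaps the removed slot with the current last element; removing in ascending order would move a still-live particle into an already-processed slot and leave stale indices pointing past the shrunken end. A toy illustration of why the descending order matters (plain vector, illustrative names):

// Why the removal loop walks the sorted index list backwards: swap-with-last
// removal invalidates indices >= the new size, so higher indices must go first.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

static void SwapRemove(std::vector<int> &v, size_t index) {
  if (index >= v.size()) return;  // silently skip stale indices
  v[index] = v.back();
  v.pop_back();
}

int main() {
  std::vector<int> particles = {0, 1, 2, 3, 4, 5};
  std::vector<size_t> to_remove = {1, 5, 3};

  std::sort(to_remove.begin(), to_remove.end());
  /* Descending order: removing index 5 first cannot disturb indices 3 and 1. */
  for (size_t i = 0; i < to_remove.size(); i++) {
    SwapRemove(particles, to_remove[to_remove.size() - 1 - i]);
  }

  /* Ascending order would move particle 5 into slot 1 when index 1 is
   * removed; index 5 would then point past the end and particle 5 would
   * never be removed. */
  assert(particles.size() == 3);
  for (int p : particles) assert(p != 1 && p != 3 && p != 5);
  return 0;
}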
@@ -977,6 +935,7 @@
   const bool do_previous = mCache[type].GetLastTimeOffset() < offset_index;
 
   /* Get the histories */
+  // TODO use a pointer to IndexFile
   struct index_data *prev_created = mIndex.index_prev->GetCreatedHistory(type);
   struct index_data *next_created =
       have_next_index ? mIndex.index_next->GetCreatedHistory(type) : NULL;
@@ -990,12 +949,11 @@
                                  : size_next;
 
   /* Get the last created particle */
-  struct index_data value = {.id = 0,
-                             .offset = mCache[type].GetLastTimeOffset()};
-  struct index_data *it =
-      std::upper_bound(index_data, index_data + size_max, value,
-                       IndexFile::CompareIndexDataOffset);
-  size_t center = it - index_data;
+  size_t center = do_previous
+                      ? mIndex.index_prev->GetCurrentNumberCreatedParticles(
+                            type, mCache[type].GetLastTimeOffset())
+                      : mIndex.index_next->GetCurrentNumberCreatedParticles(
+                            type, mCache[type].GetLastTimeOffset());
 
   /* Create a temporary buffer */
   std::vector<CsdsArray> buffers;
@@ -1024,15 +982,16 @@
     /* Write the particle back into the array */
     for (size_t field = 0; field < output.size(); field++) {
       const Field &current = output[field].GetField();
-      char *output_buffer = output[field][index_output];
+      char *output_buffer = output[field][output_index];
       memcpy(output_buffer, buffers[field][0], current.GetFieldSize());
     }
 
     /* Update the counter */
-    index_output++;
+    output_index++;
   }
 
   /* Do the same for the next index file */
+  // TODO use a function for this and the previous one
   if (have_next_index && do_previous) {
     index_data = next_created;
     size_max = mIndex.index_next->GetNumberCreatedParticles(type);
@@ -1055,18 +1014,18 @@
     /* Write the particle back into the array */
     for (size_t field = 0; field < output.size(); field++) {
       const Field &current = output[field].GetField();
-      char *output_buffer = output[field][index_output];
+      char *output_buffer = output[field][output_index];
       memcpy(output_buffer, buffers[field][0], current.GetFieldSize());
     }
 
     /* Update the counter */
-    index_output++;
+    output_index++;
    }
   }
 
   /* Ensures that we found the correct number of particles */
-  if (index_output - prev_npart != output[type].Size()) {
-    csds_error("Some particles are missing: " << index_output - prev_npart
+  if (output_index - prev_npart != output[type].Size()) {
+    csds_error("Some particles are missing: " << output_index - prev_npart
               << " != " << output[type].Size());
   }
 }
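One more note on the hunks above: the value returned by GetCurrentNumberCreatedParticles doubles as the split point center, since entries [0, center) of the created history are already covered by the cache while entries [center, n) still have to be added. A toy sketch of that invariant (linear scan instead of the binary search, illustrative data):

// The count of records with offset <= last_time_offset is also the index of
// the first record that is newer than the cache.
#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  const std::vector<size_t> created_offsets = {10, 25, 40, 60};  // sorted
  const size_t last_time_offset = 30;

  /* Equivalent of GetCurrentNumberCreatedParticles(type, last_time_offset). */
  size_t center = 0;
  while (center < created_offsets.size() &&
         created_offsets[center] <= last_time_offset) {
    center++;
  }

  assert(center == 2);  // offsets 10 and 25 are already in the cache
  for (size_t i = center; i < created_offsets.size(); i++) {
    assert(created_offsets[i] > last_time_offset);  // 40 and 60 are new
  }
  return 0;
}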