Commit 7de9b5e3 authored by Matthieu Schaller's avatar Matthieu Schaller
Browse files

Merge branch 'streaming_io' into 'master'

Streaming io

Some infrastructure needed for particle logging, should not break anything as it's not used yet.

Next step is to write the bits that log particle data at different resolutions.

See merge request !297
parents e328f5da 22d93658
......@@ -44,7 +44,7 @@ include_HEADERS = space.h runner.h queue.h task.h lock.h cell.h part.h const.h \
common_io.h single_io.h multipole.h map.h tools.h partition.h clocks.h parser.h \
physical_constants.h physical_constants_cgs.h potential.h version.h \
hydro_properties.h riemann.h threadpool.h cooling.h cooling_struct.h sourceterms.h \
sourceterms_struct.h statistics.h memswap.h profiler.h
sourceterms_struct.h statistics.h memswap.h profiler.h dump.h
# Common source files
AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
......@@ -53,7 +53,7 @@ AM_SOURCES = space.c runner.c queue.c task.c cell.c engine.c \
kernel_hydro.c tools.c part.c partition.c clocks.c parser.c \
physical_constants.c potential.c hydro_properties.c \
runner_doiact_fft.c threadpool.c cooling.c sourceterms.c \
statistics.c profiler.c
statistics.c profiler.c dump.c
# Include files for distribution, not installation.
nobase_noinst_HEADERS = align.h approx_math.h atomic.h cycle.h error.h inline.h kernel_hydro.h kernel_gravity.h \
......@@ -83,7 +83,7 @@ nobase_noinst_HEADERS = align.h approx_math.h atomic.h cycle.h error.h inline.h
cooling/none/cooling.h cooling/none/cooling_struct.h \
cooling/const_du/cooling.h cooling/const_du/cooling_struct.h \
cooling/const_lambda/cooling.h cooling/const_lambda/cooling_struct.h \
memswap.h
memswap.h dump.h
# Sources and flags for regular library
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include "../config.h"
/* Some standard headers. */
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
/* This object's header. */
#include "dump.h"
/* Local headers. */
#include "atomic.h"
#include "error.h"
/**
* @brief Obtain a chunk of memory from a dump.
*
* @param d The #dump.
* @param count The number of bytes requested.
* @param offset The offset of the returned memory address within the dump file.
* @return A pointer to the memory-mapped chunk of data.
*/
void *dump_get(struct dump *d, size_t count, size_t *offset) {
size_t local_offset = atomic_add(&d->count, count);
*offset = local_offset + d->file_offset;
return d->data + local_offset;
}
/**
* @brief Ensure that at least size bytes are available in the #dump.
*/
void dump_ensure(struct dump *d, size_t size) {
/* If we have enough space already, just bail. */
if (d->size - d->count > size) return;
/* Unmap the current data. */
size_t trunc_count = d->count & d->page_mask;
if (munmap(d->data, trunc_count > 0 ? trunc_count : 1) != 0) {
error("Failed to unmap %zi bytes of dump data (%s).", trunc_count,
strerror(errno));
}
/* Update the size and count. */
d->file_offset += trunc_count;
d->count -= trunc_count;
d->size = (size * dump_grow_ensure_factor + ~d->page_mask) & d->page_mask;
/* Re-allocate the file size. */
if (posix_fallocate(d->fd, d->file_offset, d->size) != 0) {
error("Failed to pre-allocate the dump file.");
}
/* Re-map starting at the end of the file. */
if ((d->data = mmap(NULL, d->size, PROT_WRITE, MAP_SHARED, d->fd,
d->file_offset)) == MAP_FAILED) {
error("Failed to allocate map of size %zi bytes (%s).", d->size,
strerror(errno));
}
}
/**
* @brief Flush the #dump to disk.
*/
void dump_sync(struct dump *d) {
if (msync(d->data, d->count, MS_SYNC) != 0)
error("Failed to sync memory-mapped data.");
}
/**
* @brief Finalize the #dump.
*/
void dump_close(struct dump *d) {
/* Unmap the data in memory. */
if (munmap(d->data, d->count) != 0) {
error("Failed to unmap dump data (%s).", strerror(errno));
}
/* Truncate the file to the correct length. */
if (ftruncate(d->fd, d->file_offset + d->count) != 0) {
error("Failed to truncate dump file (%s).", strerror(errno));
}
/* Close the memory-mapped file. */
if (close(d->fd) != 0) error("Failed to close memory-mapped file.");
}
/**
* @brief Initialize a file dump.
*
* @param d The #dump to initialize.
* @param filename The fully qualified name of the file in which to dump,
* note that it will be overwritten.
* @param size The initial buffer size for this #dump.
*/
void dump_init(struct dump *d, const char *filename, size_t size) {
/* Create the output file. */
if ((d->fd = open(filename, O_CREAT | O_RDWR, 0660)) == -1) {
error("Failed to create dump file '%s' (%s).", filename, strerror(errno));
}
/* Adjust the size to be at least the page size. */
const size_t page_mask = ~(sysconf(_SC_PAGE_SIZE) - 1);
size = (size + ~page_mask) & page_mask;
/* Pre-allocate the file size. */
if (posix_fallocate(d->fd, 0, size) != 0) {
error("Failed to pre-allocate the dump file.");
}
/* Map memory to the created file. */
if ((d->data = mmap(NULL, size, PROT_WRITE, MAP_SHARED, d->fd, 0)) ==
MAP_FAILED) {
error("Failed to allocate map of size %zi bytes (%s).", size,
strerror(errno));
}
/* Init some counters. */
d->size = size;
d->count = 0;
d->file_offset = 0;
d->page_mask = page_mask;
}
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#ifndef SWIFT_DUMP_H
#define SWIFT_DUMP_H
/* Includes. */
#include "lock.h"
/* Some constants. */
#define dump_grow_ensure_factor 10
/** The dump struct. */
struct dump {
/* The memory-mapped data of this dump. */
void *data;
/* The size of the memory-mapped data, in bytes. */
size_t size;
/* The number of bytes that have been dumped. */
size_t count;
/* The offset of the data within the current file. */
size_t file_offset;
/* The file with which this memory is associated. */
int fd;
/* Mask containing the significant bits for page addresses. */
size_t page_mask;
};
/* Function prototypes. */
void dump_init(struct dump *d, const char *filename, size_t size);
void dump_ensure(struct dump *d, size_t size);
void dump_sync(struct dump *d);
void dump_close(struct dump *d);
void *dump_get(struct dump *d, size_t count, size_t *offset);
#endif /* SWIFT_DUMP_H */
......@@ -490,7 +490,6 @@ void space_rebuild(struct space *s, int verbose) {
space_gparts_get_cell_index(s, gind, cells_top, verbose);
#ifdef WITH_MPI
/* Move non-local parts to the end of the list. */
const int local_nodeID = s->e->nodeID;
for (size_t k = 0; k < nr_parts;) {
......
......@@ -25,7 +25,7 @@ TESTS = testGreetings testMaths testReading.sh testSingle testKernel testSymmetr
testPair.sh testPairPerturbed.sh test27cells.sh test27cellsPerturbed.sh \
testParser.sh testSPHStep test125cells.sh testKernelGrav testFFT \
testAdiabaticIndex testRiemannExact testRiemannTRRS testRiemannHLLC \
testMatrixInversion testThreadpool
testMatrixInversion testThreadpool testDump
# List of test programs to compile
check_PROGRAMS = testGreetings testReading testSingle testTimeIntegration \
......@@ -33,7 +33,7 @@ check_PROGRAMS = testGreetings testReading testSingle testTimeIntegration \
testKernel testKernelGrav testFFT testInteractions testMaths \
testSymmetry testThreadpool \
testAdiabaticIndex testRiemannExact testRiemannTRRS \
testRiemannHLLC testMatrixInversion
testRiemannHLLC testMatrixInversion testDump
# Sources for the individual programs
testGreetings_SOURCES = testGreetings.c
......@@ -78,6 +78,8 @@ testMatrixInversion_SOURCES = testMatrixInversion.c
testThreadpool_SOURCES = testThreadpool.c
testDump_SOURCES = testDump.c
# Files necessary for distribution
EXTRA_DIST = testReading.sh makeInput.py testPair.sh testPairPerturbed.sh \
test27cells.sh test27cellsPerturbed.sh testParser.sh \
......
/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2016 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
/* Config parameters. */
#include "../config.h"
/* Some standard headers. */
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
/* This object's header. */
#include "../src/dump.h"
/* Local headers. */
#include "../src/threadpool.h"
void dump_mapper(void *map_data, int num_elements, void *extra_data) {
struct dump *d = (struct dump *)extra_data;
size_t offset;
char *out_string = dump_get(d, 7, &offset);
char out_buff[8];
snprintf(out_buff, 8, "%06zi\n", offset / 7);
memcpy(out_string, out_buff, 7);
}
int main(int argc, char *argv[]) {
/* Some constants. */
const int num_threads = 4;
const char *filename = "/tmp/dump_test.out";
const int num_runs = 20;
const int chunk_size = 1000;
/* Prepare a threadpool to write to the dump. */
struct threadpool t;
threadpool_init(&t, num_threads);
/* Prepare a dump. */
struct dump d;
dump_init(&d, filename, 1024);
/* Dump numbers in chunks. */
for (int run = 0; run < num_runs; run++) {
/* Ensure capacity. */
dump_ensure(&d, 7 * chunk_size);
/* Dump a few numbers. */
printf("dumping %i chunks...\n", chunk_size);
fflush(stdout);
threadpool_map(&t, dump_mapper, NULL, chunk_size, 0, 1, &d);
}
/* Sync the file, not necessary before dump_close, but just to test this. */
dump_sync(&d);
/* Finalize the dump. */
dump_close(&d);
/* Return a happy number. */
return 0;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment