/*******************************************************************************
* This file is part of SWIFT.
* Copyright (c) 2021 John Helly (j.c.helly@durham.ac.uk)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
******************************************************************************/
#include "particle_buffer.h"

#include "align.h"
#include "error.h"
#include "memuse.h"

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/**
 * @brief Set up an empty particle buffer.
 *
 * A particle buffer holds a sequence of fixed-size elements in a linked
 * list of blocks. Several threads may append elements at the same time,
 * but appending is the ONLY operation that is thread safe.
 *
 * No block is allocated here: the block list starts out empty and blocks
 * are added as needed when elements are appended.
 *
 * @param buffer The #particle_buffer to initialize
 * @param element_size Size of a single element
 * @param elements_per_block Number of elements to store in each block
 * @param name Name to use when logging memory allocations. The string is
 *        copied into the buffer; if it does not fit in
 *        PARTICLE_BUFFER_NAME_LENGTH characters (terminator included) this
 *        is reported through error().
 *
 */
void particle_buffer_init(struct particle_buffer *buffer, size_t element_size,
                          size_t elements_per_block, char *name) {

  /* Start with an empty list of blocks */
  buffer->first_block = NULL;
  buffer->last_block = NULL;

  /* Geometry of the blocks that will be allocated later on */
  buffer->element_size = element_size;
  buffer->elements_per_block = elements_per_block;

  /* Lock taken by appending threads while they add a block to the list */
  lock_init(&buffer->lock);

  /* Keep a private copy of the name */
  const int written =
      snprintf(buffer->name, PARTICLE_BUFFER_NAME_LENGTH, "%s", name);
  if (written < 0 || written >= PARTICLE_BUFFER_NAME_LENGTH)
    error("Buffer name truncated or encoding error");
}
/**
 * @brief Release all memory held by a particle buffer.
 *
 * Frees every block in the list together with its data array, resets the
 * block list to empty and destroys the buffer's lock. The buffer is no
 * longer in a usable state after this.
 *
 * @param buffer The #particle_buffer
 *
 */
void particle_buffer_free(struct particle_buffer *buffer) {

  /* Walk the list; each link is read before the block holding it is freed */
  struct particle_buffer_block *following = NULL;
  for (struct particle_buffer_block *current = buffer->first_block;
       current != NULL; current = following) {
    following = current->next;
    swift_free(buffer->name, current->data);
    free(current);
  }

  /* Nothing is left in the list */
  buffer->last_block = NULL;
  buffer->first_block = NULL;

  if (lock_destroy(&buffer->lock) != 0)
    error("Failed to destroy lock on particle buffer");
}
/**
 * @brief Empty a particle buffer
 *
 * Frees all blocks and then re-initializes the buffer with the same
 * element size, number of elements per block and name. This leaves the
 * buffer ready to accept new elements.
 *
 * The buffer's lock is destroyed and re-created in the process, so this
 * must not be called while other threads are appending.
 *
 * @param buffer The #particle_buffer
 *
 */
void particle_buffer_empty(struct particle_buffer *buffer) {

  /* Remember the settings needed to re-initialize the buffer */
  const size_t element_size = buffer->element_size;
  const size_t elements_per_block = buffer->elements_per_block;

  /* particle_buffer_init() writes into buffer->name, so the name has to be
   * passed in from a separate copy. The copy is explicitly terminated so
   * that it is a valid string whatever buffer->name contains (strncpy()
   * with a bound equal to the destination size gives no such guarantee). */
  char name[PARTICLE_BUFFER_NAME_LENGTH];
  memcpy(name, buffer->name, sizeof(name));
  name[sizeof(name) - 1] = '\0';

  particle_buffer_free(buffer);
  particle_buffer_init(buffer, element_size, elements_per_block, name);
}
/**
* @brief Allocate a new particle buffer block
*
* @param buffer The #particle_buffer
* @param previous_block The previous final block in the linked list
*/
static struct particle_buffer_block *allocate_block(
struct particle_buffer *buffer,
struct particle_buffer_block *previous_block) {
const size_t element_size = buffer->element_size;
const size_t elements_per_block = buffer->elements_per_block;
/* Allocate the struct */
struct particle_buffer_block *block = (struct particle_buffer_block *)malloc(
sizeof(struct particle_buffer_block));
if (!block)
error("Failed to allocate new particle buffer block: %s", buffer->name);
/* Allocate data buffer */
char *data;
if (swift_memalign(buffer->name, (void **)&data, SWIFT_STRUCT_ALIGNMENT,
element_size * elements_per_block) != 0) {
error("Failed to allocate particle buffer data block: %s", buffer->name);
}
/* Initalise the struct */
block->data = data;
block->num_elements = 0;
block->next = NULL;
if (previous_block) previous_block->next = block;
return block;
}
/**
 * @brief Append an element to a particle buffer.
 *
 * Copies element_size bytes from data into the next free slot of the
 * buffer's last block. If there is no block yet, or the last block is
 * full, a new block is added to the list first.
 *
 * May be called from multiple threads simultaneously: a slot is reserved
 * with an atomic increment of the block's element counter, and the
 * buffer's lock is only taken while a new block is being added.
 *
 * @param buffer The #particle_buffer
 * @param data The element to append (buffer->element_size bytes are read)
 *
 */
void particle_buffer_append(struct particle_buffer *buffer, void *data) {
const size_t element_size = buffer->element_size;
const size_t elements_per_block = buffer->elements_per_block;
/* Retry until a slot has been reserved. A pass that does not reserve one
 * either adds a new block or finds that another thread already did. */
while (1) {
/* Find the current block (atomic because last_block may be modified by
 * other threads) */
struct particle_buffer_block *block =
__atomic_load_n(&buffer->last_block, __ATOMIC_SEQ_CST);
/* It may be that no blocks exist yet */
if (!block) {
lock_lock(&buffer->lock);
/* Check no-one else allocated the first block before we got the lock */
if (!buffer->last_block) {
block = allocate_block(buffer, NULL);
buffer->first_block = block;
__atomic_thread_fence(__ATOMIC_SEQ_CST);
/* After this store other threads will write to the new block,
 * so all initialization must complete before this. */
__atomic_store_n(&buffer->last_block, block, __ATOMIC_SEQ_CST);
}
if (lock_unlock(&buffer->lock) != 0)
error("Failed to unlock particle buffer");
/* Now try again: the next pass re-reads last_block */
continue;
}
/* Find next available index in current block. The counter is incremented
 * even when the block is already full, so block->num_elements can end up
 * larger than elements_per_block. */
size_t index =
__atomic_fetch_add(&block->num_elements, 1, __ATOMIC_SEQ_CST);
if (index < elements_per_block) {
/* We reserved a valid index, so copy the data */
memcpy(block->data + index * element_size, data, element_size);
return;
} else {
/* No space left, so we need to allocate a new block */
lock_lock(&buffer->lock);
/* Check no-one else already did it before we got the lock: a block that
 * already has a successor must not get a second one */
if (!block->next) {
/* Allocate and initialize the new block; allocate_block() also links it
 * in as block->next */
struct particle_buffer_block *new_block = allocate_block(buffer, block);
__atomic_thread_fence(__ATOMIC_SEQ_CST);
/* After this store other threads will write to the new block,
 * so all initialization must complete before this. */
__atomic_store_n(&buffer->last_block, new_block, __ATOMIC_SEQ_CST);
}
if (lock_unlock(&buffer->lock) != 0)
error("Failed to unlock particle buffer");
/* Now we have space, will try again */
}
}
}
/**
 * @brief Step through the data blocks of a particle buffer.
 *
 * Call with *block set to NULL to obtain the first block; every further
 * call advances *block to the following one. Once the end of the list is
 * passed, *block and *data are set to NULL and *num_elements to 0.
 *
 * @param buffer The #particle_buffer
 * @param block Initially null, returns pointer to next data block
 * @param num_elements Returns number of elements in this block
 * @param data Returns pointer to data in this block
 *
 */
void particle_buffer_iterate(struct particle_buffer *buffer,
                             struct particle_buffer_block **block,
                             size_t *num_elements, void **data) {

  /* Head of the list on the first call, the next link afterwards */
  *block = (*block == NULL) ? buffer->first_block : (*block)->next;

  /* Ran off the end of the list (or the list is empty) */
  if (*block == NULL) {
    *data = NULL;
    *num_elements = 0;
    return;
  }

  *data = (*block)->data;

  /* The element counter of a block may exceed the block's capacity, so
   * never report more than the block can actually hold */
  const size_t stored = (*block)->num_elements;
  const size_t capacity = buffer->elements_per_block;
  *num_elements = (stored > capacity) ? capacity : stored;
}
/**
* @brief Return number of elements in particle buffer.
*
* @param buffer The #particle_buffer
*
*/
size_t particle_buffer_num_elements(struct particle_buffer *buffer) {
size_t num_elements = 0;
struct particle_buffer_block *block = buffer->first_block;
while (block) {
if (block->num_elements < buffer->elements_per_block) {
/* Non-full block, so block->num_elements is correct */
num_elements += block->num_elements;
} else {
/* Full block, so block->num_elements may be out of range */
num_elements += buffer->elements_per_block;
}
block = block->next;
}
return num_elements;
}
/**
* @brief Return memory used by a particle buffer.
*
* @param buffer The #particle_buffer
*
*/
size_t particle_buffer_memory_use(struct particle_buffer *buffer) {
size_t num_bytes = 0;
struct particle_buffer_block *block = buffer->first_block;
while (block) {
num_bytes += (buffer->elements_per_block * buffer->element_size);
block = block->next;
}
return num_bytes;
}