[med-svn] [Git][med-team/readerwriterqueue][master] 5 commits: New upstream version 1.0.5
Nilesh Patra
gitlab at salsa.debian.org
Mon May 3 16:14:59 BST 2021
Nilesh Patra pushed to branch master at Debian Med / readerwriterqueue
Commits:
26a2b0f8 by Nilesh Patra at 2021-05-03T20:33:45+05:30
New upstream version 1.0.5
- - - - -
cc0b762f by Nilesh Patra at 2021-05-03T20:33:46+05:30
Update upstream source from tag 'upstream/1.0.5'
Update to upstream version '1.0.5'
with Debian dir 7a4df1bb12a9520b731e64fc3e43c81c507c978a
- - - - -
1af24146 by Nilesh Patra at 2021-05-03T20:38:17+05:30
Refresh patch
- - - - -
eecd2934 by Nilesh Patra at 2021-05-03T20:39:46+05:30
Install readerwritercircularbuffer.h to /usr/include
- - - - -
14d52c2d by Nilesh Patra at 2021-05-03T20:44:28+05:30
Update Interim changelog entry
- - - - -
13 changed files:
- CMakeLists.txt
- LICENSE.md
- README.md
- atomicops.h
- benchmarks/bench.cpp
- benchmarks/makefile
- debian/changelog
- debian/libreaderwriterqueue-dev.install
- debian/patches/hardening.patch
- + readerwritercircularbuffer.h
- readerwriterqueue.h
- tests/unittests/makefile
- tests/unittests/unittests.cpp
Changes:
=====================================
CMakeLists.txt
=====================================
@@ -7,5 +7,5 @@ add_library(${PROJECT_NAME} INTERFACE)
target_include_directories(readerwriterqueue INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
-install(FILES atomicops.h readerwriterqueue.h LICENSE.md
+install(FILES atomicops.h readerwriterqueue.h readerwritercircularbuffer.h LICENSE.md
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME})
=====================================
LICENSE.md
=====================================
@@ -1,11 +1,11 @@
This license applies to all the code in this repository except that written by third
parties, namely the files in benchmarks/ext, which have their own licenses, and Jeff
-Preshing's semaphore implementation (used in the blocking queue) which has a zlib
+Preshing's semaphore implementation (used in the blocking queues) which has a zlib
license (embedded in atomicops.h).
Simplified BSD License:
-Copyright (c) 2013-2015, Cameron Desrochers
+Copyright (c) 2013-2021, Cameron Desrochers
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
=====================================
README.md
=====================================
@@ -7,6 +7,8 @@ you could use this queue completely from a single thread if you wish (but that w
Note: If you need a general-purpose multi-producer, multi-consumer lock free queue, I have [one of those too][mpmc].
+This repository also includes a [circular-buffer SPSC queue][circular] which supports blocking on enqueue as well as dequeue.
+
## Features
@@ -25,7 +27,7 @@ Note: If you need a general-purpose multi-producer, multi-consumer lock free que
## Use
-Simply drop the readerwriterqueue.h and atomicops.h files into your source code and include them :-)
+Simply drop the readerwriterqueue.h (or readerwritercircularbuffer.h) and atomicops.h files into your source code and include them :-)
A modern compiler is required (MSVC2010+, GCC 4.7+, ICC 13+, or any C++11 compliant compiler should work).
Note: If you're using GCC, you really do need GCC 4.7 or above -- [4.6 has a bug][gcc46bug] that prevents the atomic fence primitives
@@ -94,6 +96,25 @@ means care must be taken to only call `wait_dequeue` if you're sure another elem
will come along eventually, or if the queue has a static lifetime. This is because
destroying the queue while a thread is waiting on it will invoke undefined behaviour.
+The blocking circular buffer has a fixed number of slots, but is otherwise quite similar to
+use:
+
+```cpp
+BlockingReaderWriterCircularBuffer<int> q(1024); // pass initial capacity
+
+q.try_enqueue(1);
+int number;
+q.try_dequeue(number);
+assert(number == 1);
+
+q.wait_enqueue(123);
+q.wait_dequeue(number);
+assert(number == 123);
+
+q.wait_dequeue_timed(number, std::chrono::milliseconds(10));
+```
+
+
## CMake installation
As an alternative to including the source files in your project directly,
you can use CMake to install the library in your system's include directory:
@@ -118,14 +139,6 @@ includes all modern processors (e.g. x86/x86-64, ARM, and PowerPC). *Not* for us
Note that it's only been tested on x86(-64); if someone has access to other processors I'd love to run some tests on
anything that's not x86-based.
-Finally, I am not an expert. This is my first foray into lock-free programming, and though I'm confident in the code,
-it's possible that there are bugs despite the effort I put into designing and testing this data structure.
-
-Use this code at your own risk; in particular, lock-free programming is a patent minefield, and this code may very
-well violate a pending patent (I haven't looked). It's worth noting that I came up with this algorithm and
-implementation from scratch, independent of any existing lock-free queues.
-
-
## More info
See the [LICENSE.md][license] file for the license (simplified BSD).
@@ -139,3 +152,4 @@ about lock-free programming.
[benchmarks]: http://moodycamel.com/blog/2013/a-fast-lock-free-queue-for-c++#benchmarks
[gcc46bug]: http://stackoverflow.com/questions/16429669/stdatomic-thread-fence-has-undefined-reference
[mpmc]: https://github.com/cameron314/concurrentqueue
+[circular]: readerwritercircularbuffer.h
=====================================
atomicops.h
=====================================
@@ -239,7 +239,7 @@ template<typename T>
class weak_atomic
{
public:
- AE_NO_TSAN weak_atomic() { }
+ AE_NO_TSAN weak_atomic() : value() { }
#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable: 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning
@@ -389,7 +389,7 @@ namespace moodycamel
Semaphore& operator=(const Semaphore& other);
public:
- AE_NO_TSAN Semaphore(int initialCount = 0)
+ AE_NO_TSAN Semaphore(int initialCount = 0) : m_hSema()
{
assert(initialCount >= 0);
const long maxLong = 0x7fffffff;
@@ -437,7 +437,7 @@ namespace moodycamel
Semaphore& operator=(const Semaphore& other);
public:
- AE_NO_TSAN Semaphore(int initialCount = 0)
+ AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema()
{
assert(initialCount >= 0);
kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
@@ -497,7 +497,7 @@ namespace moodycamel
Semaphore& operator=(const Semaphore& other);
public:
- AE_NO_TSAN Semaphore(int initialCount = 0)
+ AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema()
{
assert(initialCount >= 0);
int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
@@ -626,7 +626,7 @@ namespace moodycamel
}
public:
- AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
+ AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount), m_sema()
{
assert(initialCount >= 0);
}
=====================================
benchmarks/bench.cpp
=====================================
@@ -8,6 +8,10 @@
#define NO_FOLLY_SUPPORT
#endif
+#if defined(_MSC_VER) && _MSC_VER < 1700
+#define NO_CIRCULAR_BUFFER_SUPPORT
+#endif
+
#if !defined(__amd64__) && !defined(_M_X64) && !defined(__x86_64__) && !defined(_M_IX86) && !defined(__i386__)
#define NO_SPSC_SUPPORT // SPSC implementation is for x86 only
#endif
@@ -17,6 +21,15 @@
#include "ext/folly/ProducerConsumerQueue.h" // Facebook's folly (GitHub)
#endif
#include "../readerwriterqueue.h" // Mine
+#ifndef NO_CIRCULAR_BUFFER_SUPPORT
+#include "../readerwritercircularbuffer.h" // Mine
+template<typename T>
+class BlockingReaderWriterCircularBufferAdapter : public moodycamel::BlockingReaderWriterCircularBuffer<T> {
+public:
+ BlockingReaderWriterCircularBufferAdapter(std::size_t capacity) : moodycamel::BlockingReaderWriterCircularBuffer<T>(capacity) { }
+ void enqueue(T const& x) { this->wait_enqueue(x); }
+};
+#endif
#include "systemtime.h"
#include "../tests/common/simplethread.h"
@@ -75,11 +88,13 @@ int main(int argc, char** argv)
const double FASTEST_PERCENT_CONSIDERED = 20; // Consider only the fastest runs in the top 20%
double rwqResults[BENCHMARK_COUNT][TEST_COUNT];
+ double brwcbResults[BENCHMARK_COUNT][TEST_COUNT];
double spscResults[BENCHMARK_COUNT][TEST_COUNT];
double follyResults[BENCHMARK_COUNT][TEST_COUNT];
// Also calculate a rough heuristic of "ops/s" (across all runs, not just fastest)
double rwqOps[BENCHMARK_COUNT][TEST_COUNT];
+ double brwcbOps[BENCHMARK_COUNT][TEST_COUNT];
double spscOps[BENCHMARK_COUNT][TEST_COUNT];
double follyOps[BENCHMARK_COUNT][TEST_COUNT];
@@ -94,6 +109,16 @@ int main(int argc, char** argv)
for (int i = 0; i < TEST_COUNT; ++i) {
rwqResults[benchmark][i] = runBenchmark<ReaderWriterQueue<int>>((BenchmarkType)benchmark, randSeeds[benchmark], rwqOps[benchmark][i]);
}
+#ifndef NO_CIRCULAR_BUFFER_SUPPORT
+ for (int i = 0; i < TEST_COUNT; ++i) {
+ brwcbResults[benchmark][i] = runBenchmark<BlockingReaderWriterCircularBufferAdapter<int>>((BenchmarkType)benchmark, randSeeds[benchmark], brwcbOps[benchmark][i]);
+ }
+#else
+ for (int i = 0; i < TEST_COUNT; ++i) {
+ brwcbResults[benchmark][i] = 0;
+ brwcbOps[benchmark][i] = 0;
+ }
+#endif
#ifndef NO_SPSC_SUPPORT
for (int i = 0; i < TEST_COUNT; ++i) {
spscResults[benchmark][i] = runBenchmark<spsc_queue<int>>((BenchmarkType)benchmark, randSeeds[benchmark], spscOps[benchmark][i]);
@@ -119,6 +144,7 @@ int main(int argc, char** argv)
// Sort results
for (int benchmark = 0; benchmark < BENCHMARK_COUNT; ++benchmark) {
std::sort(&rwqResults[benchmark][0], &rwqResults[benchmark][0] + TEST_COUNT);
+ std::sort(&brwcbResults[benchmark][0], &brwcbResults[benchmark][0] + TEST_COUNT);
std::sort(&spscResults[benchmark][0], &spscResults[benchmark][0] + TEST_COUNT);
std::sort(&follyResults[benchmark][0], &follyResults[benchmark][0] + TEST_COUNT);
}
@@ -126,24 +152,29 @@ int main(int argc, char** argv)
// Display results
int max = std::max(2, (int)(TEST_COUNT * FASTEST_PERCENT_CONSIDERED / 100));
assert(max > 0);
+#ifdef NO_CIRCULAR_BUFFER_SUPPORT
+ std::cout << "Note: BRWCB queue not supported on this platform, discount its timings" << std::endl;
+#endif
#ifdef NO_SPSC_SUPPORT
std::cout << "Note: SPSC queue not supported on this platform, discount its timings" << std::endl;
#endif
#ifdef NO_FOLLY_SUPPORT
std::cout << "Note: Folly queue not supported by this compiler, discount its timings" << std::endl;
#endif
- std::cout << std::setw(BENCHMARK_NAME_MAX) << " " << " |----------- Min ------------|------------ Max ------------|------------ Avg ------------|\n";
- std::cout << std::left << std::setw(BENCHMARK_NAME_MAX) << "Benchmark" << " | RWQ | SPSC | Folly | RWQ | SPSC | Folly | RWQ | SPSC | Folly | xSPSC | xFolly\n";
+ std::cout << std::setw(BENCHMARK_NAME_MAX) << " " << " |---------------- Min -----------------|----------------- Max -----------------|----------------- Avg -----------------|\n";
+ std::cout << std::left << std::setw(BENCHMARK_NAME_MAX) << "Benchmark" << " | RWQ | BRWCB | SPSC | Folly | RWQ | BRWCB | SPSC | Folly | RWQ | BRWCB | SPSC | Folly | xSPSC | xFolly\n";
std::cout.fill('-');
- std::cout << std::setw(BENCHMARK_NAME_MAX) << "---------" << "-+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+-------\n";
+ std::cout << std::setw(BENCHMARK_NAME_MAX) << "---------" << "-+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+-------\n";
std::cout.fill(' ');
- double rwqOpsPerSec = 0, spscOpsPerSec = 0, follyOpsPerSec = 0;
+ double rwqOpsPerSec = 0, brwcbOpsPerSec = 0, spscOpsPerSec = 0, follyOpsPerSec = 0;
int opTimedBenchmarks = 0;
for (int benchmark = 0; benchmark < BENCHMARK_COUNT; ++benchmark) {
double rwqMin = rwqResults[benchmark][0], rwqMax = rwqResults[benchmark][max - 1];
+ double brwcbMin = brwcbResults[benchmark][0], brwcbMax = brwcbResults[benchmark][max - 1];
double spscMin = spscResults[benchmark][0], spscMax = spscResults[benchmark][max - 1];
double follyMin = follyResults[benchmark][0], follyMax = follyResults[benchmark][max - 1];
double rwqAvg = std::accumulate(&rwqResults[benchmark][0], &rwqResults[benchmark][0] + max, 0.0) / max;
+ double brwcbAvg = std::accumulate(&brwcbResults[benchmark][0], &brwcbResults[benchmark][0] + max, 0.0) / max;
double spscAvg = std::accumulate(&spscResults[benchmark][0], &spscResults[benchmark][0] + max, 0.0) / max;
double follyAvg = std::accumulate(&follyResults[benchmark][0], &follyResults[benchmark][0] + max, 0.0) / max;
double spscMult = rwqAvg < 0.00001 ? 0 : spscAvg / rwqAvg;
@@ -151,9 +182,11 @@ int main(int argc, char** argv)
if (rwqResults[benchmark][0] != -1) {
double rwqTotalAvg = std::accumulate(&rwqResults[benchmark][0], &rwqResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
+ double brwcbTotalAvg = std::accumulate(&brwcbResults[benchmark][0], &brwcbResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
double spscTotalAvg = std::accumulate(&spscResults[benchmark][0], &spscResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
double follyTotalAvg = std::accumulate(&follyResults[benchmark][0], &follyResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
rwqOpsPerSec += rwqTotalAvg == 0 ? 0 : std::accumulate(&rwqOps[benchmark][0], &rwqOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / rwqTotalAvg;
+ brwcbOpsPerSec += brwcbTotalAvg == 0 ? 0 : std::accumulate(&brwcbOps[benchmark][0], &brwcbOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / brwcbTotalAvg;
spscOpsPerSec += spscTotalAvg == 0 ? 0 : std::accumulate(&spscOps[benchmark][0], &spscOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / spscTotalAvg;
follyOpsPerSec += follyTotalAvg == 0 ? 0 : std::accumulate(&follyOps[benchmark][0], &follyOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / follyTotalAvg;
++opTimedBenchmarks;
@@ -162,12 +195,15 @@ int main(int argc, char** argv)
std::cout
<< std::left << std::setw(BENCHMARK_NAME_MAX) << benchmarkName((BenchmarkType)benchmark) << " | "
<< std::fixed << std::setprecision(4) << rwqMin << "s | "
+ << std::fixed << std::setprecision(4) << brwcbMin << "s | "
<< std::fixed << std::setprecision(4) << spscMin << "s | "
<< std::fixed << std::setprecision(4) << follyMin << "s | "
<< std::fixed << std::setprecision(4) << rwqMax << "s | "
+ << std::fixed << std::setprecision(4) << brwcbMax << "s | "
<< std::fixed << std::setprecision(4) << spscMax << "s | "
<< std::fixed << std::setprecision(4) << follyMax << "s | "
<< std::fixed << std::setprecision(4) << rwqAvg << "s | "
+ << std::fixed << std::setprecision(4) << brwcbAvg << "s | "
<< std::fixed << std::setprecision(4) << spscAvg << "s | "
<< std::fixed << std::setprecision(4) << follyAvg << "s | "
<< std::fixed << std::setprecision(2) << spscMult << "x | "
@@ -177,14 +213,16 @@ int main(int argc, char** argv)
}
rwqOpsPerSec /= opTimedBenchmarks;
+ brwcbOpsPerSec /= opTimedBenchmarks;
spscOpsPerSec /= opTimedBenchmarks;
follyOpsPerSec /= opTimedBenchmarks;
std::cout
<< "\nAverage ops/s:\n"
- << " ReaderWriterQueue: " << std::fixed << std::setprecision(2) << rwqOpsPerSec / 1000000 << " million\n"
- << " SPSC queue: " << std::fixed << std::setprecision(2) << spscOpsPerSec / 1000000 << " million\n"
- << " Folly queue: " << std::fixed << std::setprecision(2) << follyOpsPerSec / 1000000 << " million\n"
+ << " ReaderWriterQueue: " << std::fixed << std::setprecision(2) << rwqOpsPerSec / 1000000 << " million\n"
+ << " BlockingReaderWriterCircularBuffer: " << std::fixed << std::setprecision(2) << brwcbOpsPerSec / 1000000 << " million\n"
+ << " SPSC queue: " << std::fixed << std::setprecision(2) << spscOpsPerSec / 1000000 << " million\n"
+ << " Folly queue: " << std::fixed << std::setprecision(2) << follyOpsPerSec / 1000000 << " million\n"
;
std::cout << std::endl;
=====================================
benchmarks/makefile
=====================================
@@ -15,7 +15,7 @@ endif
default: benchmarks$(EXT)
-benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
+benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../readerwritercircularbuffer.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
g++ -std=c++11 -Wpedantic -Wall -DNDEBUG -O3 -g bench.cpp ../tests/common/simplethread.cpp systemtime.cpp -o benchmarks$(EXT) -pthread $(PLATFORM_OPTS)
run: benchmarks$(EXT)
=====================================
debian/changelog
=====================================
@@ -1,11 +1,12 @@
-readerwriterqueue (1.0.4-1) UNRELEASED; urgency=medium
+readerwriterqueue (1.0.5-1) UNRELEASED; urgency=medium
* Team Upload.
- * New upstream version 1.0.4
+ * New upstream version 1.0.5
* Declare compliance with policy 4.5.1
* Propagate hardening options to fix blhc
+ * Install readerwritercircularbuffer.h to /usr/include
- -- Nilesh Patra <nilesh at debian.org> Wed, 21 Apr 2021 19:49:39 +0530
+ -- Nilesh Patra <nilesh at debian.org> Mon, 03 May 2021 20:40:12 +0530
readerwriterqueue (1.0.3-1) unstable; urgency=medium
=====================================
debian/libreaderwriterqueue-dev.install
=====================================
@@ -1,2 +1,3 @@
atomicops.h usr/include
readerwriterqueue.h usr/include
+readerwritercircularbuffer.h usr/include
=====================================
debian/patches/hardening.patch
=====================================
@@ -6,7 +6,7 @@ Last-Update: 2021-04-21
@@ -16,7 +16,7 @@
default: benchmarks$(EXT)
- benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
+ benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../readerwritercircularbuffer.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
- g++ -std=c++11 -Wpedantic -Wall -DNDEBUG -O3 -g bench.cpp ../tests/common/simplethread.cpp systemtime.cpp -o benchmarks$(EXT) -pthread $(PLATFORM_OPTS)
+ g++ $(CPPFLAGS) $(CXXFLAGS) -std=c++11 -Wpedantic -Wall -DNDEBUG -O3 -g bench.cpp ../tests/common/simplethread.cpp systemtime.cpp -o benchmarks$(EXT) -pthread $(PLATFORM_OPTS) $(LDFLAGS)
@@ -28,9 +28,9 @@ Last-Update: 2021-04-21
@@ -21,7 +21,7 @@
default: unittests$(EXT)
- unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
+ unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../readerwritercircularbuffer.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
- g++ $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS)
-+ g++ $(CXXFLAGS) $(CPPFLAGS) $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS) $(LDFLAGS)
++ g++ $(CPPFLAGS) $(CXXFLAGS) $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS) $(LDFLAGS)
run: unittests$(EXT)
./unittests$(EXT)
=====================================
readerwritercircularbuffer.h
=====================================
@@ -0,0 +1,288 @@
+// ©2020 Cameron Desrochers.
+// Distributed under the simplified BSD license (see the license file that
+// should have come with this header).
+
+// Provides a C++11 implementation of a single-producer, single-consumer wait-free concurrent
+// circular buffer (fixed-size queue).
+
+#pragma once
+
+#include <utility>
+#include <chrono>
+#include <memory>
+#include <cstdlib>
+#include <cstdint>
+#include <cassert>
+
+// Note that this implementation is fully modern C++11 (not compatible with old MSVC versions)
+// but we still include atomicops.h for its LightweightSemaphore implementation.
+#include "atomicops.h"
+
+#ifndef MOODYCAMEL_CACHE_LINE_SIZE
+#define MOODYCAMEL_CACHE_LINE_SIZE 64
+#endif
+
+namespace moodycamel {
+
+template<typename T>
+class BlockingReaderWriterCircularBuffer
+{
+public:
+ typedef T value_type;
+
+public:
+ explicit BlockingReaderWriterCircularBuffer(std::size_t capacity)
+ : maxcap(capacity), mask(), rawData(), data(),
+ slots(new spsc_sema::LightweightSemaphore(static_cast<spsc_sema::LightweightSemaphore::ssize_t>(capacity))),
+ items(new spsc_sema::LightweightSemaphore(0)),
+ nextSlot(0), nextItem(0)
+ {
+ // Round capacity up to power of two to compute modulo mask.
+ // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
+ --capacity;
+ capacity |= capacity >> 1;
+ capacity |= capacity >> 2;
+ capacity |= capacity >> 4;
+ for (std::size_t i = 1; i < sizeof(std::size_t); i <<= 1)
+ capacity |= capacity >> (i << 3);
+ mask = capacity++;
+ rawData = static_cast<char*>(std::malloc(capacity * sizeof(T) + std::alignment_of<T>::value - 1));
+ data = align_for<T>(rawData);
+ }
+
+ BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer&& other)
+ : maxcap(0), mask(0), rawData(nullptr), data(nullptr),
+ slots(new spsc_sema::LightweightSemaphore(0)),
+ items(new spsc_sema::LightweightSemaphore(0)),
+ nextSlot(), nextItem()
+ {
+ swap(other);
+ }
+
+ BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer const&) = delete;
+
+ // Note: The queue should not be accessed concurrently while it's
+ // being deleted. It's up to the user to synchronize this.
+ ~BlockingReaderWriterCircularBuffer()
+ {
+ for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i)
+ reinterpret_cast<T*>(data)[(nextItem + i) & mask].~T();
+ std::free(rawData);
+ }
+
+ BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer&& other) noexcept
+ {
+ swap(other);
+ return *this;
+ }
+
+ BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer const&) = delete;
+
+ // Swaps the contents of this buffer with the contents of another.
+ // Not thread-safe.
+ void swap(BlockingReaderWriterCircularBuffer& other) noexcept
+ {
+ std::swap(maxcap, other.maxcap);
+ std::swap(mask, other.mask);
+ std::swap(rawData, other.rawData);
+ std::swap(data, other.data);
+ std::swap(slots, other.slots);
+ std::swap(items, other.items);
+ std::swap(nextSlot, other.nextSlot);
+ std::swap(nextItem, other.nextItem);
+ }
+
+ // Enqueues a single item (by copying it).
+ // Fails if not enough room to enqueue.
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ bool try_enqueue(T const& item)
+ {
+ if (!slots->tryWait())
+ return false;
+ inner_enqueue(item);
+ return true;
+ }
+
+ // Enqueues a single item (by moving it, if possible).
+ // Fails if not enough room to enqueue.
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ bool try_enqueue(T&& item)
+ {
+ if (!slots->tryWait())
+ return false;
+ inner_enqueue(std::move(item));
+ return true;
+ }
+
+ // Blocks the current thread until there's enough space to enqueue the given item,
+ // then enqueues it (via copy).
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ void wait_enqueue(T const& item)
+ {
+ while (!slots->wait());
+ inner_enqueue(item);
+ }
+
+ // Blocks the current thread until there's enough space to enqueue the given item,
+ // then enqueues it (via move, if possible).
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ void wait_enqueue(T&& item)
+ {
+ while (!slots->wait());
+ inner_enqueue(std::move(item));
+ }
+
+ // Blocks the current thread until there's enough space to enqueue the given item,
+ // or the timeout expires. Returns false without enqueueing the item if the timeout
+ // expires, otherwise enqueues the item (via copy) and returns true.
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ bool wait_enqueue_timed(T const& item, std::int64_t timeout_usecs)
+ {
+ if (!slots->wait(timeout_usecs))
+ return false;
+ inner_enqueue(item);
+ return true;
+ }
+
+ // Blocks the current thread until there's enough space to enqueue the given item,
+ // or the timeout expires. Returns false without enqueueing the item if the timeout
+ // expires, otherwise enqueues the item (via move, if possible) and returns true.
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ bool wait_enqueue_timed(T&& item, std::int64_t timeout_usecs)
+ {
+ if (!slots->wait(timeout_usecs))
+ return false;
+ inner_enqueue(std::move(item));
+ return true;
+ }
+
+ // Blocks the current thread until there's enough space to enqueue the given item,
+ // or the timeout expires. Returns false without enqueueing the item if the timeout
+ // expires, otherwise enqueues the item (via copy) and returns true.
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ template<typename Rep, typename Period>
+ inline bool wait_enqueue_timed(T const& item, std::chrono::duration<Rep, Period> const& timeout)
+ {
+ return wait_enqueue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
+ }
+
+ // Blocks the current thread until there's enough space to enqueue the given item,
+ // or the timeout expires. Returns false without enqueueing the item if the timeout
+ // expires, otherwise enqueues the item (via move, if possible) and returns true.
+ // Thread-safe when called by producer thread.
+ // No exception guarantee (state will be corrupted) if constructor of T throws.
+ template<typename Rep, typename Period>
+ inline bool wait_enqueue_timed(T&& item, std::chrono::duration<Rep, Period> const& timeout)
+ {
+ return wait_enqueue_timed(std::move(item), std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
+ }
+
+ // Attempts to dequeue a single item.
+ // Returns false if the buffer is empty.
+ // Thread-safe when called by consumer thread.
+ // No exception guarantee (state will be corrupted) if assignment operator of U throws.
+ template<typename U>
+ bool try_dequeue(U& item)
+ {
+ if (!items->tryWait())
+ return false;
+ inner_dequeue(item);
+ return true;
+ }
+
+ // Blocks the current thread until there's something to dequeue, then dequeues it.
+ // Thread-safe when called by consumer thread.
+ // No exception guarantee (state will be corrupted) if assignment operator of U throws.
+ template<typename U>
+ void wait_dequeue(U& item)
+ {
+ while (!items->wait());
+ inner_dequeue(item);
+ }
+
+ // Blocks the current thread until either there's something to dequeue
+ // or the timeout expires. Returns false without setting `item` if the
+ // timeout expires, otherwise assigns to `item` and returns true.
+ // Thread-safe when called by consumer thread.
+ // No exception guarantee (state will be corrupted) if assignment operator of U throws.
+ template<typename U>
+ bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
+ {
+ if (!items->wait(timeout_usecs))
+ return false;
+ inner_dequeue(item);
+ return true;
+ }
+
+ // Blocks the current thread until either there's something to dequeue
+ // or the timeout expires. Returns false without setting `item` if the
+ // timeout expires, otherwise assigns to `item` and returns true.
+ // Thread-safe when called by consumer thread.
+ // No exception guarantee (state will be corrupted) if assignment operator of U throws.
+ template<typename U, typename Rep, typename Period>
+ inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
+ {
+ return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
+ }
+
+ // Returns a (possibly outdated) snapshot of the total number of elements currently in the buffer.
+ // Thread-safe.
+ inline std::size_t size_approx() const
+ {
+ return items->availableApprox();
+ }
+
+ // Returns the maximum number of elements that this circular buffer can hold at once.
+ // Thread-safe.
+ inline std::size_t max_capacity() const
+ {
+ return maxcap;
+ }
+
+private:
+ template<typename U>
+ void inner_enqueue(U&& item)
+ {
+ std::size_t i = nextSlot++;
+ new (reinterpret_cast<T*>(data) + (i & mask)) T(std::forward<U>(item));
+ items->signal();
+ }
+
+ template<typename U>
+ void inner_dequeue(U& item)
+ {
+ std::size_t i = nextItem++;
+ T& element = reinterpret_cast<T*>(data)[i & mask];
+ item = std::move(element);
+ element.~T();
+ slots->signal();
+ }
+
+ template<typename U>
+ static inline char* align_for(char* ptr)
+ {
+ const std::size_t alignment = std::alignment_of<U>::value;
+ return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
+ }
+
+private:
+ std::size_t maxcap; // actual (non-power-of-two) capacity
+ std::size_t mask; // circular buffer capacity mask (for cheap modulo)
+ char* rawData; // raw circular buffer memory
+ char* data; // circular buffer memory aligned to element alignment
+ std::unique_ptr<spsc_sema::LightweightSemaphore> slots; // number of slots currently free
+ std::unique_ptr<spsc_sema::LightweightSemaphore> items; // number of elements currently enqueued
+ char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(char*) * 2 - sizeof(std::size_t) * 2 - sizeof(std::unique_ptr<spsc_sema::LightweightSemaphore>) * 2];
+ std::size_t nextSlot; // index of next free slot to enqueue into
+ char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(std::size_t)];
+ std::size_t nextItem; // index of next element to dequeue from
+};
+
+}
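
Note on the new header above: the constructor rounds the requested capacity up to a power of two so that `index & mask` can stand in for a modulo, and `align_for` pads the malloc'd pointer up to the element alignment. The following is a minimal standalone sketch of those two calculations, lifted out for illustration only. The helper names `round_up_mask` and `alignment_padding` are hypothetical and do not appear in the header.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: returns (smallest power of two >= capacity) - 1,
// using the same bit-smearing steps as the constructor in the header.
// Intended for capacity >= 1; for 0 the decrement wraps to all ones.
inline std::size_t round_up_mask(std::size_t capacity)
{
    --capacity;
    capacity |= capacity >> 1;
    capacity |= capacity >> 2;
    capacity |= capacity >> 4;
    // Smear across the remaining bytes: shifts of 8, 16, 32 bits as size_t allows.
    for (std::size_t i = 1; i < sizeof(std::size_t); i <<= 1)
        capacity |= capacity >> (i << 3);
    return capacity;
}

// Hypothetical helper: number of bytes to add to `address` so that it becomes
// a multiple of `alignment` (same arithmetic as align_for in the header).
inline std::size_t alignment_padding(std::uintptr_t address, std::size_t alignment)
{
    assert(alignment != 0);
    return (alignment - (address % alignment)) % alignment;
}
```

With a requested capacity of 65, as in the unit test, the mask comes out as 127, so 128 slots are allocated while `max_capacity()` still reports 65; the semaphore, not the mask, enforces the nominal limit.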
=====================================
readerwriterqueue.h
=====================================
@@ -22,8 +22,8 @@
// A lock-free queue for a single-consumer, single-producer architecture..
// The queue is also wait-free in the common path (except if more memory
// needs to be allocated, in which case malloc is called).
-// Allocates memory sparingly (O(lg(n) times, amortized), and only once if
-// the original maximum size estimate is never exceeded.
+// Allocates memory sparingly, and only once if the original maximum size
+// estimate is never exceeded.
// Tested on x86/x64 processors, but semantics should be correct for all
// architectures (given the right implementations in atomicops.h), provided
// that aligned integer and pointer accesses are naturally atomic.
=====================================
tests/unittests/makefile
=====================================
@@ -20,7 +20,7 @@ endif
default: unittests$(EXT)
-unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
+unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../readerwritercircularbuffer.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
g++ $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS)
run: unittests$(EXT)
=====================================
tests/unittests/unittests.cpp
=====================================
@@ -10,6 +10,7 @@
#include "minitest.h"
#include "../common/simplethread.h"
#include "../../readerwriterqueue.h"
+#include "../../readerwritercircularbuffer.h"
using namespace moodycamel;
@@ -108,6 +109,7 @@ public:
REGISTER_TEST(try_enqueue_fail_workaround);
REGISTER_TEST(try_emplace_fail);
#endif
+ REGISTER_TEST(blocking_circular_buffer);
}
bool create_empty_queue()
@@ -693,6 +695,120 @@ public:
return true;
}
#endif
+
+ bool blocking_circular_buffer()
+ {
+ {
+ // Basic enqueue
+ BlockingReaderWriterCircularBuffer<int> q(65);
+ for (int iteration = 0; iteration != 128; ++iteration) { // check there's no problem with mismatch between nominal and allocated capacity
+ ASSERT_OR_FAIL(q.max_capacity() == 65);
+ ASSERT_OR_FAIL(q.size_approx() == 0);
+ ASSERT_OR_FAIL(q.try_enqueue(0));
+ ASSERT_OR_FAIL(q.max_capacity() == 65);
+ ASSERT_OR_FAIL(q.size_approx() == 1);
+ for (int i = 1; i != 65; ++i)
+ q.wait_enqueue(i);
+ ASSERT_OR_FAIL(q.size_approx() == 65);
+ ASSERT_OR_FAIL(!q.try_enqueue(65));
+
+ // Basic dequeue
+ int item;
+ ASSERT_OR_FAIL(q.try_dequeue(item));
+ ASSERT_OR_FAIL(item == 0);
+ for (int i = 1; i != 65; ++i) {
+ q.wait_dequeue(item);
+ ASSERT_OR_FAIL(item == i);
+ }
+ ASSERT_OR_FAIL(!q.try_dequeue(item));
+ ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 1));
+ ASSERT_OR_FAIL(item == 64);
+ }
+ }
+
+ {
+ // Zero capacity
+ BlockingReaderWriterCircularBuffer<int> q(0);
+ ASSERT_OR_FAIL(q.max_capacity() == 0);
+ ASSERT_OR_FAIL(!q.try_enqueue(1));
+ ASSERT_OR_FAIL(!q.wait_enqueue_timed(1, 0));
+ }
+
+ // Element lifetimes
+ Foo::reset();
+ {
+ BlockingReaderWriterCircularBuffer<Foo> q(31);
+ {
+ Foo item;
+ for (int i = 0; i != 23 + 32; ++i) {
+ ASSERT_OR_FAIL(q.try_enqueue(Foo()));
+ ASSERT_OR_FAIL(q.try_dequeue(item));
+ }
+ ASSERT_OR_FAIL(Foo::destroy_count() == 23 + 32);
+ ASSERT_OR_FAIL(Foo::destroyed_in_order());
+ }
+ Foo::reset();
+
+ {
+ Foo item;
+ for (int i = 0; i != 10; ++i)
+ ASSERT_OR_FAIL(q.try_enqueue(Foo()));
+ ASSERT_OR_FAIL(q.size_approx() == 10);
+ ASSERT_OR_FAIL(Foo::destroy_count() == 0);
+ ASSERT_OR_FAIL(q.try_dequeue(item));
+ ASSERT_OR_FAIL(q.size_approx() == 9);
+ ASSERT_OR_FAIL(Foo::destroy_count() == 1);
+ }
+ ASSERT_OR_FAIL(Foo::destroy_count() == 2);
+ ASSERT_OR_FAIL(Foo::destroyed_in_order());
+
+ BlockingReaderWriterCircularBuffer<Foo> q2(std::move(q));
+ ASSERT_OR_FAIL(q.size_approx() == 0);
+ ASSERT_OR_FAIL(q2.size_approx() == 9);
+
+ BlockingReaderWriterCircularBuffer<Foo> q3(2);
+ q3 = std::move(q2);
+ ASSERT_OR_FAIL(q.size_approx() == 0);
+ ASSERT_OR_FAIL(q2.size_approx() == 0);
+ ASSERT_OR_FAIL(q3.size_approx() == 9);
+
+ q = std::move(q2);
+ ASSERT_OR_FAIL(q.size_approx() == 0);
+ ASSERT_OR_FAIL(q2.size_approx() == 0);
+ ASSERT_OR_FAIL(q3.size_approx() == 9);
+ ASSERT_OR_FAIL(Foo::destroy_count() == 2);
+ }
+ ASSERT_OR_FAIL(Foo::destroy_count() == 11);
+ ASSERT_OR_FAIL(Foo::destroyed_in_order());
+
+ weak_atomic<int> result;
+ result = 1;
+
+ {
+ // Threaded
+ BlockingReaderWriterCircularBuffer<int> q(8);
+ SimpleThread reader([&]() {
+ int item;
+ for (int i = 0; i != 1000000; ++i) {
+ q.wait_dequeue(item);
+ if (item != i)
+ result = 0;
+ }
+ });
+ SimpleThread writer([&]() {
+ for (int i = 0; i != 1000000; ++i)
+ q.wait_enqueue(i);
+ });
+
+ writer.join();
+ reader.join();
+
+ ASSERT_OR_FAIL(q.size_approx() == 0);
+ ASSERT_OR_FAIL(result.load());
+ }
+
+ return true;
+ }
};
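
Note on the test above: the fill-until-full and drain-until-empty checks exercise the two counters the buffer keeps, one for free slots and one for enqueued items. Below is a rough sketch of that scheme, assuming a plain mutex/condition-variable counting semaphore in place of the `LightweightSemaphore` from atomicops.h. The class names `CountingSemaphore` and `BoundedIntBuffer` are hypothetical, and this is not the upstream implementation (it uses locks and a plain modulo rather than a power-of-two mask).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for LightweightSemaphore: a simple locking counter.
class CountingSemaphore {
public:
    explicit CountingSemaphore(std::size_t initial) : count_(initial) {}
    bool try_wait() {                       // take one unit if available
        std::lock_guard<std::mutex> lock(m_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }
    void wait() {                           // block until a unit is available
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ != 0; });
        --count_;
    }
    void signal() {                         // release one unit
        { std::lock_guard<std::mutex> lock(m_); ++count_; }
        cv_.notify_one();
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t count_;
};

// Bounded single-producer, single-consumer buffer of ints.
// `slots_` counts free cells, `items_` counts filled cells.
class BoundedIntBuffer {
public:
    explicit BoundedIntBuffer(std::size_t capacity)
        : cells_(capacity), slots_(capacity), items_(0), next_slot_(0), next_item_(0) {}

    bool try_enqueue(int v) { if (!slots_.try_wait()) return false; put(v); return true; }
    void wait_enqueue(int v) { slots_.wait(); put(v); }
    bool try_dequeue(int& v) { if (!items_.try_wait()) return false; take(v); return true; }
    void wait_dequeue(int& v) { items_.wait(); take(v); }

private:
    // Only the producer touches next_slot_; only the consumer touches next_item_.
    void put(int v) {
        assert(!cells_.empty());
        cells_[next_slot_++ % cells_.size()] = v;
        items_.signal();                    // publish the new item
    }
    void take(int& v) {
        assert(!cells_.empty());
        v = cells_[next_item_++ % cells_.size()];
        slots_.signal();                    // hand the cell back to the producer
    }

    std::vector<int> cells_;
    CountingSemaphore slots_;
    CountingSemaphore items_;
    std::size_t next_slot_;
    std::size_t next_item_;
};
```

In this sketch a zero-capacity buffer rejects every `try_enqueue`, because the slot counter starts at zero, which matches what the "Zero capacity" case in the test expects of the real class.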
View it on GitLab: https://salsa.debian.org/med-team/readerwriterqueue/-/compare/7fa8798bbbd84911a1e1ea6c97a230554732843f...14d52c2dd85d187833d58d76dc0b76760b246890