[med-svn] [Git][med-team/readerwriterqueue][master] 5 commits: New upstream version 1.0.5

Nilesh Patra gitlab at salsa.debian.org
Mon May 3 16:14:59 BST 2021



Nilesh Patra pushed to branch master at Debian Med / readerwriterqueue


Commits:
26a2b0f8 by Nilesh Patra at 2021-05-03T20:33:45+05:30
New upstream version 1.0.5
- - - - -
cc0b762f by Nilesh Patra at 2021-05-03T20:33:46+05:30
Update upstream source from tag 'upstream/1.0.5'

Update to upstream version '1.0.5'
with Debian dir 7a4df1bb12a9520b731e64fc3e43c81c507c978a
- - - - -
1af24146 by Nilesh Patra at 2021-05-03T20:38:17+05:30
Refresh patch

- - - - -
eecd2934 by Nilesh Patra at 2021-05-03T20:39:46+05:30
Install readerwritercircularbuffer.h to /usr/include

- - - - -
14d52c2d by Nilesh Patra at 2021-05-03T20:44:28+05:30
Update Interim changelog entry

- - - - -


13 changed files:

- CMakeLists.txt
- LICENSE.md
- README.md
- atomicops.h
- benchmarks/bench.cpp
- benchmarks/makefile
- debian/changelog
- debian/libreaderwriterqueue-dev.install
- debian/patches/hardening.patch
- + readerwritercircularbuffer.h
- readerwriterqueue.h
- tests/unittests/makefile
- tests/unittests/unittests.cpp


Changes:

=====================================
CMakeLists.txt
=====================================
@@ -7,5 +7,5 @@ add_library(${PROJECT_NAME} INTERFACE)
 
 target_include_directories(readerwriterqueue INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
 
-install(FILES atomicops.h readerwriterqueue.h LICENSE.md
+install(FILES atomicops.h readerwriterqueue.h readerwritercircularbuffer.h LICENSE.md
         DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME})


=====================================
LICENSE.md
=====================================
@@ -1,11 +1,11 @@
 This license applies to all the code in this repository except that written by third
 parties, namely the files in benchmarks/ext, which have their own licenses, and Jeff
-Preshing's semaphore implementation (used in the blocking queue) which has a zlib
+Preshing's semaphore implementation (used in the blocking queues) which has a zlib
 license (embedded in atomicops.h).
 
 Simplified BSD License:
 
-Copyright (c) 2013-2015, Cameron Desrochers  
+Copyright (c) 2013-2021, Cameron Desrochers  
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without modification,


=====================================
README.md
=====================================
@@ -7,6 +7,8 @@ you could use this queue completely from a single thread if you wish (but that w
 
 Note: If you need a general-purpose multi-producer, multi-consumer lock free queue, I have [one of those too][mpmc].
 
+This repository also includes a [circular-buffer SPSC queue][circular] which supports blocking on enqueue as well as dequeue.
+
 
 ## Features
 
@@ -25,7 +27,7 @@ Note: If you need a general-purpose multi-producer, multi-consumer lock free que
 
 ## Use
 
-Simply drop the readerwriterqueue.h and atomicops.h files into your source code and include them :-)
+Simply drop the readerwriterqueue.h (or readerwritercircularbuffer.h) and atomicops.h files into your source code and include them :-)
 A modern compiler is required (MSVC2010+, GCC 4.7+, ICC 13+, or any C++11 compliant compiler should work).
 
 Note: If you're using GCC, you really do need GCC 4.7 or above -- [4.6 has a bug][gcc46bug] that prevents the atomic fence primitives
@@ -94,6 +96,25 @@ means care must be taken to only call `wait_dequeue` if you're sure another elem
 will come along eventually, or if the queue has a static lifetime. This is because
 destroying the queue while a thread is waiting on it will invoke undefined behaviour.
 
+The blocking circular buffer has a fixed number of slots, but is otherwise quite similar to
+use:
+
+```cpp
+BlockingReaderWriterCircularBuffer<int> q(1024);  // pass initial capacity
+
+q.try_enqueue(1);
+int number;
+q.try_dequeue(number);
+assert(number == 1);
+
+q.wait_enqueue(123);
+q.wait_dequeue(number);
+assert(number == 123);
+
+q.wait_dequeue_timed(number, std::chrono::milliseconds(10));
+```
+
+
 ## CMake installation
 As an alternative to including the source files in your project directly,
 you can use CMake to install the library in your system's include directory:
@@ -118,14 +139,6 @@ includes all modern processors (e.g. x86/x86-64, ARM, and PowerPC). *Not* for us
 Note that it's only been tested on x86(-64); if someone has access to other processors I'd love to run some tests on
 anything that's not x86-based.
 
-Finally, I am not an expert. This is my first foray into lock-free programming, and though I'm confident in the code,
-it's possible that there are bugs despite the effort I put into designing and testing this data structure.
-
-Use this code at your own risk; in particular, lock-free programming is a patent minefield, and this code may very
-well violate a pending patent (I haven't looked). It's worth noting that I came up with this algorithm and
-implementation from scratch, independent of any existing lock-free queues.
-
-
 ## More info
 
 See the [LICENSE.md][license] file for the license (simplified BSD).
@@ -139,3 +152,4 @@ about lock-free programming.
 [benchmarks]: http://moodycamel.com/blog/2013/a-fast-lock-free-queue-for-c++#benchmarks
 [gcc46bug]: http://stackoverflow.com/questions/16429669/stdatomic-thread-fence-has-undefined-reference
 [mpmc]: https://github.com/cameron314/concurrentqueue
+[circular]: readerwritercircularbuffer.h


=====================================
atomicops.h
=====================================
@@ -239,7 +239,7 @@ template<typename T>
 class weak_atomic
 {
 public:
-	AE_NO_TSAN weak_atomic() { }
+	AE_NO_TSAN weak_atomic() : value() { }
 #ifdef AE_VCPP
 #pragma warning(push)
 #pragma warning(disable: 4100)		// Get rid of (erroneous) 'unreferenced formal parameter' warning
@@ -389,7 +389,7 @@ namespace moodycamel
 		    Semaphore& operator=(const Semaphore& other);
 
 		public:
-		    AE_NO_TSAN Semaphore(int initialCount = 0)
+		    AE_NO_TSAN Semaphore(int initialCount = 0) : m_hSema()
 		    {
 		        assert(initialCount >= 0);
 		        const long maxLong = 0x7fffffff;
@@ -437,7 +437,7 @@ namespace moodycamel
 		    Semaphore& operator=(const Semaphore& other);
 
 		public:
-		    AE_NO_TSAN Semaphore(int initialCount = 0)
+		    AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema()
 		    {
 		        assert(initialCount >= 0);
 		        kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
@@ -497,7 +497,7 @@ namespace moodycamel
 		    Semaphore& operator=(const Semaphore& other);
 
 		public:
-		    AE_NO_TSAN Semaphore(int initialCount = 0)
+		    AE_NO_TSAN Semaphore(int initialCount = 0) : m_sema()
 		    {
 		        assert(initialCount >= 0);
 		        int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
@@ -626,7 +626,7 @@ namespace moodycamel
 		    }
 
 		public:
-		    AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
+		    AE_NO_TSAN LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount), m_sema()
 		    {
 		        assert(initialCount >= 0);
 		    }


=====================================
benchmarks/bench.cpp
=====================================
@@ -8,6 +8,10 @@
 #define NO_FOLLY_SUPPORT
 #endif
 
+#if defined(_MSC_VER) && _MSC_VER < 1700
+#define NO_CIRCULAR_BUFFER_SUPPORT
+#endif
+
 #if !defined(__amd64__) && !defined(_M_X64) && !defined(__x86_64__) && !defined(_M_IX86) && !defined(__i386__)
 #define NO_SPSC_SUPPORT  // SPSC implementation is for x86 only
 #endif
@@ -17,6 +21,15 @@
 #include "ext/folly/ProducerConsumerQueue.h"    // Facebook's folly (GitHub)
 #endif
 #include "../readerwriterqueue.h"               // Mine
+#ifndef NO_CIRCULAR_BUFFER_SUPPORT
+#include "../readerwritercircularbuffer.h"      // Mine
+template<typename T>
+class BlockingReaderWriterCircularBufferAdapter : public moodycamel::BlockingReaderWriterCircularBuffer<T> {
+public:
+	BlockingReaderWriterCircularBufferAdapter(std::size_t capacity) : moodycamel::BlockingReaderWriterCircularBuffer<T>(capacity) { }
+	void enqueue(T const& x) { this->wait_enqueue(x); }
+};
+#endif
 #include "systemtime.h"
 #include "../tests/common/simplethread.h"
 
@@ -75,11 +88,13 @@ int main(int argc, char** argv)
 	const double FASTEST_PERCENT_CONSIDERED = 20;		// Consider only the fastest runs in the top 20%
 
 	double rwqResults[BENCHMARK_COUNT][TEST_COUNT];
+	double brwcbResults[BENCHMARK_COUNT][TEST_COUNT];
 	double spscResults[BENCHMARK_COUNT][TEST_COUNT];
 	double follyResults[BENCHMARK_COUNT][TEST_COUNT];
 	
 	// Also calculate a rough heuristic of "ops/s" (across all runs, not just fastest)
 	double rwqOps[BENCHMARK_COUNT][TEST_COUNT];
+	double brwcbOps[BENCHMARK_COUNT][TEST_COUNT];
 	double spscOps[BENCHMARK_COUNT][TEST_COUNT];
 	double follyOps[BENCHMARK_COUNT][TEST_COUNT];
 
@@ -94,6 +109,16 @@ int main(int argc, char** argv)
 		for (int i = 0; i < TEST_COUNT; ++i) {
 			rwqResults[benchmark][i] = runBenchmark<ReaderWriterQueue<int>>((BenchmarkType)benchmark, randSeeds[benchmark], rwqOps[benchmark][i]);
 		}
+#ifndef NO_CIRCULAR_BUFFER_SUPPORT
+		for (int i = 0; i < TEST_COUNT; ++i) {
+			brwcbResults[benchmark][i] = runBenchmark<BlockingReaderWriterCircularBufferAdapter<int>>((BenchmarkType)benchmark, randSeeds[benchmark], brwcbOps[benchmark][i]);
+		}
+#else
+		for (int i = 0; i < TEST_COUNT; ++i) {
+			brwcbResults[benchmark][i] = 0;
+			brwcbOps[benchmark][i] = 0;
+		}
+#endif
 #ifndef NO_SPSC_SUPPORT
 		for (int i = 0; i < TEST_COUNT; ++i) {
 			spscResults[benchmark][i] = runBenchmark<spsc_queue<int>>((BenchmarkType)benchmark, randSeeds[benchmark], spscOps[benchmark][i]);
@@ -119,6 +144,7 @@ int main(int argc, char** argv)
 	// Sort results
 	for (int benchmark = 0; benchmark < BENCHMARK_COUNT; ++benchmark) {
 		std::sort(&rwqResults[benchmark][0], &rwqResults[benchmark][0] + TEST_COUNT);
+		std::sort(&brwcbResults[benchmark][0], &brwcbResults[benchmark][0] + TEST_COUNT);
 		std::sort(&spscResults[benchmark][0], &spscResults[benchmark][0] + TEST_COUNT);
 		std::sort(&follyResults[benchmark][0], &follyResults[benchmark][0] + TEST_COUNT);
 	}
@@ -126,24 +152,29 @@ int main(int argc, char** argv)
 	// Display results
 	int max = std::max(2, (int)(TEST_COUNT * FASTEST_PERCENT_CONSIDERED / 100));
 	assert(max > 0);
+#ifdef NO_CIRCULAR_BUFFER_SUPPORT
+	std::cout << "Note: BRWCB queue not supported on this platform, discount its timings" << std::endl;
+#endif
 #ifdef NO_SPSC_SUPPORT
 	std::cout << "Note: SPSC queue not supported on this platform, discount its timings" << std::endl;
 #endif
 #ifdef NO_FOLLY_SUPPORT
 	std::cout << "Note: Folly queue not supported by this compiler, discount its timings" << std::endl;
 #endif
-	std::cout              << std::setw(BENCHMARK_NAME_MAX) << "         " << " |-----------  Min ------------|------------ Max ------------|------------ Avg ------------|\n";
-	std::cout << std::left << std::setw(BENCHMARK_NAME_MAX) << "Benchmark" << " |   RWQ   |  SPSC   |  Folly  |   RWQ   |  SPSC   |  Folly  |   RWQ   |  SPSC   |  Folly  | xSPSC | xFolly\n";
+	std::cout              << std::setw(BENCHMARK_NAME_MAX) << "         " << " |----------------  Min -----------------|----------------- Max -----------------|----------------- Avg -----------------|\n";
+	std::cout << std::left << std::setw(BENCHMARK_NAME_MAX) << "Benchmark" << " |   RWQ   |  BRWCB  |  SPSC   |  Folly  |   RWQ   |  BRWCB  |  SPSC   |  Folly  |   RWQ   |  BRWCB  |  SPSC   |  Folly  | xSPSC | xFolly\n";
 	std::cout.fill('-');
-	std::cout              << std::setw(BENCHMARK_NAME_MAX) << "---------" << "-+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+-------\n";
+	std::cout              << std::setw(BENCHMARK_NAME_MAX) << "---------" << "-+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+-------\n";
 	std::cout.fill(' ');
-	double rwqOpsPerSec = 0, spscOpsPerSec = 0, follyOpsPerSec = 0;
+	double rwqOpsPerSec = 0, brwcbOpsPerSec = 0, spscOpsPerSec = 0, follyOpsPerSec = 0;
 	int opTimedBenchmarks = 0;
 	for (int benchmark = 0; benchmark < BENCHMARK_COUNT; ++benchmark) {
 		double rwqMin = rwqResults[benchmark][0], rwqMax = rwqResults[benchmark][max - 1];
+		double brwcbMin = brwcbResults[benchmark][0], brwcbMax = brwcbResults[benchmark][max - 1];
 		double spscMin = spscResults[benchmark][0], spscMax = spscResults[benchmark][max - 1];
 		double follyMin = follyResults[benchmark][0], follyMax = follyResults[benchmark][max - 1];
 		double rwqAvg = std::accumulate(&rwqResults[benchmark][0], &rwqResults[benchmark][0] + max, 0.0) / max;
+		double brwcbAvg = std::accumulate(&brwcbResults[benchmark][0], &brwcbResults[benchmark][0] + max, 0.0) / max;
 		double spscAvg = std::accumulate(&spscResults[benchmark][0], &spscResults[benchmark][0] + max, 0.0) / max;
 		double follyAvg = std::accumulate(&follyResults[benchmark][0], &follyResults[benchmark][0] + max, 0.0) / max;
 		double spscMult = rwqAvg < 0.00001 ? 0 : spscAvg / rwqAvg;
@@ -151,9 +182,11 @@ int main(int argc, char** argv)
 
 		if (rwqResults[benchmark][0] != -1) {
 			double rwqTotalAvg = std::accumulate(&rwqResults[benchmark][0], &rwqResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
+			double brwcbTotalAvg = std::accumulate(&brwcbResults[benchmark][0], &brwcbResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
 			double spscTotalAvg = std::accumulate(&spscResults[benchmark][0], &spscResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
 			double follyTotalAvg = std::accumulate(&follyResults[benchmark][0], &follyResults[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT;
 			rwqOpsPerSec += rwqTotalAvg == 0 ? 0 : std::accumulate(&rwqOps[benchmark][0], &rwqOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / rwqTotalAvg;
+			brwcbOpsPerSec += brwcbTotalAvg == 0 ? 0 : std::accumulate(&brwcbOps[benchmark][0], &brwcbOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / brwcbTotalAvg;
 			spscOpsPerSec += spscTotalAvg == 0 ? 0 : std::accumulate(&spscOps[benchmark][0], &spscOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / spscTotalAvg;
 			follyOpsPerSec += follyTotalAvg == 0 ? 0 : std::accumulate(&follyOps[benchmark][0], &follyOps[benchmark][0] + TEST_COUNT, 0.0) / TEST_COUNT / follyTotalAvg;
 			++opTimedBenchmarks;
@@ -162,12 +195,15 @@ int main(int argc, char** argv)
 		std::cout
 			<< std::left << std::setw(BENCHMARK_NAME_MAX) << benchmarkName((BenchmarkType)benchmark) << " | "
 			<< std::fixed << std::setprecision(4) << rwqMin << "s | "
+			<< std::fixed << std::setprecision(4) << brwcbMin << "s | "
 			<< std::fixed << std::setprecision(4) << spscMin << "s | "
 			<< std::fixed << std::setprecision(4) << follyMin << "s | "
 			<< std::fixed << std::setprecision(4) << rwqMax << "s | "
+			<< std::fixed << std::setprecision(4) << brwcbMax << "s | "
 			<< std::fixed << std::setprecision(4) << spscMax << "s | "
 			<< std::fixed << std::setprecision(4) << follyMax << "s | "
 			<< std::fixed << std::setprecision(4) << rwqAvg << "s | "
+			<< std::fixed << std::setprecision(4) << brwcbAvg << "s | "
 			<< std::fixed << std::setprecision(4) << spscAvg << "s | "
 			<< std::fixed << std::setprecision(4) << follyAvg << "s | "
 			<< std::fixed << std::setprecision(2) << spscMult << "x | "
@@ -177,14 +213,16 @@ int main(int argc, char** argv)
 	}
 
 	rwqOpsPerSec /= opTimedBenchmarks;
+	brwcbOpsPerSec /= opTimedBenchmarks;
 	spscOpsPerSec /= opTimedBenchmarks;
 	follyOpsPerSec /= opTimedBenchmarks;
 
 	std::cout
 		<< "\nAverage ops/s:\n"
-		<< "    ReaderWriterQueue: " << std::fixed << std::setprecision(2) << rwqOpsPerSec / 1000000 << " million\n"
-		<< "    SPSC queue:        " << std::fixed << std::setprecision(2) << spscOpsPerSec / 1000000 << " million\n"
-		<< "    Folly queue:       " << std::fixed << std::setprecision(2) << follyOpsPerSec / 1000000 << " million\n"
+		<< "    ReaderWriterQueue:                  " << std::fixed << std::setprecision(2) << rwqOpsPerSec / 1000000 << " million\n"
+		<< "    BlockingReaderWriterCircularBuffer: " << std::fixed << std::setprecision(2) << brwcbOpsPerSec / 1000000 << " million\n"
+		<< "    SPSC queue:                         " << std::fixed << std::setprecision(2) << spscOpsPerSec / 1000000 << " million\n"
+		<< "    Folly queue:                        " << std::fixed << std::setprecision(2) << follyOpsPerSec / 1000000 << " million\n"
 	;
 	std::cout << std::endl;
 


=====================================
benchmarks/makefile
=====================================
@@ -15,7 +15,7 @@ endif
 
 default: benchmarks$(EXT)
 
-benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
+benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../readerwritercircularbuffer.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
 	g++ -std=c++11 -Wpedantic -Wall -DNDEBUG -O3 -g bench.cpp ../tests/common/simplethread.cpp systemtime.cpp -o benchmarks$(EXT) -pthread $(PLATFORM_OPTS)
 
 run: benchmarks$(EXT)


=====================================
debian/changelog
=====================================
@@ -1,11 +1,12 @@
-readerwriterqueue (1.0.4-1) UNRELEASED; urgency=medium
+readerwriterqueue (1.0.5-1) UNRELEASED; urgency=medium
 
   * Team Upload.
-  * New upstream version 1.0.4
+  * New upstream version 1.0.5
   * Declare compliance with policy 4.5.1
   * Propagate hardening options to fix blhc
+  * Install readerwritercircularbuffer.h to /usr/include
 
- -- Nilesh Patra <nilesh at debian.org>  Wed, 21 Apr 2021 19:49:39 +0530
+ -- Nilesh Patra <nilesh at debian.org>  Mon, 03 May 2021 20:40:12 +0530
 
 readerwriterqueue (1.0.3-1) unstable; urgency=medium
 


=====================================
debian/libreaderwriterqueue-dev.install
=====================================
@@ -1,2 +1,3 @@
 atomicops.h	usr/include
 readerwriterqueue.h	usr/include
+readerwritercircularbuffer.h usr/include


=====================================
debian/patches/hardening.patch
=====================================
@@ -6,7 +6,7 @@ Last-Update: 2021-04-21
 @@ -16,7 +16,7 @@
  default: benchmarks$(EXT)
  
- benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
+ benchmarks$(EXT): bench.cpp ../readerwriterqueue.h ../readerwritercircularbuffer.h ../atomicops.h ext/1024cores/spscqueue.h ext/folly/ProducerConsumerQueue.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp systemtime.h systemtime.cpp makefile
 -	g++ -std=c++11 -Wpedantic -Wall -DNDEBUG -O3 -g bench.cpp ../tests/common/simplethread.cpp systemtime.cpp -o benchmarks$(EXT) -pthread $(PLATFORM_OPTS)
 +	g++ $(CPPFLAGS) $(CXXFLAGS) -std=c++11 -Wpedantic -Wall -DNDEBUG -O3 -g bench.cpp ../tests/common/simplethread.cpp systemtime.cpp -o benchmarks$(EXT) -pthread $(PLATFORM_OPTS) $(LDFLAGS)
  
@@ -28,9 +28,9 @@ Last-Update: 2021-04-21
 @@ -21,7 +21,7 @@
  default: unittests$(EXT)
  
- unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
+ unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../readerwritercircularbuffer.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
 -	g++ $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS)
-+	g++ $(CXXFLAGS) $(CPPFLAGS) $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS) $(LDFLAGS)
++	g++ $(CPPFLAGS) $(CXXFLAGS) $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS) $(LDFLAGS)
  
  run: unittests$(EXT)
  	./unittests$(EXT)


=====================================
readerwritercircularbuffer.h
=====================================
@@ -0,0 +1,288 @@
+// ©2020 Cameron Desrochers.
+// Distributed under the simplified BSD license (see the license file that
+// should have come with this header).
+
+// Provides a C++11 implementation of a single-producer, single-consumer wait-free concurrent
+// circular buffer (fixed-size queue).
+
+#pragma once
+
+#include <utility>
+#include <chrono>
+#include <memory>
+#include <cstdlib>
+#include <cstdint>
+#include <cassert>
+
+// Note that this implementation is fully modern C++11 (not compatible with old MSVC versions)
+// but we still include atomicops.h for its LightweightSemaphore implementation.
+#include "atomicops.h"
+
+#ifndef MOODYCAMEL_CACHE_LINE_SIZE
+#define MOODYCAMEL_CACHE_LINE_SIZE 64
+#endif
+
+namespace moodycamel {
+
+template<typename T>
+class BlockingReaderWriterCircularBuffer
+{
+public:
+	typedef T value_type;
+
+public:
+	explicit BlockingReaderWriterCircularBuffer(std::size_t capacity)
+		: maxcap(capacity), mask(), rawData(), data(),
+		slots(new spsc_sema::LightweightSemaphore(static_cast<spsc_sema::LightweightSemaphore::ssize_t>(capacity))),
+		items(new spsc_sema::LightweightSemaphore(0)),
+		nextSlot(0), nextItem(0)
+	{
+		// Round capacity up to power of two to compute modulo mask.
+		// Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
+		--capacity;
+		capacity |= capacity >> 1;
+		capacity |= capacity >> 2;
+		capacity |= capacity >> 4;
+		for (std::size_t i = 1; i < sizeof(std::size_t); i <<= 1)
+			capacity |= capacity >> (i << 3);
+		mask = capacity++;
+		rawData = static_cast<char*>(std::malloc(capacity * sizeof(T) + std::alignment_of<T>::value - 1));
+		data = align_for<T>(rawData);
+	}
+
+	BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer&& other)
+		: maxcap(0), mask(0), rawData(nullptr), data(nullptr),
+		slots(new spsc_sema::LightweightSemaphore(0)),
+		items(new spsc_sema::LightweightSemaphore(0)),
+		nextSlot(), nextItem()
+	{
+		swap(other);
+	}
+
+	BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer const&) = delete;
+
+	// Note: The queue should not be accessed concurrently while it's
+	// being deleted. It's up to the user to synchronize this.
+	~BlockingReaderWriterCircularBuffer()
+	{
+		for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i)
+			reinterpret_cast<T*>(data)[(nextItem + i) & mask].~T();
+		std::free(rawData);
+	}
+
+	BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer&& other) noexcept
+	{
+		swap(other);
+		return *this;
+	}
+
+	BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer const&) = delete;
+
+	// Swaps the contents of this buffer with the contents of another.
+	// Not thread-safe.
+	void swap(BlockingReaderWriterCircularBuffer& other) noexcept
+	{
+		std::swap(maxcap, other.maxcap);
+		std::swap(mask, other.mask);
+		std::swap(rawData, other.rawData);
+		std::swap(data, other.data);
+		std::swap(slots, other.slots);
+		std::swap(items, other.items);
+		std::swap(nextSlot, other.nextSlot);
+		std::swap(nextItem, other.nextItem);
+	}
+
+	// Enqueues a single item (by copying it).
+	// Fails if not enough room to enqueue.
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	bool try_enqueue(T const& item)
+	{
+		if (!slots->tryWait())
+			return false;
+		inner_enqueue(item);
+		return true;
+	}
+
+	// Enqueues a single item (by moving it, if possible).
+	// Fails if not enough room to enqueue.
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	bool try_enqueue(T&& item)
+	{
+		if (!slots->tryWait())
+			return false;
+		inner_enqueue(std::move(item));
+		return true;
+	}
+
+	// Blocks the current thread until there's enough space to enqueue the given item,
+	// then enqueues it (via copy).
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	void wait_enqueue(T const& item)
+	{
+		while (!slots->wait());
+		inner_enqueue(item);
+	}
+
+	// Blocks the current thread until there's enough space to enqueue the given item,
+	// then enqueues it (via move, if possible).
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	void wait_enqueue(T&& item)
+	{
+		while (!slots->wait());
+		inner_enqueue(std::move(item));
+	}
+
+	// Blocks the current thread until there's enough space to enqueue the given item,
+	// or the timeout expires. Returns false without enqueueing the item if the timeout
+	// expires, otherwise enqueues the item (via copy) and returns true.
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	bool wait_enqueue_timed(T const& item, std::int64_t timeout_usecs)
+	{
+		if (!slots->wait(timeout_usecs))
+			return false;
+		inner_enqueue(item);
+		return true;
+	}
+
+	// Blocks the current thread until there's enough space to enqueue the given item,
+	// or the timeout expires. Returns false without enqueueing the item if the timeout
+	// expires, otherwise enqueues the item (via move, if possible) and returns true.
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	bool wait_enqueue_timed(T&& item, std::int64_t timeout_usecs)
+	{
+		if (!slots->wait(timeout_usecs))
+			return false;
+		inner_enqueue(std::move(item));
+		return true;
+	}
+
+	// Blocks the current thread until there's enough space to enqueue the given item,
+	// or the timeout expires. Returns false without enqueueing the item if the timeout
+	// expires, otherwise enqueues the item (via copy) and returns true.
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	template<typename Rep, typename Period>
+	inline bool wait_enqueue_timed(T const& item, std::chrono::duration<Rep, Period> const& timeout)
+	{
+		return wait_enqueue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
+	}
+
+	// Blocks the current thread until there's enough space to enqueue the given item,
+	// or the timeout expires. Returns false without enqueueing the item if the timeout
+	// expires, otherwise enqueues the item (via move, if possible) and returns true.
+	// Thread-safe when called by producer thread.
+	// No exception guarantee (state will be corrupted) if constructor of T throws.
+	template<typename Rep, typename Period>
+	inline bool wait_enqueue_timed(T&& item, std::chrono::duration<Rep, Period> const& timeout)
+	{
+		return wait_enqueue_timed(std::move(item), std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
+	}
+
+	// Attempts to dequeue a single item.
+	// Returns false if the buffer is empty.
+	// Thread-safe when called by consumer thread.
+	// No exception guarantee (state will be corrupted) if assignment operator of U throws.
+	template<typename U>
+	bool try_dequeue(U& item)
+	{
+		if (!items->tryWait())
+			return false;
+		inner_dequeue(item);
+		return true;
+	}
+
+	// Blocks the current thread until there's something to dequeue, then dequeues it.
+	// Thread-safe when called by consumer thread.
+	// No exception guarantee (state will be corrupted) if assignment operator of U throws.
+	template<typename U>
+	void wait_dequeue(U& item)
+	{
+		while (!items->wait());
+		inner_dequeue(item);
+	}
+
+	// Blocks the current thread until either there's something to dequeue
+	// or the timeout expires. Returns false without setting `item` if the
+	// timeout expires, otherwise assigns to `item` and returns true.
+	// Thread-safe when called by consumer thread.
+	// No exception guarantee (state will be corrupted) if assignment operator of U throws.
+	template<typename U>
+	bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs)
+	{
+		if (!items->wait(timeout_usecs))
+			return false;
+		inner_dequeue(item);
+		return true;
+	}
+
+	// Blocks the current thread until either there's something to dequeue
+	// or the timeout expires. Returns false without setting `item` if the
+	// timeout expires, otherwise assigns to `item` and returns true.
+	// Thread-safe when called by consumer thread.
+	// No exception guarantee (state will be corrupted) if assignment operator of U throws.
+	template<typename U, typename Rep, typename Period>
+	inline bool wait_dequeue_timed(U& item, std::chrono::duration<Rep, Period> const& timeout)
+	{
+		return wait_dequeue_timed(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
+	}
+
+	// Returns a (possibly outdated) snapshot of the total number of elements currently in the buffer.
+	// Thread-safe.
+	inline std::size_t size_approx() const
+	{
+		return items->availableApprox();
+	}
+
+	// Returns the maximum number of elements that this circular buffer can hold at once.
+	// Thread-safe.
+	inline std::size_t max_capacity() const
+	{
+		return maxcap;
+	}
+
+private:
+	template<typename U>
+	void inner_enqueue(U&& item)
+	{
+		std::size_t i = nextSlot++;
+		new (reinterpret_cast<T*>(data) + (i & mask)) T(std::forward<U>(item));
+		items->signal();
+	}
+
+	template<typename U>
+	void inner_dequeue(U& item)
+	{
+		std::size_t i = nextItem++;
+		T& element = reinterpret_cast<T*>(data)[i & mask];
+		item = std::move(element);
+		element.~T();
+		slots->signal();
+	}
+
+	template<typename U>
+	static inline char* align_for(char* ptr)
+	{
+		const std::size_t alignment = std::alignment_of<U>::value;
+		return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
+	}
+
+private:
+	std::size_t maxcap;                           // actual (non-power-of-two) capacity
+	std::size_t mask;                             // circular buffer capacity mask (for cheap modulo)
+	char* rawData;                                // raw circular buffer memory
+	char* data;                                   // circular buffer memory aligned to element alignment
+	std::unique_ptr<spsc_sema::LightweightSemaphore> slots;  // number of slots currently free
+	std::unique_ptr<spsc_sema::LightweightSemaphore> items;  // number of elements currently enqueued
+	char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(char*) * 2 - sizeof(std::size_t) * 2 - sizeof(std::unique_ptr<spsc_sema::LightweightSemaphore>) * 2];
+	std::size_t nextSlot;                         // index of next free slot to enqueue into
+	char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(std::size_t)];
+	std::size_t nextItem;                         // index of next element to dequeue from
+};
+
+}


=====================================
readerwriterqueue.h
=====================================
@@ -22,8 +22,8 @@
 // A lock-free queue for a single-consumer, single-producer architecture..
 // The queue is also wait-free in the common path (except if more memory
 // needs to be allocated, in which case malloc is called).
-// Allocates memory sparingly (O(lg(n) times, amortized), and only once if
-// the original maximum size estimate is never exceeded.
+// Allocates memory sparingly, and only once if the original maximum size
+// estimate is never exceeded.
 // Tested on x86/x64 processors, but semantics should be correct for all
 // architectures (given the right implementations in atomicops.h), provided
 // that aligned integer and pointer accesses are naturally atomic.


=====================================
tests/unittests/makefile
=====================================
@@ -20,7 +20,7 @@ endif
 
 default: unittests$(EXT)
 
-unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
+unittests$(EXT): unittests.cpp ../../readerwriterqueue.h ../../readerwritercircularbuffer.h ../../atomicops.h ../common/simplethread.h ../common/simplethread.cpp minitest.h makefile
 	g++ $(PLATFORM_OPTS) -std=c++11 -Wsign-conversion -Wpedantic -Wall -DNDEBUG -O3 -g unittests.cpp ../common/simplethread.cpp -o unittests$(EXT) -pthread $(PLATFORM_LD_OPTS)
 
 run: unittests$(EXT)


=====================================
tests/unittests/unittests.cpp
=====================================
@@ -10,6 +10,7 @@
 #include "minitest.h"
 #include "../common/simplethread.h"
 #include "../../readerwriterqueue.h"
+#include "../../readerwritercircularbuffer.h"
 
 using namespace moodycamel;
 
@@ -108,6 +109,7 @@ public:
 		REGISTER_TEST(try_enqueue_fail_workaround);
 		REGISTER_TEST(try_emplace_fail);
 #endif
+		REGISTER_TEST(blocking_circular_buffer);
 	}
 	
 	bool create_empty_queue()
@@ -693,6 +695,120 @@ public:
 		return true;
 	}
 #endif
+
+	bool blocking_circular_buffer()
+	{
+		{
+			// Basic enqueue
+			BlockingReaderWriterCircularBuffer<int> q(65);
+			for (int iteration = 0; iteration != 128; ++iteration) {  // check there's no problem with mismatch between nominal and allocated capacity
+				ASSERT_OR_FAIL(q.max_capacity() == 65);
+				ASSERT_OR_FAIL(q.size_approx() == 0);
+				ASSERT_OR_FAIL(q.try_enqueue(0));
+				ASSERT_OR_FAIL(q.max_capacity() == 65);
+				ASSERT_OR_FAIL(q.size_approx() == 1);
+				for (int i = 1; i != 65; ++i)
+					q.wait_enqueue(i);
+				ASSERT_OR_FAIL(q.size_approx() == 65);
+				ASSERT_OR_FAIL(!q.try_enqueue(65));
+
+				// Basic dequeue
+				int item;
+				ASSERT_OR_FAIL(q.try_dequeue(item));
+				ASSERT_OR_FAIL(item == 0);
+				for (int i = 1; i != 65; ++i) {
+					q.wait_dequeue(item);
+					ASSERT_OR_FAIL(item == i);
+				}
+				ASSERT_OR_FAIL(!q.try_dequeue(item));
+				ASSERT_OR_FAIL(!q.wait_dequeue_timed(item, 1));
+				ASSERT_OR_FAIL(item == 64);
+			}
+		}
+
+		{
+			// Zero capacity
+			BlockingReaderWriterCircularBuffer<int> q(0);
+			ASSERT_OR_FAIL(q.max_capacity() == 0);
+			ASSERT_OR_FAIL(!q.try_enqueue(1));
+			ASSERT_OR_FAIL(!q.wait_enqueue_timed(1, 0));
+		}
+
+		// Element lifetimes
+		Foo::reset();
+		{
+			BlockingReaderWriterCircularBuffer<Foo> q(31);
+			{
+				Foo item;
+				for (int i = 0; i != 23 + 32; ++i) {
+					ASSERT_OR_FAIL(q.try_enqueue(Foo()));
+					ASSERT_OR_FAIL(q.try_dequeue(item));
+				}
+				ASSERT_OR_FAIL(Foo::destroy_count() == 23 + 32);
+				ASSERT_OR_FAIL(Foo::destroyed_in_order());
+			}
+			Foo::reset();
+
+			{
+				Foo item;
+				for (int i = 0; i != 10; ++i)
+					ASSERT_OR_FAIL(q.try_enqueue(Foo()));
+				ASSERT_OR_FAIL(q.size_approx() == 10);
+				ASSERT_OR_FAIL(Foo::destroy_count() == 0);
+				ASSERT_OR_FAIL(q.try_dequeue(item));
+				ASSERT_OR_FAIL(q.size_approx() == 9);
+				ASSERT_OR_FAIL(Foo::destroy_count() == 1);
+			}
+			ASSERT_OR_FAIL(Foo::destroy_count() == 2);
+			ASSERT_OR_FAIL(Foo::destroyed_in_order());
+
+			BlockingReaderWriterCircularBuffer<Foo> q2(std::move(q));
+			ASSERT_OR_FAIL(q.size_approx() == 0);
+			ASSERT_OR_FAIL(q2.size_approx() == 9);
+
+			BlockingReaderWriterCircularBuffer<Foo> q3(2);
+			q3 = std::move(q2);
+			ASSERT_OR_FAIL(q.size_approx() == 0);
+			ASSERT_OR_FAIL(q2.size_approx() == 0);
+			ASSERT_OR_FAIL(q3.size_approx() == 9);
+
+			q = std::move(q2);
+			ASSERT_OR_FAIL(q.size_approx() == 0);
+			ASSERT_OR_FAIL(q2.size_approx() == 0);
+			ASSERT_OR_FAIL(q3.size_approx() == 9);
+			ASSERT_OR_FAIL(Foo::destroy_count() == 2);
+		}
+		ASSERT_OR_FAIL(Foo::destroy_count() == 11);
+		ASSERT_OR_FAIL(Foo::destroyed_in_order());
+
+		weak_atomic<int> result;
+		result = 1;
+
+		{
+			// Threaded
+			BlockingReaderWriterCircularBuffer<int> q(8);
+			SimpleThread reader([&]() {
+				int item;
+				for (int i = 0; i != 1000000; ++i) {
+					q.wait_dequeue(item);
+					if (item != i)
+						result = 0;
+				}
+			});
+			SimpleThread writer([&]() {
+				for (int i = 0; i != 1000000; ++i)
+					q.wait_enqueue(i);
+			});
+
+			writer.join();
+			reader.join();
+
+			ASSERT_OR_FAIL(q.size_approx() == 0);
+			ASSERT_OR_FAIL(result.load());
+		}
+
+		return true;
+	}
 };
 
 



View it on GitLab: https://salsa.debian.org/med-team/readerwriterqueue/-/compare/7fa8798bbbd84911a1e1ea6c97a230554732843f...14d52c2dd85d187833d58d76dc0b76760b246890

-- 
View it on GitLab: https://salsa.debian.org/med-team/readerwriterqueue/-/compare/7fa8798bbbd84911a1e1ea6c97a230554732843f...14d52c2dd85d187833d58d76dc0b76760b246890
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20210503/90b595c0/attachment-0001.htm>


More information about the debian-med-commit mailing list