[med-svn] [Git][med-team/concurrentqueue][upstream] New upstream version 1.0.4+ds
Andreas Tille (@tille)
gitlab at salsa.debian.org
Tue Jul 11 16:00:56 BST 2023
Andreas Tille pushed to branch upstream at Debian Med / concurrentqueue
Commits:
1db216a0 by Andreas Tille at 2023-07-04T11:37:35+02:00
New upstream version 1.0.4+ds
- - - - -
14 changed files:
- .gitignore
- CMakeLists.txt
- README.md
- benchmarks/benchmarks.cpp
- + benchmarks/dlib/test_for_odr_violations.cpp
- blockingconcurrentqueue.h
- build/makefile
- c_api/concurrentqueue.cpp
- c_api/concurrentqueue.h
- concurrentqueue.h
- + concurrentqueueConfig.cmake.in
- lightweightsemaphore.h
- samples.md
- tests/unittests/unittests.cpp
Changes:
=====================================
.gitignore
=====================================
@@ -8,6 +8,7 @@
*.vs
*.VC.db
build/bin/
+build/*.o
build/*.log
build/msvc16/*.log
build/msvc16/obj/
@@ -20,6 +21,8 @@ build/msvc12/obj/
build/msvc11/*.log
build/msvc11/obj/
build/xcode/build/
+.idea/
+cmake-build*/
tests/fuzztests/fuzztests.log
benchmarks/benchmarks.log
tests/CDSChecker/*.o
=====================================
CMakeLists.txt
=====================================
@@ -2,11 +2,73 @@ cmake_minimum_required(VERSION 3.9)
project(concurrentqueue VERSION 1.0.0)
include(GNUInstallDirs)
+include(CMakePackageConfigHelpers)
add_library(${PROJECT_NAME} INTERFACE)
-target_include_directories(concurrentqueue INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+target_include_directories(${PROJECT_NAME}
+ INTERFACE
+ $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
+ $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}/>
+)
-install(FILES blockingconcurrentqueue.h concurrentqueue.h lightweightsemaphore.h LICENSE.md
- DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME})
+install(TARGETS ${PROJECT_NAME}
+ EXPORT ${PROJECT_NAME}Targets
+)
+write_basic_package_version_file(
+ ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake
+ VERSION
+ ${PROJECT_VERSION}
+ COMPATIBILITY AnyNewerVersion
+)
+
+configure_package_config_file(${PROJECT_NAME}Config.cmake.in
+ ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake
+ INSTALL_DESTINATION
+ ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}/
+)
+
+install(EXPORT
+ ${PROJECT_NAME}Targets
+ FILE
+ ${PROJECT_NAME}Targets.cmake
+ NAMESPACE
+ "${PROJECT_NAME}::"
+ DESTINATION
+ ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}
+ COMPONENT
+ Devel
+)
+
+install(
+ FILES
+ ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake
+ ${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake
+ DESTINATION
+ ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}
+ COMPONENT
+ Devel
+)
+
+install(
+ FILES
+ blockingconcurrentqueue.h
+ concurrentqueue.h
+ lightweightsemaphore.h
+ LICENSE.md
+ DESTINATION
+ ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME}/moodycamel
+)
+
+set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
+set(CPACK_PACKAGE_VENDOR "Cameron Desrochers <cameron at moodycamel.com>")
+set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "An industrial-strength lock-free queue for C++.")
+set(CPACK_PACKAGE_VERSION "${PROJECT_VERSION}")
+set(CPACK_PACKAGE_VERSION_MAJOR "${PROJECT_VERSION_MAJOR}")
+set(CPACK_PACKAGE_VERSION_MINOR "${PROJECT_VERSION_MINOR}")
+set(CPACK_PACKAGE_VERSION_PATCH "${PROJECT_VERSION_PATCH}")
+set(CPACK_DEBIAN_PACKAGE_MAINTAINER ${CPACK_PACKAGE_VENDOR})
+set(CPACK_GENERATOR "RPM;DEB")
+
+include(CPack)
\ No newline at end of file
=====================================
README.md
=====================================
@@ -91,7 +91,7 @@ The entire queue's implementation is contained in **one header**, [`concurrentqu
Simply download and include that to use the queue. The blocking version is in a separate header,
[`blockingconcurrentqueue.h`][blockingconcurrentqueue.h], that depends on [`concurrentqueue.h`][concurrentqueue.h] and
[`lightweightsemaphore.h`][lightweightsemaphore.h]. The implementation makes use of certain key C++11 features,
-so it requires a fairly recent compiler (e.g. VS2012+ or g++ 4.8; note that g++ 4.6 has a known bug with `std::atomic`
+so it requires a relatively recent compiler (e.g. VS2012+ or g++ 4.8; note that g++ 4.6 has a known bug with `std::atomic`
and is thus not supported). The algorithm implementations themselves are platform independent.
Use it like you would any other templated queue, with the exception that you can use
@@ -99,14 +99,16 @@ it from many threads at once :-)
Simple example:
- #include "concurrentqueue.h"
-
- moodycamel::ConcurrentQueue<int> q;
- q.enqueue(25);
-
- int item;
- bool found = q.try_dequeue(item);
- assert(found && item == 25);
+```C++
+#include "concurrentqueue.h"
+
+moodycamel::ConcurrentQueue<int> q;
+q.enqueue(25);
+
+int item;
+bool found = q.try_dequeue(item);
+assert(found && item == 25);
+```
Description of basic methods:
- `ConcurrentQueue(size_t initialSizeEstimate)`
@@ -128,8 +130,12 @@ There's usually two versions of each method, one "explicit" version that takes a
per-consumer token, and one "implicit" version that works without tokens. Using the explicit methods is almost
always faster (though not necessarily by a huge factor). Apart from performance, the primary distinction between them
is their sub-queue allocation behaviour for enqueue operations: Using the implicit enqueue methods causes an
-automatically-allocated thread-local producer sub-queue to be allocated (it is marked for reuse once the thread exits).
-Explicit producers, on the other hand, are tied directly to their tokens' lifetimes (and are also recycled as needed).
+automatically-allocated thread-local producer sub-queue to be allocated.
+Explicit producers, on the other hand, are tied directly to their tokens' lifetimes (but are recycled internally).
+
+In order to avoid the number of sub-queues growing without bound, implicit producers are marked for reuse once
+their thread exits. However, this is not supported on all platforms. If using the queue from short-lived threads,
+it is recommended to use explicit producer tokens instead.
Full API (pseudocode):
@@ -176,31 +182,33 @@ in use either, but it can be easier to coordinate the cleanup.)
Blocking example:
- #include "blockingconcurrentqueue.h"
-
- moodycamel::BlockingConcurrentQueue<int> q;
- std::thread producer([&]() {
- for (int i = 0; i != 100; ++i) {
- std::this_thread::sleep_for(std::chrono::milliseconds(i % 10));
- q.enqueue(i);
- }
- });
- std::thread consumer([&]() {
- for (int i = 0; i != 100; ++i) {
- int item;
- q.wait_dequeue(item);
+```C++
+#include "blockingconcurrentqueue.h"
+
+moodycamel::BlockingConcurrentQueue<int> q;
+std::thread producer([&]() {
+ for (int i = 0; i != 100; ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(i % 10));
+ q.enqueue(i);
+ }
+});
+std::thread consumer([&]() {
+ for (int i = 0; i != 100; ++i) {
+ int item;
+ q.wait_dequeue(item);
+ assert(item == i);
+
+ if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
+ ++i;
assert(item == i);
-
- if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
- ++i;
- assert(item == i);
- }
}
- });
- producer.join();
- consumer.join();
-
- assert(q.size_approx() == 0);
+ }
+});
+producer.join();
+consumer.join();
+
+assert(q.size_approx() == 0);
+```
## Advanced features
@@ -212,15 +220,17 @@ You can create a consumer token and/or a producer token for each thread or task
(tokens themselves are not thread-safe), and use the methods that accept a token
as their first parameter:
- moodycamel::ConcurrentQueue<int> q;
-
- moodycamel::ProducerToken ptok(q);
- q.enqueue(ptok, 17);
-
- moodycamel::ConsumerToken ctok(q);
- int item;
- q.try_dequeue(ctok, item);
- assert(item == 17);
+```C++
+moodycamel::ConcurrentQueue<int> q;
+
+moodycamel::ProducerToken ptok(q);
+q.enqueue(ptok, 17);
+
+moodycamel::ConsumerToken ctok(q);
+int item;
+q.try_dequeue(ctok, item);
+assert(item == 17);
+```
If you happen to know which producer you want to consume from (e.g. in
a single-producer, multi-consumer scenario), you can use the `try_dequeue_from_producer`
@@ -249,16 +259,18 @@ Thanks to the [novel design][blog] of the queue, it's just as easy to enqueue/de
items as it is to do one at a time. This means that overhead can be cut drastically for
bulk operations. Example syntax:
- moodycamel::ConcurrentQueue<int> q;
+```C++
+moodycamel::ConcurrentQueue<int> q;
- int items[] = { 1, 2, 3, 4, 5 };
- q.enqueue_bulk(items, 5);
-
- int results[5]; // Could also be any iterator
- size_t count = q.try_dequeue_bulk(results, 5);
- for (size_t i = 0; i != count; ++i) {
- assert(results[i] == items[i]);
- }
+int items[] = { 1, 2, 3, 4, 5 };
+q.enqueue_bulk(items, 5);
+
+int results[5]; // Could also be any iterator
+size_t count = q.try_dequeue_bulk(results, 5);
+for (size_t i = 0; i != count; ++i) {
+ assert(results[i] == items[i]);
+}
+```
#### Preallocation (correctly using `try_enqueue`)
@@ -272,7 +284,7 @@ in order to obtain an effective number of pre-allocated element slots is non-obv
First, be aware that the count passed is rounded up to the next multiple of the block size. Note that the
default block size is 32 (this can be changed via the traits). Second, once a slot in a block has been
-enqueued to, that slot cannot be re-used until the rest of the block has completely been completely filled
+enqueued to, that slot cannot be re-used until the rest of the block has been completely filled
up and then completely emptied. This affects the number of blocks you need in order to account for the
overhead of partially-filled blocks. Third, each producer (whether implicit or explicit) claims and recycles
blocks in a different manner, which again affects the number of blocks you need to account for a desired number of
@@ -285,20 +297,35 @@ integer division (in order for `ceil()` to work).
For explicit producers (using tokens to enqueue):
- (ceil(N / BLOCK_SIZE) + 1) * MAX_NUM_PRODUCERS * BLOCK_SIZE
+```C++
+(ceil(N / BLOCK_SIZE) + 1) * MAX_NUM_PRODUCERS * BLOCK_SIZE
+```
For implicit producers (no tokens):
- (ceil(N / BLOCK_SIZE) - 1 + 2 * MAX_NUM_PRODUCERS) * BLOCK_SIZE
+```C++
+(ceil(N / BLOCK_SIZE) - 1 + 2 * MAX_NUM_PRODUCERS) * BLOCK_SIZE
+```
When using mixed producer types:
- ((ceil(N / BLOCK_SIZE) - 1) * (MAX_EXPLICIT_PRODUCERS + 1) + 2 * (MAX_IMPLICIT_PRODUCERS + MAX_EXPLICIT_PRODUCERS)) * BLOCK_SIZE
+```C++
+((ceil(N / BLOCK_SIZE) - 1) * (MAX_EXPLICIT_PRODUCERS + 1) + 2 * (MAX_IMPLICIT_PRODUCERS + MAX_EXPLICIT_PRODUCERS)) * BLOCK_SIZE
+```
If these formulas seem rather inconvenient, you can use the constructor overload that accepts the minimum
number of elements (`N`) and the maximum number of explicit and implicit producers directly, and let it do the
computation for you.
+In addition to blocks, there are other internal data structures that require allocating memory if they need to resize (grow).
+If using `try_enqueue` exclusively, the initial sizes may be exceeded, causing subsequent `try_enqueue` operations to fail.
+Specifically, the `INITIAL_IMPLICIT_PRODUCER_HASH_SIZE` trait limits the number of implicit producers that can be active at once
+before the internal hash needs resizing. Along the same lines, the `IMPLICIT_INITIAL_INDEX_SIZE` trait limits the number of
+unconsumed elements that an implicit producer can insert before its internal hash needs resizing. Similarly, the
+`EXPLICIT_INITIAL_INDEX_SIZE` trait limits the number of unconsumed elements that an explicit producer can insert before its
+internal hash needs resizing. In order to avoid hitting these limits when using `try_enqueue`, it is crucial to adjust the
+initial sizes in the traits appropriately, in addition to sizing the number of blocks properly as outlined above.
+
Finally, it's important to note that because the queue is only eventually consistent and takes advantage of
weak memory ordering for speed, there's always a possibility that under contention `try_enqueue` will fail
even if the queue is correctly pre-sized for the desired number of elements. (e.g. A given thread may think that
@@ -320,7 +347,7 @@ so be sure to reserve enough capacity in the target container first if you do th
The guarantees are presently as follows:
- Enqueue operations are rolled back completely if an exception is thrown from an element's constructor.
For bulk enqueue operations, this means that elements are copied instead of moved (in order to avoid
- having only some of the objects be moved in the event of an exception). Non-bulk enqueues always use
+ having only some objects moved in the event of an exception). Non-bulk enqueues always use
the move constructor if one is available.
- If the assignment operator throws during a dequeue operation (both single and bulk), the element(s) are
considered dequeued regardless. In such a case, the dequeued elements are all properly destructed before
@@ -339,12 +366,14 @@ and the memory allocation and deallocation functions that are to be used by the
to providing your own traits is to create a class that inherits from the default traits
and override only the values you wish to change. Example:
- struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
- {
- static const size_t BLOCK_SIZE = 256; // Use bigger blocks
- };
-
- moodycamel::ConcurrentQueue<int, MyTraits> q;
+```C++
+struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
+{
+ static const size_t BLOCK_SIZE = 256; // Use bigger blocks
+};
+
+moodycamel::ConcurrentQueue<int, MyTraits> q;
+```
#### How to dequeue types without calling the constructor
@@ -356,44 +385,48 @@ workaround: Create a wrapper class that copies the memory contents of the object
is assigned by the queue (a poor man's move, essentially). Note that this only works if
the object contains no internal pointers. Example:
- struct MyObjectMover {
- inline void operator=(MyObject&& obj) {
- std::memcpy(data, &obj, sizeof(MyObject));
-
- // TODO: Cleanup obj so that when it's destructed by the queue
- // it doesn't corrupt the data of the object we just moved it into
- }
+```C++
+struct MyObjectMover {
+ inline void operator=(MyObject&& obj) {
+ std::memcpy(data, &obj, sizeof(MyObject));
+
+ // TODO: Cleanup obj so that when it's destructed by the queue
+ // it doesn't corrupt the data of the object we just moved it into
+ }
- inline MyObject& obj() { return *reinterpret_cast<MyObject*>(data); }
+ inline MyObject& obj() { return *reinterpret_cast<MyObject*>(data); }
- private:
- align(alignof(MyObject)) char data[sizeof(MyObject)];
- };
+private:
+ align(alignof(MyObject)) char data[sizeof(MyObject)];
+};
+```
A less dodgy alternative, if moves are cheap but default construction is not, is to use a
wrapper that defers construction until the object is assigned, enabling use of the move
constructor:
- struct MyObjectMover {
- inline void operator=(MyObject&& x) {
- new (data) MyObject(std::move(x));
- created = true;
- }
+```C++
+struct MyObjectMover {
+ inline void operator=(MyObject&& x) {
+ new (data) MyObject(std::move(x));
+ created = true;
+ }
- inline MyObject& obj() {
- assert(created);
- return *reinterpret_cast<MyObject*>(data);
- }
+ inline MyObject& obj() {
+ assert(created);
+ return *reinterpret_cast<MyObject*>(data);
+ }
- ~MyObjectMover() {
- if (created)
- obj().~MyObject();
- }
+ ~MyObjectMover() {
+ if (created)
+ obj().~MyObject();
+ }
- private:
- align(alignof(MyObject)) char data[sizeof(MyObject)];
- bool created = false;
- };
+private:
+ align(alignof(MyObject)) char data[sizeof(MyObject)];
+ bool created = false;
+};
+```
## Samples
@@ -406,9 +439,11 @@ See my blog post for some [benchmark results][benchmarks] (including versus `boo
or run the benchmarks yourself (requires MinGW and certain GnuWin32 utilities to build on Windows, or a recent
g++ on Linux):
- cd build
- make benchmarks
- bin/benchmarks
+```Shell
+cd build
+make benchmarks
+bin/benchmarks
+```
The short version of the benchmarks is that it's so fast (especially the bulk methods), that if you're actually
using the queue to *do* anything, the queue won't be your bottleneck.
@@ -426,6 +461,19 @@ written to be platform-independent, however, and should work across all processo
Due to the complexity of the implementation and the difficult-to-test nature of lock-free code in general,
there may still be bugs. If anyone is seeing buggy behaviour, I'd like to hear about it! (Especially if
a unit test for it can be cooked up.) Just open an issue on GitHub.
+
+## Using vcpkg
+You can download and install `moodycamel::ConcurrentQueue` using the [vcpkg](https://github.com/Microsoft/vcpkg) dependency manager:
+
+```Shell
+git clone https://github.com/Microsoft/vcpkg.git
+cd vcpkg
+./bootstrap-vcpkg.sh
+./vcpkg integrate install
+vcpkg install concurrentqueue
+```
+
+The `moodycamel::ConcurrentQueue` port in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please [create an issue or pull request](https://github.com/Microsoft/vcpkg) on the vcpkg repository.
## License
@@ -474,6 +522,7 @@ of the queue itself, followed lastly by the free-standing swap functions.
[source]: https://github.com/cameron314/concurrentqueue
[concurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/concurrentqueue.h
[blockingconcurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/blockingconcurrentqueue.h
+[lightweightsemaphore.h]: https://github.com/cameron314/concurrentqueue/blob/master/lightweightsemaphore.h
[unittest-src]: https://github.com/cameron314/concurrentqueue/tree/master/tests/unittests
[benchmarks]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++#benchmarks
[benchmark-src]: https://github.com/cameron314/concurrentqueue/tree/master/benchmarks
=====================================
benchmarks/benchmarks.cpp
=====================================
@@ -26,7 +26,13 @@
#include "lockbasedqueue.h"
#include "simplelockfree.h"
#include "boostqueue.h"
+
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+// this uses an old version of OS memory fences on OSX
#include "tbbqueue.h"
+#pragma clang diagnostic pop
+
#include "stdqueue.h"
#include "dlibqueue.h"
#include "../tests/common/simplethread.h"
@@ -260,6 +266,9 @@ struct Traits : public moodycamel::ConcurrentQueueDefaultTraits
// block size will improve throughput (which is mostly what
// we're after with these benchmarks).
static const size_t BLOCK_SIZE = 64;
+
+ // Reuse blocks once allocated.
+ static const bool RECYCLE_ALLOCATED_BLOCKS = true;
};
@@ -963,7 +972,6 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
case bench_only_enqueue_bulk: {
TQueue q;
- item_t item = 1;
item_t item_rec;
std::vector<counter_t> data;
for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
@@ -1024,7 +1032,6 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
case bench_mostly_enqueue_bulk: {
// Measures the average speed of enqueueing in bulk under light contention
TQueue q;
- item_t item = 1;
item_t item_rec;
std::vector<counter_t> data;
for (counter_t i = 0; i != BULK_BATCH_SIZE; ++i) {
@@ -1100,7 +1107,6 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
case bench_only_dequeue_bulk: {
// Measures the average speed of dequeueing in bulk when all threads are consumers
TQueue q;
- item_t item = 1;
item_t item_rec;
{
// Fill up the queue first
@@ -1191,7 +1197,6 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
case bench_mostly_dequeue_bulk: {
// Measures the average speed of dequeueing in bulk under light contention
TQueue q;
- item_t item = 1;
item_t item_rec;
auto enqueueThreads = std::max(1, nthreads / 4);
out_opCount = maxOps * BULK_BATCH_SIZE * enqueueThreads;
@@ -1502,11 +1507,10 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
// (that eight separate threads had at one point enqueued to)
out_opCount = maxOps * 2 * nthreads;
TQueue q;
- item_t item = 1;
item_t item_rec;
if (nthreads == 1) {
// No contention -- measures speed of immediately dequeueing the item that was just enqueued
- int item;
+ int item = 0;
auto start = getSystemTime();
if (useTokens) {
typename TQueue::producer_token_t prodTok(q);
@@ -1535,7 +1539,7 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
while (ready.load(std::memory_order_relaxed) != nthreads)
continue;
- int item;
+ int item = 0;
auto start = getSystemTime();
if (useTokens) {
typename TQueue::producer_token_t prodTok(q);
@@ -1582,7 +1586,7 @@ double runBenchmark(benchmark_type_t benchmark, int nthreads, bool useTokens, un
auto start = getSystemTime();
if (id < 2) {
// Alternate
- int item;
+ int item = 0;
if (useTokens) {
typename TQueue::consumer_token_t consTok(q);
typename TQueue::producer_token_t prodTok(q);
@@ -2017,7 +2021,7 @@ int main(int argc, char** argv)
int maxThreads = QUEUE_MAX_THREADS[queue];
std::vector<BenchmarkResult> results(ITERATIONS);
for (int i = 0; i < ITERATIONS; ++i) {
- double elapsed;
+ double elapsed = 0.0;
counter_t ops = 0;
switch ((queue_id_t)queue) {
=====================================
benchmarks/dlib/test_for_odr_violations.cpp
=====================================
@@ -0,0 +1,47 @@
+// Copyright (C) 2014 Davis E. King (davis at dlib.net)
+// License: Boost Software License See LICENSE.txt for the full license.
+#ifndef DLIB_TEST_FOR_ODR_VIOLATIONS_CPp_
+#define DLIB_TEST_FOR_ODR_VIOLATIONS_CPp_
+
+#include "test_for_odr_violations.h"
+
+extern "C"
+{
+// The point of this block of code is to cause a link time error that will prevent a user
+// from compiling part of their application with DLIB_ASSERT enabled and part with them
+// disabled since doing that would be a violation of C++'s one definition rule.
+#ifdef ENABLE_ASSERTS
+ const int USER_ERROR__inconsistent_build_configuration__see_dlib_faq_1 = 0;
+#else
+ const int USER_ERROR__inconsistent_build_configuration__see_dlib_faq_1_ = 0;
+#endif
+
+
+// The point of this block of code is to cause a link time error if someone builds dlib via
+// cmake as a separately installable library, and therefore generates a dlib/config.h from
+// cmake, but then proceeds to use the default unconfigured dlib/config.h from version
+// control. It should be obvious why this is bad; if it isn't, you need to read a book
+// about C++. Moreover, it can only happen if someone manually copies files around and
+// messes things up. If instead they run `make install` or `cmake --build . --target
+// install` things will be set up correctly, which is what they should do. To summarize: DO
+// NOT BUILD A STANDALONE DLIB AND THEN GO CHERRY PICKING FILES FROM THE BUILD FOLDER AND
+// MIXING THEM WITH THE SOURCE FROM GITHUB. USE CMAKE'S INSTALL SCRIPTS TO INSTALL DLIB.
+// Or even better, don't install dlib at all and instead build your program as shown in
+// examples/CMakeLists.txt
+#if defined(DLIB_NOT_CONFIGURED) && !defined(DLIB__CMAKE_GENERATED_A_CONFIG_H_FILE)
+ const int USER_ERROR__inconsistent_build_configuration__see_dlib_faq_2 = 0;
+#endif
+
+
+
+
+
+#ifdef DLIB_CHECK_FOR_VERSION_MISMATCH
+ const int DLIB_CHECK_FOR_VERSION_MISMATCH = 0;
+#endif
+
+}
+
+
+#endif // DLIB_TEST_FOR_ODR_VIOLATIONS_CPp_
+
=====================================
blockingconcurrentqueue.h
=====================================
@@ -544,7 +544,7 @@ public:
// Returns true if the underlying atomic variables used by
// the queue are lock-free (they should be on most platforms).
// Thread-safe.
- static bool is_lock_free()
+ static constexpr bool is_lock_free()
{
return ConcurrentQueue::is_lock_free();
}
=====================================
build/makefile
=====================================
@@ -13,7 +13,10 @@ ifeq ($(OS),Windows_NT)
PLATFORM_OPTS = -static
TBB_PLATFORM_OPTS = -DUSE_WINTHREAD
else
- LD_PLATFORM_OPTS = -lrt
+ UNAME_S := $(shell uname -s)
+ ifeq ($(UNAME_S),Linux)
+ LD_PLATFORM_OPTS = -lrt
+ endif
# -fsanitize=address seems to have a slow memory leak when creating/destroying a lot of threads
#DEBUG_OPTS += -fno-omit-frame-pointer -fsanitize=address
endif
@@ -38,9 +41,9 @@ bin/fuzztests$(EXT): ../concurrentqueue.h ../tests/fuzztests/fuzztests.cpp ../te
test -d bin || mkdir bin
g++ -std=c++11 -Wall -pedantic-errors -Wpedantic $(BENCH_OPTS) ../tests/common/simplethread.cpp ../tests/common/systemtime.cpp ../tests/fuzztests/fuzztests.cpp -o bin/fuzztests$(EXT) $(LD_OPTS)
-bin/benchmarks$(EXT): bin/libtbb.a ../concurrentqueue.h ../benchmarks/benchmarks.cpp ../benchmarks/cpuid.h ../benchmarks/cpuid.cpp ../benchmarks/lockbasedqueue.h ../benchmarks/simplelockfree.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp ../tests/common/systemtime.h ../tests/common/systemtime.cpp makefile
+bin/benchmarks$(EXT): bin/libtbb.a ../concurrentqueue.h ../benchmarks/benchmarks.cpp ../benchmarks/cpuid.h ../benchmarks/cpuid.cpp ../benchmarks/lockbasedqueue.h ../benchmarks/simplelockfree.h ../tests/common/simplethread.h ../tests/common/simplethread.cpp ../tests/common/systemtime.h ../tests/common/systemtime.cpp ../benchmarks/dlib/test_for_odr_violations.cpp makefile
test -d bin || mkdir bin
- g++ -std=c++11 -Wall -pedantic-errors -Wpedantic $(BENCH_OPTS) -I../benchmarks ../benchmarks/cpuid.cpp ../tests/common/simplethread.cpp ../tests/common/systemtime.cpp ../benchmarks/benchmarks.cpp -o bin/benchmarks$(EXT) -Lbin -ltbb $(LD_OPTS)
+ g++ -std=c++11 -Wall -pedantic-errors -Wpedantic $(BENCH_OPTS) -I../benchmarks ../benchmarks/cpuid.cpp ../tests/common/simplethread.cpp ../tests/common/systemtime.cpp ../benchmarks/dlib/test_for_odr_violations.cpp ../benchmarks/benchmarks.cpp -o bin/benchmarks$(EXT) -Lbin -ltbb $(LD_OPTS)
bin/libtbb.a: makefile
test -d bin || mkdir bin
=====================================
c_api/concurrentqueue.cpp
=====================================
@@ -31,4 +31,9 @@ int moodycamel_cq_try_dequeue(MoodycamelCQHandle handle, MoodycamelValue* value)
return reinterpret_cast<MoodycamelCQPtr>(handle)->try_dequeue(*value) ? 1 : 0;
}
+size_t moodycamel_cq_size_approx(MoodycamelCQHandle handle)
+{
+ return reinterpret_cast<MoodycamelCQPtr>(handle)->size_approx();
+}
+
}
=====================================
c_api/concurrentqueue.h
=====================================
@@ -1,5 +1,7 @@
#pragma once
+#include <stddef.h>
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -13,10 +15,10 @@ extern "C" {
#else
#define MOODYCAMEL_EXPORT __declspec(dllimport)
#endif
-#endif
#else
#define MOODYCAMEL_EXPORT
#endif
+#endif
typedef void* MoodycamelCQHandle;
typedef void* MoodycamelBCQHandle;
@@ -26,6 +28,7 @@ MOODYCAMEL_EXPORT int moodycamel_cq_create(MoodycamelCQHandle* handle);
MOODYCAMEL_EXPORT int moodycamel_cq_destroy(MoodycamelCQHandle handle);
MOODYCAMEL_EXPORT int moodycamel_cq_enqueue(MoodycamelCQHandle handle, MoodycamelValue value);
MOODYCAMEL_EXPORT int moodycamel_cq_try_dequeue(MoodycamelCQHandle handle, MoodycamelValue* value);
+MOODYCAMEL_EXPORT size_t moodycamel_cq_size_approx(MoodycamelCQHandle handle);
MOODYCAMEL_EXPORT int moodycamel_bcq_create(MoodycamelBCQHandle* handle);
MOODYCAMEL_EXPORT int moodycamel_bcq_destroy(MoodycamelBCQHandle handle);
=====================================
concurrentqueue.h
=====================================
@@ -31,7 +31,7 @@
#pragma once
-#if defined(__GNUC__)
+#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
@@ -77,6 +77,7 @@
#include <climits> // for CHAR_BIT
#include <array>
#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
+#include <mutex> // used for thread exit synchronization
// Platform-specific definitions of a numeric thread ID type and an invalid value
namespace moodycamel { namespace details {
@@ -104,7 +105,7 @@ namespace moodycamel { namespace details {
static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
} }
-#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
+#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE) || defined(__MVS__) || defined(MOODYCAMEL_NO_THREAD_LOCAL)
namespace moodycamel { namespace details {
static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
@@ -216,9 +217,9 @@ namespace moodycamel { namespace details {
// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
// g++ <=4.7 doesn't support thread_local either.
// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
-#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
+#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__) && !defined(__MVS__)
// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
-//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
+#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // tentatively enabled for now; years ago several users reported having problems with it
#endif
#endif
#endif
@@ -378,7 +379,14 @@ struct ConcurrentQueueDefaultTraits
// consumer threads exceeds the number of idle cores (in which case try 0-100).
// Only affects instances of the BlockingConcurrentQueue.
static const int MAX_SEMA_SPINS = 10000;
-
+
+ // Whether to recycle dynamically-allocated blocks into an internal free list or
+ // not. If false, only pre-allocated blocks (controlled by the constructor
+ // arguments) will be recycled, and all others will be `free`d back to the heap.
+ // Note that blocks consumed by explicit producers are only freed on destruction
+ // of the queue (not following destruction of the token) regardless of this trait.
+ static const bool RECYCLE_ALLOCATED_BLOCKS = false;
+
#ifndef MCDBGQ_USE_RELACY
// Memory allocation can be customized if needed.
@@ -468,15 +476,10 @@ namespace details
template<typename T>
static inline bool circular_less_than(T a, T b)
{
-#ifdef _MSC_VER
-#pragma warning(push)
-#pragma warning(disable: 4554)
-#endif
static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
- return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
-#ifdef _MSC_VER
-#pragma warning(pop)
-#endif
+ return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << (static_cast<T>(sizeof(T) * CHAR_BIT - 1)));
+ // Note: the extra parens around the rhs of operator<< work around an MSVC bug: https://developercommunity2.visualstudio.com/t/C4554-triggers-when-both-lhs-and-rhs-is/10034931
+ // Silencing the warning with #pragma warning(disable: 4554) only works around the calling code; it has no effect when done here.
}
template<typename U>
@@ -555,6 +558,8 @@ namespace details
typedef RelacyThreadExitListener ThreadExitListener;
typedef RelacyThreadExitNotifier ThreadExitNotifier;
#else
+ class ThreadExitNotifier;
+
struct ThreadExitListener
{
typedef void (*callback_t)(void*);
@@ -562,22 +567,29 @@ namespace details
void* userData;
ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
+ ThreadExitNotifier* chain; // reserved for use by the ThreadExitNotifier
};
-
-
+
class ThreadExitNotifier
{
public:
static void subscribe(ThreadExitListener* listener)
{
auto& tlsInst = instance();
+ std::lock_guard<std::mutex> guard(mutex());
listener->next = tlsInst.tail;
+ listener->chain = &tlsInst;
tlsInst.tail = listener;
}
static void unsubscribe(ThreadExitListener* listener)
{
- auto& tlsInst = instance();
+ std::lock_guard<std::mutex> guard(mutex());
+ if (!listener->chain) {
+ return; // race with ~ThreadExitNotifier
+ }
+ auto& tlsInst = *listener->chain;
+ listener->chain = nullptr;
ThreadExitListener** prev = &tlsInst.tail;
for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
if (ptr == listener) {
@@ -597,7 +609,9 @@ namespace details
{
// This thread is about to exit, let everyone know!
assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
+ std::lock_guard<std::mutex> guard(mutex());
for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
+ ptr->chain = nullptr;
ptr->callback(ptr->userData);
}
}
@@ -608,6 +622,13 @@ namespace details
static thread_local ThreadExitNotifier notifier;
return notifier;
}
+
+ static inline std::mutex& mutex()
+ {
+ // Must be static because the ThreadExitNotifier could be destroyed while unsubscribe is called
+ static std::mutex mutex;
+ return mutex;
+ }
private:
ThreadExitListener* tail;
@@ -789,7 +810,7 @@ public:
// queue is fully constructed before it starts being used by other threads (this
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
- explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
+ explicit ConcurrentQueue(size_t capacity = 32 * BLOCK_SIZE)
: producerListTail(nullptr),
producerCount(0),
initialBlockPoolIndex(0),
@@ -1314,7 +1335,7 @@ public:
// Returns true if the underlying atomic variables used by
// the queue are lock-free (they should be on most platforms).
// Thread-safe.
- static bool is_lock_free()
+ static constexpr bool is_lock_free()
{
return
details::static_is_lock_free<bool>::value == 2 &&
@@ -1538,7 +1559,7 @@ private:
struct Block
{
Block()
- : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
+ : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), dynamicallyAllocated(true)
{
#ifdef MCDBGQ_TRACKMEM
owner = nullptr;
@@ -1655,7 +1676,6 @@ private:
public:
std::atomic<std::uint32_t> freeListRefs;
std::atomic<Block*> freeListNext;
- std::atomic<bool> shouldBeOnFreeList;
bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
#ifdef MCDBGQ_TRACKMEM
@@ -1810,12 +1830,7 @@ private:
auto block = this->tailBlock;
do {
auto nextBlock = block->next;
- if (block->dynamicallyAllocated) {
- destroy(block);
- }
- else {
- this->parent->add_block_to_free_list(block);
- }
+ this->parent->add_block_to_free_list(block);
block = nextBlock;
} while (block != this->tailBlock);
}
@@ -1998,7 +2013,7 @@ private:
// block size (in order to get a correct signed block count offset in all cases):
auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
- auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
+ auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
// Dequeue
@@ -2259,7 +2274,7 @@ private:
auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
- auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
+ auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
// Iterate the blocks and dequeue
@@ -2906,13 +2921,15 @@ private:
else if (!new_block_index()) {
return false;
}
- localBlockIndex = blockIndex.load(std::memory_order_relaxed);
- newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
- idxEntry = localBlockIndex->index[newTail];
- assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
- idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
- localBlockIndex->tail.store(newTail, std::memory_order_release);
- return true;
+ else {
+ localBlockIndex = blockIndex.load(std::memory_order_relaxed);
+ newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
+ idxEntry = localBlockIndex->index[newTail];
+ assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
+ idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
+ localBlockIndex->tail.store(newTail, std::memory_order_release);
+ return true;
+ }
}
inline void rewind_block_index_tail()
@@ -2940,7 +2957,7 @@ private:
assert(tailBase != INVALID_BLOCK_BASE);
// Note: Must use division instead of shift because the index may wrap around, causing a negative
// offset, whose negativity we want to preserve
- auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
+ auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / static_cast<typename std::make_signed<index_t>::type>(BLOCK_SIZE));
size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
return idx;
@@ -3052,7 +3069,12 @@ private:
#ifdef MCDBGQ_TRACKMEM
block->owner = nullptr;
#endif
- freeList.add(block);
+ if (!Traits::RECYCLE_ALLOCATED_BLOCKS && block->dynamicallyAllocated) {
+ destroy(block);
+ }
+ else {
+ freeList.add(block);
+ }
}
inline void add_blocks_to_free_list(Block* block)
@@ -3203,12 +3225,6 @@ private:
//////////////////////////////////
ProducerBase* recycle_or_create_producer(bool isExplicit)
- {
- bool recycled;
- return recycle_or_create_producer(isExplicit, recycled);
- }
-
- ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
{
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
debug::DebugLock lock(implicitProdMutex);
@@ -3219,13 +3235,11 @@ private:
bool expected = true;
if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
// We caught one! It's been marked as activated, the caller can have it
- recycled = true;
return ptr;
}
}
}
-
- recycled = false;
+
return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
}
@@ -3396,7 +3410,7 @@ private:
// Look for the id in this hash
auto index = hashedId;
while (true) { // Not an infinite loop because at least one slot is free in the hash table
- index &= hash->capacity - 1;
+ index &= hash->capacity - 1u;
auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
if (probedKey == id) {
@@ -3409,15 +3423,14 @@ private:
if (hash != mainHash) {
index = hashedId;
while (true) {
- index &= mainHash->capacity - 1;
- probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
+ index &= mainHash->capacity - 1u;
auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
auto reusable = details::invalid_thread_id2;
- if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
- (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
+ if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed) ||
+ mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#else
- if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
+ if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
#endif
mainHash->entries[index].value = value;
break;
@@ -3446,7 +3459,7 @@ private:
// locked block).
mainHash = implicitProducerHash.load(std::memory_order_acquire);
if (newCount >= (mainHash->capacity >> 1)) {
- auto newCapacity = mainHash->capacity << 1;
+ size_t newCapacity = mainHash->capacity << 1;
while (newCount >= (newCapacity >> 1)) {
newCapacity <<= 1;
}
@@ -3479,15 +3492,11 @@ private:
// to finish being allocated by another thread (and if we just finished allocating above, the condition will
// always be true)
if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
- bool recycled;
- auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
+ auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false));
if (producer == nullptr) {
implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
return nullptr;
}
- if (recycled) {
- implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
- }
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
@@ -3497,17 +3506,17 @@ private:
auto index = hashedId;
while (true) {
- index &= mainHash->capacity - 1;
- auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
-
+ index &= mainHash->capacity - 1u;
auto empty = details::invalid_thread_id;
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
auto reusable = details::invalid_thread_id2;
- if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
- (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
-#else
- if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
+ if (mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
+ implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed); // already counted as a used slot
+ mainHash->entries[index].value = producer;
+ break;
+ }
#endif
+ if (mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_seq_cst, std::memory_order_relaxed)) {
mainHash->entries[index].value = producer;
break;
}
@@ -3526,9 +3535,6 @@ private:
#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
void implicit_producer_thread_exited(ImplicitProducer* producer)
{
- // Remove from thread exit listeners
- details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
-
// Remove from hash
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
debug::DebugLock lock(implicitProdMutex);
@@ -3544,10 +3550,9 @@ private:
for (; hash != nullptr; hash = hash->prev) {
auto index = hashedId;
do {
- index &= hash->capacity - 1;
- probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
- if (probedKey == id) {
- hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
+ index &= hash->capacity - 1u;
+ probedKey = id;
+ if (hash->entries[index].key.compare_exchange_strong(probedKey, details::invalid_thread_id2, std::memory_order_seq_cst, std::memory_order_relaxed)) {
break;
}
++index;
@@ -3737,6 +3742,6 @@ inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, ty
#pragma warning(pop)
#endif
-#if defined(__GNUC__)
+#if defined(__GNUC__) && !defined(__INTEL_COMPILER)
#pragma GCC diagnostic pop
#endif
=====================================
concurrentqueueConfig.cmake.in
=====================================
@@ -0,0 +1,3 @@
+@PACKAGE_INIT@
+
+include(${CMAKE_CURRENT_LIST_DIR}/@PROJECT_NAME@Targets.cmake)
=====================================
lightweightsemaphore.h
=====================================
@@ -24,8 +24,16 @@ extern "C" {
}
#elif defined(__MACH__)
#include <mach/mach.h>
+#elif defined(__MVS__)
+#include <zos-semaphore.h>
#elif defined(__unix__)
#include <semaphore.h>
+
+#if defined(__GLIBC_PREREQ) && defined(_GNU_SOURCE)
+#if __GLIBC_PREREQ(2,30)
+#define MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
+#endif
+#endif
#endif
namespace moodycamel
@@ -159,9 +167,9 @@ public:
}
}
};
-#elif defined(__unix__)
+#elif defined(__unix__) || defined(__MVS__)
//---------------------------------------------------------
-// Semaphore (POSIX, Linux)
+// Semaphore (POSIX, Linux, zOS)
//---------------------------------------------------------
class Semaphore
{
@@ -209,7 +217,11 @@ public:
struct timespec ts;
const int usecs_in_1_sec = 1000000;
const int nsecs_in_1_sec = 1000000000;
+#ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+#else
clock_gettime(CLOCK_REALTIME, &ts);
+#endif
ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
@@ -221,7 +233,11 @@ public:
int rc;
do {
+#ifdef MOODYCAMEL_LIGHTWEIGHTSEMAPHORE_MONOTONIC
+ rc = sem_clockwait(&m_sema, CLOCK_MONOTONIC, &ts);
+#else
rc = sem_timedwait(&m_sema, &ts);
+#endif
} while (rc == -1 && errno == EINTR);
return rc == 0;
}
=====================================
samples.md
=====================================
@@ -186,7 +186,7 @@ for (int i = 0; i != ProducerCount; ++i) {
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i] = std::thread([&]() {
Item item;
- while (promisedElementsRemaining.fetch_sub(1, std::memory_order_relaxed)) {
+ while (promisedElementsRemaining.fetch_sub(1, std::memory_order_relaxed) > 0) {
q.wait_dequeue(item);
consumeItem(item);
}
=====================================
tests/unittests/unittests.cpp
=====================================
@@ -96,7 +96,7 @@ struct MallocTrackingTraits : public ConcurrentQueueDefaultTraits
static inline void free(void* ptr) { tracking_allocator::free(ptr); }
};
-template<std::size_t BlockSize = ConcurrentQueueDefaultTraits::BLOCK_SIZE, std::size_t InitialIndexSize = ConcurrentQueueDefaultTraits::EXPLICIT_INITIAL_INDEX_SIZE>
+template<std::size_t BlockSize = ConcurrentQueueDefaultTraits::BLOCK_SIZE, std::size_t InitialIndexSize = ConcurrentQueueDefaultTraits::EXPLICIT_INITIAL_INDEX_SIZE, bool RecycleBlocks = ConcurrentQueueDefaultTraits::RECYCLE_ALLOCATED_BLOCKS>
struct TestTraits : public MallocTrackingTraits
{
typedef std::size_t size_t;
@@ -105,6 +105,7 @@ struct TestTraits : public MallocTrackingTraits
static const size_t BLOCK_SIZE = BlockSize;
static const size_t EXPLICIT_INITIAL_INDEX_SIZE = InitialIndexSize;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = InitialIndexSize * 2;
+ static const bool RECYCLE_ALLOCATED_BLOCKS = RecycleBlocks;
static inline void reset() { _malloc_count() = 0; _free_count() = 0; }
static inline std::atomic<int>& _malloc_count() { static std::atomic<int> c; return c; }
@@ -348,12 +349,14 @@ public:
REGISTER_TEST(index_wrapping);
REGISTER_TEST(subqueue_size_limit);
REGISTER_TEST(exceptions);
+ REGISTER_TEST(implicit_producer_churn);
REGISTER_TEST(test_threaded);
REGISTER_TEST(test_threaded_bulk);
REGISTER_TEST(full_api<ConcurrentQueueDefaultTraits>);
REGISTER_TEST(full_api<SmallIndexTraits>);
REGISTER_TEST(blocking_wrappers);
REGISTER_TEST(timed_blocking_wrappers);
+
//c_api/concurrentqueue
REGISTER_TEST(c_api_create);
REGISTER_TEST(c_api_enqueue);
@@ -991,8 +994,10 @@ public:
bool block_alloc()
{
typedef TestTraits<2> Traits;
+ typedef TestTraits<2, 32, true> RecycleTraits;
Traits::reset();
+ // Explicit
{
ConcurrentQueue<int, Traits> q(7);
ASSERT_OR_FAIL(q.initialBlockPoolSize == 4);
@@ -1000,71 +1005,121 @@ public:
ASSERT_OR_FAIL(Traits::malloc_count() == 1);
ASSERT_OR_FAIL(Traits::free_count() == 0);
- ProducerToken tok(q);
+ {
+ ProducerToken tok(q);
+ ASSERT_OR_FAIL(Traits::malloc_count() == 3); // one for producer, one for its block index
+ ASSERT_OR_FAIL(Traits::free_count() == 0);
+
+ // Enqueue one item too many (force extra block allocation)
+ for (int i = 0; i != 9; ++i) {
+ ASSERT_OR_FAIL(q.enqueue(tok, i));
+ }
+
+ ASSERT_OR_FAIL(Traits::malloc_count() == 4);
+ ASSERT_OR_FAIL(Traits::free_count() == 0);
+
+ // Still room for one more...
+ ASSERT_OR_FAIL(q.enqueue(tok, 9));
+ ASSERT_OR_FAIL(Traits::malloc_count() == 4);
+ ASSERT_OR_FAIL(Traits::free_count() == 0);
+
+ // No more room without further allocations
+ ASSERT_OR_FAIL(!q.try_enqueue(tok, 10));
+ ASSERT_OR_FAIL(Traits::malloc_count() == 4);
+ ASSERT_OR_FAIL(Traits::free_count() == 0);
+
+ // Check items were enqueued properly
+ int item;
+ for (int i = 0; i != 10; ++i) {
+ ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
+ ASSERT_OR_FAIL(item == i);
+ }
+
+ // Queue should be empty, but not freed
+ ASSERT_OR_FAIL(!q.try_dequeue_from_producer(tok, item));
+ ASSERT_OR_FAIL(Traits::free_count() == 0);
+ }
+ // Explicit producers are recycled, so block should still be allocated
+ ASSERT_OR_FAIL(Traits::free_count() == 0);
+ }
+
+ ASSERT_OR_FAIL(Traits::malloc_count() == 4);
+ ASSERT_OR_FAIL(Traits::free_count() == 4);
+
+ // Implicit
+ Traits::reset();
+ {
+ ConcurrentQueue<int, Traits> q(7);
+ ASSERT_OR_FAIL(q.initialBlockPoolSize == 4);
+
+ ASSERT_OR_FAIL(q.enqueue(39));
+
ASSERT_OR_FAIL(Traits::malloc_count() == 3); // one for producer, one for its block index
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Enqueue one item too many (force extra block allocation)
- for (int i = 0; i != 9; ++i) {
- ASSERT_OR_FAIL(q.enqueue(tok, i));
+ for (int i = 0; i != 8; ++i) {
+ ASSERT_OR_FAIL(q.enqueue(i));
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Still room for one more...
- ASSERT_OR_FAIL(q.enqueue(tok, 9));
+ ASSERT_OR_FAIL(q.enqueue(8));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// No more room without further allocations
- ASSERT_OR_FAIL(!q.try_enqueue(tok, 10));
+ ASSERT_OR_FAIL(!q.try_enqueue(9));
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 0);
// Check items were enqueued properly
int item;
- for (int i = 0; i != 10; ++i) {
- ASSERT_OR_FAIL(q.try_dequeue_from_producer(tok, item));
+ ASSERT_OR_FAIL(q.try_dequeue(item));
+ ASSERT_OR_FAIL(item == 39);
+ for (int i = 0; i != 9; ++i) {
+ ASSERT_OR_FAIL(q.try_dequeue(item));
ASSERT_OR_FAIL(item == i);
}
- // Queue should be empty, but not freed
- ASSERT_OR_FAIL(!q.try_dequeue_from_producer(tok, item));
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ // Queue should be empty, and extra block freed
+ ASSERT_OR_FAIL(!q.try_dequeue(item));
+ ASSERT_OR_FAIL(Traits::free_count() == 1);
}
ASSERT_OR_FAIL(Traits::malloc_count() == 4);
ASSERT_OR_FAIL(Traits::free_count() == 4);
// Implicit
- Traits::reset();
+ RecycleTraits::reset();
{
- ConcurrentQueue<int, Traits> q(7);
+ ConcurrentQueue<int, RecycleTraits> q(7);
ASSERT_OR_FAIL(q.initialBlockPoolSize == 4);
ASSERT_OR_FAIL(q.enqueue(39));
- ASSERT_OR_FAIL(Traits::malloc_count() == 3); // one for producer, one for its block index
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 3); // one for producer, one for its block index
+ ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// Enqueue one item too many (force extra block allocation)
for (int i = 0; i != 8; ++i) {
ASSERT_OR_FAIL(q.enqueue(i));
}
- ASSERT_OR_FAIL(Traits::malloc_count() == 4);
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
+ ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// Still room for one more...
ASSERT_OR_FAIL(q.enqueue(8));
- ASSERT_OR_FAIL(Traits::malloc_count() == 4);
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
+ ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// No more room without further allocations
ASSERT_OR_FAIL(!q.try_enqueue(9));
- ASSERT_OR_FAIL(Traits::malloc_count() == 4);
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
+ ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
// Check items were enqueued properly
int item;
@@ -1075,13 +1130,12 @@ public:
ASSERT_OR_FAIL(item == i);
}
- // Queue should be empty, but not freed
+ // Queue should be empty, but extra block not freed
ASSERT_OR_FAIL(!q.try_dequeue(item));
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ ASSERT_OR_FAIL(RecycleTraits::free_count() == 0);
}
-
- ASSERT_OR_FAIL(Traits::malloc_count() == 4);
- ASSERT_OR_FAIL(Traits::free_count() == 4);
+ ASSERT_OR_FAIL(RecycleTraits::malloc_count() == 4);
+ ASSERT_OR_FAIL(RecycleTraits::free_count() == 4);
// Super-aligned
Traits::reset();
@@ -1125,9 +1179,9 @@ public:
ASSERT_OR_FAIL(VeryAligned::errors == 0);
}
- // Queue should be empty, but not freed
+ // Queue should be empty, and extra block freed
ASSERT_OR_FAIL(!q.try_dequeue(item));
- ASSERT_OR_FAIL(Traits::free_count() == 0);
+ ASSERT_OR_FAIL(Traits::free_count() == 1);
ASSERT_OR_FAIL(VeryAligned::errors == 0);
}
@@ -3026,6 +3080,29 @@ public:
return true;
}
+
+ bool implicit_producer_churn()
+ {
+ typedef TestTraits<4> Traits;
+
+ for (int i = 0; i != 256; ++i) {
+ std::vector<SimpleThread> threads(32);
+ ConcurrentQueue<int, Traits> q;
+ for (auto& thread : threads) {
+ thread = SimpleThread([&] {
+ int x;
+ for (int j = 0; j != 16; ++j) {
+ q.enqueue(0);
+ q.try_dequeue(x);
+ }
+ });
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+ }
+ return true;
+ }
bool test_threaded()
{
@@ -3643,9 +3720,9 @@ public:
// is_lock_free()
{
- bool lockFree = ConcurrentQueue<Foo, Traits>::is_lock_free();
+ constexpr bool lockFree = ConcurrentQueue<Foo, Traits>::is_lock_free();
#if defined(__amd64__) || defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || defined(__i386__) || defined(_M_PPC) || defined(__powerpc__)
- ASSERT_OR_FAIL(lockFree);
+ static_assert(lockFree, "is_lock_free should be true");
#else
(void)lockFree;
#endif
View it on GitLab: https://salsa.debian.org/med-team/concurrentqueue/-/commit/1db216a0ae7703a41da53bdf5c5d24b38ec34b09
--
View it on GitLab: https://salsa.debian.org/med-team/concurrentqueue/-/commit/1db216a0ae7703a41da53bdf5c5d24b38ec34b09
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20230711/a5865fdb/attachment-0001.htm>
More information about the debian-med-commit
mailing list