[med-svn] [Git][med-team/libthread-pool][master] 5 commits: New upstream version 4.0.0
Nilesh Patra (@nilesh)
gitlab at salsa.debian.org
Fri Jul 30 21:02:15 BST 2021
Nilesh Patra pushed to branch master at Debian Med / libthread-pool
Commits:
dced7a4f by Nilesh Patra at 2021-07-31T00:21:25+05:30
New upstream version 4.0.0
- - - - -
d3661f43 by Nilesh Patra at 2021-07-31T00:21:26+05:30
Update upstream source from tag 'upstream/4.0.0'
Update to upstream version '4.0.0'
with Debian dir 039b34b90263bd7049c55731d62c9e52d2960f81
- - - - -
65fbade9 by Nilesh Patra at 2021-07-31T00:27:13+05:30
d/control: Change architecture to any, since it now installs arch-dependent .cmake files
- - - - -
a84f9724 by Nilesh Patra at 2021-07-31T00:29:46+05:30
Interim changelog entry
- - - - -
bf51bb1c by Nilesh Patra at 2021-07-31T01:26:28+05:30
Update TODO
- - - - -
8 changed files:
- CMakeLists.txt
- README.md
- debian/changelog
- debian/control
- − include/thread_pool/semaphore.hpp
- include/thread_pool/thread_pool.hpp
- − test/semaphore_test.cpp
- test/thread_pool_test.cpp
Changes:
=====================================
CMakeLists.txt
=====================================
@@ -1,8 +1,8 @@
cmake_minimum_required(VERSION 3.11)
-project(thread_pool VERSION 3.0.3
+project(thread_pool VERSION 4.0.0
LANGUAGES CXX
- DESCRIPTION "ThreadPool is a c++ header only library modifying and extending https://github.com/progschj/ThreadPool.")
+ DESCRIPTION "ThreadPool is a c++ header only library combining https://github.com/progschj/ThreadPool and task stealing by Sean Parent.")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
set(CMAKE_CXX_STANDARD 11)
@@ -84,7 +84,6 @@ endif ()
if (thread_pool_build_tests)
add_executable(thread_pool_test
- test/semaphore_test.cpp
test/thread_pool_test.cpp)
target_link_libraries(thread_pool_test
=====================================
README.md
=====================================
@@ -3,7 +3,7 @@
[![Latest GitHub release](https://img.shields.io/github/release/rvaser/thread_pool.svg)](https://github.com/rvaser/thread_pool/releases/latest)
[![Build status for c++/clang++](https://travis-ci.com/rvaser/thread_pool.svg?branch=master)](https://travis-ci.com/rvaser/thread_pool)
-ThreadPool is a c++ header only library modifying and extending https://github.com/progschj/ThreadPool.
+ThreadPool is a c++ header only library combining https://github.com/progschj/ThreadPool and task stealing by Sean Parent.
## Usage
@@ -40,7 +40,7 @@ If you are not using CMake, include the appropriate header file directly to your
###### Hidden
-- (thread_pool_test) googletest 1.10.0
+- (thread_pool_test) google/googletest 1.10.0
## Examples
=====================================
debian/changelog
=====================================
@@ -1,9 +1,19 @@
-libthread-pool (3.0.3-1) UNRELEASED; urgency=medium
+libthread-pool (4.0.0-1) UNRELEASED; urgency=medium
* Team upload.
* New upstream version
-
- -- Nilesh Patra <nilesh at debian.org> Thu, 22 Apr 2021 21:14:27 +0530
+ * New upstream version 4.0.0
+ * d/control: Change architecture to any, since
+ it now installs arch-dependent .cmake files
+
+TODO: This release of the lib is *not* compatible with racon, and breaks it.
+However, there have been several changes to racon upstream which have made it
+compatible with this lib, but none of them have been properly tagged and
+released.
+Opened this issue upstream to track this:
+https://github.com/isovic/racon/issues/195
+
+ -- Nilesh Patra <nilesh at debian.org> Sat, 31 Jul 2021 00:26:55 +0530
libthread-pool (3.0.2-1) unstable; urgency=medium
=====================================
debian/control
=====================================
@@ -15,7 +15,7 @@ Homepage: https://github.com/rvaser/thread_pool
Rules-Requires-Root: no
Package: libthread-pool-dev
-Architecture: all
+Architecture: any
Section: libdevel
Depends: ${misc:Depends}
Description: C++ header-only thread pool library (devel)
=====================================
include/thread_pool/semaphore.hpp deleted
=====================================
@@ -1,46 +0,0 @@
-// Copyright (c) 2020 Robert Vaser
-
-#ifndef THREAD_POOL_SEMAPHORE_HPP_
-#define THREAD_POOL_SEMAPHORE_HPP_
-
-#include <condition_variable> // NOLINT
-#include <cstdint>
-#include <memory>
-#include <mutex> // NOLINT
-
-namespace thread_pool {
-
-class Semaphore {
- public:
- explicit Semaphore(std::uint32_t count)
- : count_(count) {}
-
- Semaphore(const Semaphore&) = delete;
- Semaphore& operator=(const Semaphore&) = delete;
-
- Semaphore(Semaphore&&) = delete;
- Semaphore& operator=(Semaphore&&) = delete;
-
- ~Semaphore() = default;
-
- void Wait() {
- std::unique_lock<std::mutex> lock(mutex_);
- condition_.wait(lock, [&] () { return count_; });
- --count_;
- }
-
- void Signal() {
- std::unique_lock<std::mutex> lock(mutex_);
- ++count_;
- condition_.notify_one();
- }
-
- private:
- std::uint32_t count_;
- std::mutex mutex_;
- std::condition_variable condition_;
-};
-
-} // namespace thread_pool
-
-#endif // THREAD_POOL_SEMAPHORE_HPP_
=====================================
include/thread_pool/thread_pool.hpp
=====================================
@@ -1,38 +1,34 @@
// Copyright (c) 2020 Robert Vaser
+// Combination of ThreadPool implementation by progschj and
+// task stealing by Sean Parent
#ifndef THREAD_POOL_THREAD_POOL_HPP_
#define THREAD_POOL_THREAD_POOL_HPP_
#include <algorithm>
#include <atomic>
-#include <cstdint>
#include <functional>
#include <future> // NOLINT
#include <memory>
#include <queue>
-#include <string>
#include <thread> // NOLINT
#include <unordered_map>
#include <utility>
#include <vector>
-#include "thread_pool/semaphore.hpp"
-
namespace thread_pool {
class ThreadPool {
public:
- ThreadPool(std::uint32_t num_threads = std::thread::hardware_concurrency() / 2) // NOLINT
+ explicit ThreadPool(
+ std::size_t num_threads = std::thread::hardware_concurrency())
: threads_(),
- thread_ids_(),
- thread_semaphore_(0),
- queue_(),
- queue_semaphore_(1),
- terminate_(false) {
- num_threads = std::max(1U, num_threads);
- while (num_threads-- != 0) {
- threads_.emplace_back(ThreadPool::Task, this);
- thread_ids_.emplace(threads_.back().get_id(), threads_.size() - 1);
+ thread_map_(),
+ queues_(std::max(1UL, num_threads)),
+ task_id_(0) {
+ for (std::size_t i = 0; i != queues_.size(); ++i) {
+ threads_.emplace_back([this, i] () -> void { Task(i); });
+ thread_map_.emplace(threads_.back().get_id(), i);
}
}
@@ -43,21 +39,20 @@ class ThreadPool {
ThreadPool& operator=(ThreadPool&&) = delete;
~ThreadPool() {
- terminate_ = true;
- for (std::uint32_t i = 0; i < threads_.size(); ++i) {
- thread_semaphore_.Signal();
+ for (auto& it : queues_) {
+ it.Done();
}
for (auto& it : threads_) {
it.join();
}
}
- std::uint32_t num_threads() const {
+ std::size_t num_threads() const {
return threads_.size();
}
- const std::unordered_map<std::thread::id, std::uint32_t>& thread_ids() const {
- return thread_ids_;
+ const std::unordered_map<std::thread::id, std::size_t>& thread_map() const {
+ return thread_map_;
}
template<typename T, typename... Ts>
@@ -70,29 +65,32 @@ class ThreadPool {
(*task)();
};
- queue_semaphore_.Wait();
- queue_.emplace(task_wrapper);
- queue_semaphore_.Signal();
+ auto task_id = task_id_++;
+ bool is_submitted = false;
+ for (std::size_t i = 0; i != queues_.size() * 42; ++i) {
+ if (queues_[(task_id + i) % queues_.size()].TryPush(task_wrapper)) {
+ is_submitted = true;
+ break;
+ }
+ }
+ if (!is_submitted) {
+ queues_[task_id % queues_.size()].Push(task_wrapper);
+ }
- thread_semaphore_.Signal();
return task_result;
}
private:
- static void Task(ThreadPool* thread_pool) {
+ void Task(std::size_t thread_id) {
while (true) {
- thread_pool->thread_semaphore_.Wait();
+ std::function<void()> task;
- if (thread_pool->terminate_) {
- break;
+ for (std::size_t i = 0; i != queues_.size(); ++i) {
+ if (queues_[(thread_id + i) % queues_.size()].TryPop(&task)) {
+ break;
+ }
}
-
- thread_pool->queue_semaphore_.Wait();
- auto task = std::move(thread_pool->queue_.front());
- thread_pool->queue_.pop();
- thread_pool->queue_semaphore_.Signal();
-
- if (thread_pool->terminate_) {
+ if (!task && !queues_[thread_id].Pop(&task)) {
break;
}
@@ -100,12 +98,71 @@ class ThreadPool {
}
}
+ struct TaskQueue {
+ public:
+ template<typename F>
+ void Push(F&& f) {
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ queue.emplace(std::forward<F>(f));
+ }
+ is_ready.notify_one();
+ }
+
+ bool Pop(std::function<void()>* f) {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (queue.empty() && !is_done) {
+ is_ready.wait(lock);
+ }
+ if (queue.empty()) {
+ return false;
+ }
+ *f = std::move(queue.front());
+ queue.pop();
+ return true;
+ }
+
+ template<typename F>
+ bool TryPush(F&& f) {
+ {
+ std::unique_lock<std::mutex> lock(mutex, std::try_to_lock);
+ if (!lock) {
+ return false;
+ }
+ queue.emplace(std::forward<F>(f));
+ }
+ is_ready.notify_one();
+ return true;
+ }
+
+ bool TryPop(std::function<void()>* f) {
+ std::unique_lock<std::mutex> lock(mutex, std::try_to_lock);
+ if (!lock || queue.empty()) {
+ return false;
+ }
+ *f = std::move(queue.front());
+ queue.pop();
+ return true;
+ }
+
+ void Done() {
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ is_done = true;
+ }
+ is_ready.notify_all();
+ }
+
+ std::queue<std::function<void()>> queue;
+ std::mutex mutex;
+ std::condition_variable is_ready;
+ bool is_done = false;
+ };
+
std::vector<std::thread> threads_;
- std::unordered_map<std::thread::id, std::uint32_t> thread_ids_;
- Semaphore thread_semaphore_;
- std::queue<std::function<void()>> queue_;
- Semaphore queue_semaphore_;
- std::atomic<bool> terminate_;
+ std::unordered_map<std::thread::id, std::size_t> thread_map_;
+ std::vector<TaskQueue> queues_;
+ std::atomic<std::size_t> task_id_;
};
} // namespace thread_pool
=====================================
test/semaphore_test.cpp deleted
=====================================
@@ -1,37 +0,0 @@
-// Copyright (c) 2020 Robert Vaser
-
-#include "thread_pool/semaphore.hpp"
-
-#include <thread> // NOLINT
-
-#include "gtest/gtest.h"
-
-namespace thread_pool {
-namespace test {
-
-TEST(ThreadPoolSemaphoreTest, Barrier) {
- Semaphore s{0}, b{0};
- auto check = [&] () -> void {
- s.Signal();
- b.Wait();
- };
-
- std::thread t1{check};
- std::thread t2{check};
- std::thread t3{check};
-
- s.Wait();
- s.Wait();
- s.Wait();
- // Release barrier
- b.Signal();
- b.Signal();
- b.Signal();
-
- t1.join();
- t2.join();
- t3.join();
-}
-
-} // namespace test
-} // namespace thread_pool
=====================================
test/thread_pool_test.cpp
=====================================
@@ -2,18 +2,14 @@
#include "thread_pool/thread_pool.hpp"
-#include <numeric>
-#include <unordered_map>
-#include <unordered_set>
-
#include "gtest/gtest.h"
namespace thread_pool {
namespace test {
TEST(ThreadPoolThreadPoolTest, Submit) {
- std::function<std::uint32_t(std::uint32_t)> fibonacci =
- [&fibonacci] (std::uint32_t i) -> std::uint32_t {
+ std::function<std::size_t(std::size_t)> fibonacci =
+ [&fibonacci] (std::size_t i) -> std::size_t {
if (i == 1 || i == 2) {
return 1;
}
@@ -22,8 +18,8 @@ TEST(ThreadPoolThreadPoolTest, Submit) {
ThreadPool tp{};
- std::vector<std::future<std::uint32_t>> f;
- for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
+ std::vector<std::future<std::size_t>> f;
+ for (std::size_t i = 0; i < tp.num_threads(); ++i) {
f.emplace_back(tp.Submit(fibonacci, 42));
}
for (auto& it : f) {
@@ -33,33 +29,19 @@ TEST(ThreadPoolThreadPoolTest, Submit) {
TEST(ThreadPoolThreadPoolTest, ThreadIds) {
ThreadPool tp{};
- EXPECT_EQ(tp.num_threads(), tp.thread_ids().size());
+ EXPECT_EQ(tp.num_threads(), tp.thread_map().size());
- Semaphore s{0}, b{0};
- auto check = [&] () -> std::uint32_t {
- EXPECT_EQ(1, tp.thread_ids().count(std::this_thread::get_id()));
- s.Signal();
- b.Wait();
- return tp.thread_ids().at(std::this_thread::get_id());
+ auto thread_id = [&] () -> std::size_t {
+ return tp.thread_map().count(std::this_thread::get_id());
};
- std::vector<std::future<std::uint32_t>> f;
- for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
- f.emplace_back(tp.Submit(check));
- }
-
- for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
- s.Wait();
+ std::vector<std::future<size_t>> f;
+ for (std::size_t i = 0; i < tp.num_threads() * 42; ++i) {
+ f.emplace_back(tp.Submit(thread_id));
}
- for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
- b.Signal();
- }
-
- std::unordered_set<std::uint32_t> ts;
for (auto& it : f) {
- ts.emplace(it.get());
+ EXPECT_EQ(1, it.get());
}
- EXPECT_EQ(tp.num_threads(), ts.size());
}
} // namespace test
View it on GitLab: https://salsa.debian.org/med-team/libthread-pool/-/compare/1f51183ccee99f4628b35f4642643a9c7479e0c9...bf51bb1c584ea64a593618962a61aa085545f26f
--
View it on GitLab: https://salsa.debian.org/med-team/libthread-pool/-/compare/1f51183ccee99f4628b35f4642643a9c7479e0c9...bf51bb1c584ea64a593618962a61aa085545f26f
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20210730/6aa7958a/attachment-0001.htm>
More information about the debian-med-commit mailing list