[med-svn] [Git][med-team/libthread-pool][upstream] New upstream version 4.0.0

Nilesh Patra (@nilesh) gitlab at salsa.debian.org
Fri Jul 30 21:02:20 BST 2021



Nilesh Patra pushed to branch upstream at Debian Med / libthread-pool


Commits:
dced7a4f by Nilesh Patra at 2021-07-31T00:21:25+05:30
New upstream version 4.0.0
- - - - -


6 changed files:

- CMakeLists.txt
- README.md
- − include/thread_pool/semaphore.hpp
- include/thread_pool/thread_pool.hpp
- − test/semaphore_test.cpp
- test/thread_pool_test.cpp


Changes:

=====================================
CMakeLists.txt
=====================================
@@ -1,8 +1,8 @@
 cmake_minimum_required(VERSION 3.11)
 
-project(thread_pool VERSION 3.0.3
+project(thread_pool VERSION 4.0.0
                     LANGUAGES CXX
-                    DESCRIPTION "ThreadPool is a c++ header only library modifying and extending https://github.com/progschj/ThreadPool.")
+                    DESCRIPTION "ThreadPool is a c++ header only library combining https://github.com/progschj/ThreadPool and task stealing by Sean Parent.")
 
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic")
 set(CMAKE_CXX_STANDARD 11)
@@ -84,7 +84,6 @@ endif ()
 
 if (thread_pool_build_tests)
   add_executable(thread_pool_test
-    test/semaphore_test.cpp
     test/thread_pool_test.cpp)
 
   target_link_libraries(thread_pool_test


=====================================
README.md
=====================================
@@ -3,7 +3,7 @@
 [![Latest GitHub release](https://img.shields.io/github/release/rvaser/thread_pool.svg)](https://github.com/rvaser/thread_pool/releases/latest)
 [![Build status for c++/clang++](https://travis-ci.com/rvaser/thread_pool.svg?branch=master)](https://travis-ci.com/rvaser/thread_pool)
 
-ThreadPool is a c++ header only library modifying and extending https://github.com/progschj/ThreadPool.
+ThreadPool is a c++ header only library combining https://github.com/progschj/ThreadPool and task stealing by Sean Parent.
 
 ## Usage
 
@@ -40,7 +40,7 @@ If you are not using CMake, include the appropriate header file directly to your
 
 ###### Hidden
 
-- (thread_pool_test) googletest 1.10.0
+- (thread_pool_test) google/googletest 1.10.0
 
 ## Examples
 


=====================================
include/thread_pool/semaphore.hpp deleted
=====================================
@@ -1,46 +0,0 @@
-// Copyright (c) 2020 Robert Vaser
-
-#ifndef THREAD_POOL_SEMAPHORE_HPP_
-#define THREAD_POOL_SEMAPHORE_HPP_
-
-#include <condition_variable>  // NOLINT
-#include <cstdint>
-#include <memory>
-#include <mutex>  // NOLINT
-
-namespace thread_pool {
-
-class Semaphore {
- public:
-  explicit Semaphore(std::uint32_t count)
-      : count_(count) {}
-
-  Semaphore(const Semaphore&) = delete;
-  Semaphore& operator=(const Semaphore&) = delete;
-
-  Semaphore(Semaphore&&) = delete;
-  Semaphore& operator=(Semaphore&&) = delete;
-
-  ~Semaphore() = default;
-
-  void Wait() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    condition_.wait(lock, [&] () { return count_; });
-    --count_;
-  }
-
-  void Signal() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    ++count_;
-    condition_.notify_one();
-  }
-
- private:
-  std::uint32_t count_;
-  std::mutex mutex_;
-  std::condition_variable condition_;
-};
-
-}  // namespace thread_pool
-
-#endif  // THREAD_POOL_SEMAPHORE_HPP_


=====================================
include/thread_pool/thread_pool.hpp
=====================================
@@ -1,38 +1,34 @@
 // Copyright (c) 2020 Robert Vaser
+// Combination of ThreadPool implementation by progschj and
+//   task stealing by Sean Parent
 
 #ifndef THREAD_POOL_THREAD_POOL_HPP_
 #define THREAD_POOL_THREAD_POOL_HPP_
 
 #include <algorithm>
 #include <atomic>
-#include <cstdint>
 #include <functional>
 #include <future>  // NOLINT
 #include <memory>
 #include <queue>
-#include <string>
 #include <thread>  // NOLINT
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
-#include "thread_pool/semaphore.hpp"
-
 namespace thread_pool {
 
 class ThreadPool {
  public:
-  ThreadPool(std::uint32_t num_threads = std::thread::hardware_concurrency() / 2)  // NOLINT
+  explicit ThreadPool(
+      std::size_t num_threads = std::thread::hardware_concurrency())
       : threads_(),
-        thread_ids_(),
-        thread_semaphore_(0),
-        queue_(),
-        queue_semaphore_(1),
-        terminate_(false) {
-    num_threads = std::max(1U, num_threads);
-    while (num_threads-- != 0) {
-      threads_.emplace_back(ThreadPool::Task, this);
-      thread_ids_.emplace(threads_.back().get_id(), threads_.size() - 1);
+        thread_map_(),
+        queues_(std::max(1UL, num_threads)),
+        task_id_(0) {
+    for (std::size_t i = 0; i != queues_.size(); ++i) {
+      threads_.emplace_back([this, i] () -> void { Task(i); });
+      thread_map_.emplace(threads_.back().get_id(), i);
     }
   }
 
@@ -43,21 +39,20 @@ class ThreadPool {
   ThreadPool& operator=(ThreadPool&&) = delete;
 
   ~ThreadPool() {
-    terminate_ = true;
-    for (std::uint32_t i = 0; i < threads_.size(); ++i) {
-      thread_semaphore_.Signal();
+    for (auto& it : queues_) {
+      it.Done();
     }
     for (auto& it : threads_) {
       it.join();
     }
   }
 
-  std::uint32_t num_threads() const {
+  std::size_t num_threads() const {
     return threads_.size();
   }
 
-  const std::unordered_map<std::thread::id, std::uint32_t>& thread_ids() const {
-    return thread_ids_;
+  const std::unordered_map<std::thread::id, std::size_t>& thread_map() const {
+    return thread_map_;
   }
 
   template<typename T, typename... Ts>
@@ -70,29 +65,32 @@ class ThreadPool {
       (*task)();
     };
 
-    queue_semaphore_.Wait();
-    queue_.emplace(task_wrapper);
-    queue_semaphore_.Signal();
+    auto task_id = task_id_++;
+    bool is_submitted = false;
+    for (std::size_t i = 0; i != queues_.size() * 42; ++i) {
+      if (queues_[(task_id + i) % queues_.size()].TryPush(task_wrapper)) {
+        is_submitted = true;
+        break;
+      }
+    }
+    if (!is_submitted) {
+      queues_[task_id % queues_.size()].Push(task_wrapper);
+    }
 
-    thread_semaphore_.Signal();
     return task_result;
   }
 
  private:
-  static void Task(ThreadPool* thread_pool) {
+  void Task(std::size_t thread_id) {
     while (true) {
-      thread_pool->thread_semaphore_.Wait();
+      std::function<void()> task;
 
-      if (thread_pool->terminate_) {
-        break;
+      for (std::size_t i = 0; i != queues_.size(); ++i) {
+        if (queues_[(thread_id + i) % queues_.size()].TryPop(&task)) {
+          break;
+        }
       }
-
-      thread_pool->queue_semaphore_.Wait();
-      auto task = std::move(thread_pool->queue_.front());
-      thread_pool->queue_.pop();
-      thread_pool->queue_semaphore_.Signal();
-
-      if (thread_pool->terminate_) {
+      if (!task && !queues_[thread_id].Pop(&task)) {
         break;
       }
 
@@ -100,12 +98,71 @@ class ThreadPool {
     }
   }
 
+  struct TaskQueue {
+   public:
+    template<typename F>
+    void Push(F&& f) {
+      {
+        std::unique_lock<std::mutex> lock(mutex);
+        queue.emplace(std::forward<F>(f));
+      }
+      is_ready.notify_one();
+    }
+
+    bool Pop(std::function<void()>* f) {
+      std::unique_lock<std::mutex> lock(mutex);
+      while (queue.empty() && !is_done) {
+        is_ready.wait(lock);
+      }
+      if (queue.empty()) {
+        return false;
+      }
+      *f = std::move(queue.front());
+      queue.pop();
+      return true;
+    }
+
+    template<typename F>
+    bool TryPush(F&& f) {
+      {
+        std::unique_lock<std::mutex> lock(mutex, std::try_to_lock);
+        if (!lock) {
+          return false;
+        }
+        queue.emplace(std::forward<F>(f));
+      }
+      is_ready.notify_one();
+      return true;
+    }
+
+    bool TryPop(std::function<void()>* f) {
+      std::unique_lock<std::mutex> lock(mutex, std::try_to_lock);
+      if (!lock || queue.empty()) {
+        return false;
+      }
+      *f = std::move(queue.front());
+      queue.pop();
+      return true;
+    }
+
+    void Done() {
+      {
+        std::unique_lock<std::mutex> lock(mutex);
+        is_done = true;
+      }
+      is_ready.notify_all();
+    }
+
+    std::queue<std::function<void()>> queue;
+    std::mutex mutex;
+    std::condition_variable is_ready;
+    bool is_done = false;
+  };
+
   std::vector<std::thread> threads_;
-  std::unordered_map<std::thread::id, std::uint32_t> thread_ids_;
-  Semaphore thread_semaphore_;
-  std::queue<std::function<void()>> queue_;
-  Semaphore queue_semaphore_;
-  std::atomic<bool> terminate_;
+  std::unordered_map<std::thread::id, std::size_t> thread_map_;
+  std::vector<TaskQueue> queues_;
+  std::atomic<std::size_t> task_id_;
 };
 
 }  // namespace thread_pool


=====================================
test/semaphore_test.cpp deleted
=====================================
@@ -1,37 +0,0 @@
-// Copyright (c) 2020 Robert Vaser
-
-#include "thread_pool/semaphore.hpp"
-
-#include <thread>  // NOLINT
-
-#include "gtest/gtest.h"
-
-namespace thread_pool {
-namespace test {
-
-TEST(ThreadPoolSemaphoreTest, Barrier) {
-  Semaphore s{0}, b{0};
-  auto check = [&] () -> void {
-    s.Signal();
-    b.Wait();
-  };
-
-  std::thread t1{check};
-  std::thread t2{check};
-  std::thread t3{check};
-
-  s.Wait();
-  s.Wait();
-  s.Wait();
-  // Release barrier
-  b.Signal();
-  b.Signal();
-  b.Signal();
-
-  t1.join();
-  t2.join();
-  t3.join();
-}
-
-}  // namespace test
-}  // namespace thread_pool


=====================================
test/thread_pool_test.cpp
=====================================
@@ -2,18 +2,14 @@
 
 #include "thread_pool/thread_pool.hpp"
 
-#include <numeric>
-#include <unordered_map>
-#include <unordered_set>
-
 #include "gtest/gtest.h"
 
 namespace thread_pool {
 namespace test {
 
 TEST(ThreadPoolThreadPoolTest, Submit) {
-  std::function<std::uint32_t(std::uint32_t)> fibonacci =
-      [&fibonacci] (std::uint32_t i) -> std::uint32_t {
+  std::function<std::size_t(std::size_t)> fibonacci =
+      [&fibonacci] (std::size_t i) -> std::size_t {
     if (i == 1 || i == 2) {
       return 1;
     }
@@ -22,8 +18,8 @@ TEST(ThreadPoolThreadPoolTest, Submit) {
 
   ThreadPool tp{};
 
-  std::vector<std::future<std::uint32_t>> f;
-  for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
+  std::vector<std::future<std::size_t>> f;
+  for (std::size_t i = 0; i < tp.num_threads(); ++i) {
     f.emplace_back(tp.Submit(fibonacci, 42));
   }
   for (auto& it : f) {
@@ -33,33 +29,19 @@ TEST(ThreadPoolThreadPoolTest, Submit) {
 
 TEST(ThreadPoolThreadPoolTest, ThreadIds) {
   ThreadPool tp{};
-  EXPECT_EQ(tp.num_threads(), tp.thread_ids().size());
+  EXPECT_EQ(tp.num_threads(), tp.thread_map().size());
 
-  Semaphore s{0}, b{0};
-  auto check = [&] () -> std::uint32_t {
-    EXPECT_EQ(1, tp.thread_ids().count(std::this_thread::get_id()));
-    s.Signal();
-    b.Wait();
-    return tp.thread_ids().at(std::this_thread::get_id());
+  auto thread_id = [&] () -> std::size_t {
+    return tp.thread_map().count(std::this_thread::get_id());
   };
 
-  std::vector<std::future<std::uint32_t>> f;
-  for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
-    f.emplace_back(tp.Submit(check));
-  }
-
-  for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
-    s.Wait();
+  std::vector<std::future<size_t>> f;
+  for (std::size_t i = 0; i < tp.num_threads() * 42; ++i) {
+    f.emplace_back(tp.Submit(thread_id));
   }
-  for (std::uint32_t i = 0; i < tp.num_threads(); ++i) {
-    b.Signal();
-  }
-
-  std::unordered_set<std::uint32_t> ts;
   for (auto& it : f) {
-    ts.emplace(it.get());
+    EXPECT_EQ(1, it.get());
   }
-  EXPECT_EQ(tp.num_threads(), ts.size());
 }
 
 }  // namespace test



View it on GitLab: https://salsa.debian.org/med-team/libthread-pool/-/commit/dced7a4f7e00a9fd83c130cc76f06b2335510fda

-- 
View it on GitLab: https://salsa.debian.org/med-team/libthread-pool/-/commit/dced7a4f7e00a9fd83c130cc76f06b2335510fda
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20210730/6690f0f0/attachment-0001.htm>


More information about the debian-med-commit mailing list