diff --git a/Tests/LibThreading/CMakeLists.txt b/Tests/LibThreading/CMakeLists.txt index b505bfe17d5..5f4914ab4fd 100644 --- a/Tests/LibThreading/CMakeLists.txt +++ b/Tests/LibThreading/CMakeLists.txt @@ -1,6 +1,5 @@ set(TEST_SOURCES TestThread.cpp - TestThreadPool.cpp ) foreach(source IN LISTS TEST_SOURCES) diff --git a/Tests/LibThreading/TestThreadPool.cpp b/Tests/LibThreading/TestThreadPool.cpp deleted file mode 100644 index 2d12d0970fb..00000000000 --- a/Tests/LibThreading/TestThreadPool.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2024, Braydn Moore - * - * SPDX-License-Identifier: BSD-2-Clause - */ - -#include -#include -#include -#include - -using namespace AK::TimeLiterals; - -TEST_CASE(thread_pool_deadlock) -{ - static constexpr auto RUN_TIMEOUT = 120_sec; - static constexpr u64 NUM_RUNS = 1000; - static constexpr u64 MAX_VALUE = 1 << 15; - - for (u64 i = 0; i < NUM_RUNS; ++i) { - u64 expected_value = (MAX_VALUE * (MAX_VALUE + 1)) / 2; - Atomic sum; - - // heap allocate the ThreadPool in case it deadlocks. Exiting in the - // case of a deadlock will purposefully leak memory to avoid calling the - // destructor and hanging the test - auto* thread_pool = new Threading::ThreadPool( - [&sum](u64 current_val) { - sum += current_val; - }); - - for (u64 j = 0; j <= MAX_VALUE; ++j) { - thread_pool->submit(j); - } - - auto join_thread = Threading::Thread::construct([thread_pool]() -> intptr_t { - thread_pool->wait_for_all(); - delete thread_pool; - return 0; - }); - - join_thread->start(); - auto timer = Core::ElapsedTimer::start_new(Core::TimerType::Precise); - while (!join_thread->has_exited() && timer.elapsed_milliseconds() < RUN_TIMEOUT.to_milliseconds()) - ; - EXPECT(join_thread->has_exited()); - // exit since the current pool is deadlocked and we have no way of - // unblocking the pool other than having the OS teardown the process - // struct - if (!join_thread->has_exited()) { - return; - } - - (void)join_thread->join(); - EXPECT_EQ(sum.load(), expected_value); - } -} diff --git a/Userland/Libraries/LibThreading/ThreadPool.h b/Userland/Libraries/LibThreading/ThreadPool.h deleted file mode 100644 index 91e6af99dc5..00000000000 --- a/Userland/Libraries/LibThreading/ThreadPool.h +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2024, Ali Mohammad Pur - * - * SPDX-License-Identifier: BSD-2-Clause - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace Threading { - -template -struct ThreadPoolLooper { - IterationDecision next(Pool& pool, bool wait) - { - Optional entry; - while (true) { - pool.m_busy_count++; - entry = pool.m_work_queue.with_locked([&](auto& queue) -> Optional { - if (queue.is_empty()) - return {}; - return queue.dequeue(); - }); - if (entry.has_value()) - break; - - pool.m_busy_count--; - if (pool.m_should_exit) - return IterationDecision::Break; - - if (!wait) - return IterationDecision::Continue; - - pool.m_mutex.lock(); - // broadcast on m_work_done here since it is possible the - // wait_for_all loop missed the previous broadcast when work was - // actually done. Without this broadcast the ThreadPool could - // deadlock as there is no remaining work to be done, so this thread - // never resumes and the wait_for_all loop never wakes as there is no - // more work to be completed. - pool.m_work_done.broadcast(); - pool.m_work_available.wait(); - pool.m_mutex.unlock(); - } - - pool.m_handler(entry.release_value()); - pool.m_busy_count--; - pool.m_work_done.signal(); - return IterationDecision::Continue; - } -}; - -template class Looper = ThreadPoolLooper> -class ThreadPool { - AK_MAKE_NONCOPYABLE(ThreadPool); - AK_MAKE_NONMOVABLE(ThreadPool); - -public: - using Work = TWork; - friend struct ThreadPoolLooper; - - ThreadPool(Optional concurrency = {}) - requires(IsFunction) - : m_handler([](Work work) { return work(); }) - , m_work_available(m_mutex) - , m_work_done(m_mutex) - { - initialize_workers(concurrency.value_or(Core::System::hardware_concurrency())); - } - - explicit ThreadPool(Function handler, Optional concurrency = {}) - : m_handler(move(handler)) - , m_work_available(m_mutex) - , m_work_done(m_mutex) - { - initialize_workers(concurrency.value_or(Core::System::hardware_concurrency())); - } - - ~ThreadPool() - { - m_should_exit.store(true, AK::MemoryOrder::memory_order_release); - for (auto& worker : m_workers) { - while (!worker->has_exited()) { - m_work_available.broadcast(); - } - (void)worker->join(); - } - } - - void submit(Work work) - { - m_work_queue.with_locked([&](auto& queue) { - queue.enqueue({ move(work) }); - }); - m_work_available.broadcast(); - } - - void wait_for_all() - { - { - MutexLocker lock(m_mutex); - m_work_done.wait_while([this]() { - return m_work_queue.with_locked([](auto& queue) { - return !queue.is_empty(); - }); - }); - } - { - MutexLocker lock(m_mutex); - m_work_done.wait_while([this] { - return m_busy_count.load(AK::MemoryOrder::memory_order_acquire) > 0; - }); - } - } - -private: - void initialize_workers(size_t concurrency) - { - for (size_t i = 0; i < concurrency; ++i) { - m_workers.append(Thread::construct([this]() -> intptr_t { - Looper thread_looper; - for (; !m_should_exit;) { - auto result = thread_looper.next(*this, true); - if (result == IterationDecision::Break) - break; - } - - return 0; - }, - "ThreadPool worker"sv)); - } - - for (auto& worker : m_workers) - worker->start(); - } - - Vector> m_workers; - MutexProtected> m_work_queue; - Function m_handler; - Mutex m_mutex; - ConditionVariable m_work_available; - ConditionVariable m_work_done; - Atomic m_should_exit { false }; - Atomic m_busy_count { 0 }; -}; - -}