LibThreading: Remove the thread pool

The thread pool test is currently flaky and takes over 2 minutes to run
on CI. The thread pool itself also has no users now that RequestServer
uses curl, so let's just remove it for now. If we need it in the future,
we can revive it from git history.
Author: Timothy Flynn, 2024-09-22 13:41:31 -04:00 (committed by Tim Flynn)
Parent: 93ec010bee
Commit: 8472b2b1fb
3 changed files with 0 additions and 212 deletions
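
For context on what is being removed, here is a minimal sketch of how the Threading::ThreadPool API was used, based on the header deleted below; the function name and the numbers are illustrative, not taken from any in-tree caller:

#include <AK/Atomic.h>
#include <AK/Types.h>
#include <LibThreading/ThreadPool.h>

// Illustrative only: sum 0..99 across the pool's worker threads.
static u64 sum_with_pool()
{
    Atomic<u64> total { 0 };

    // The handler runs on a worker thread, once per submitted item.
    Threading::ThreadPool<u64> pool([&total](u64 value) {
        total += value;
    });

    for (u64 i = 0; i < 100; ++i)
        pool.submit(i);

    // Blocks until the queue is drained and no worker is still busy.
    pool.wait_for_all();
    return total.load();
}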

Tests/LibThreading/CMakeLists.txt

@@ -1,6 +1,5 @@
 set(TEST_SOURCES
     TestThread.cpp
-    TestThreadPool.cpp
 )
 
 foreach(source IN LISTS TEST_SOURCES)

Tests/LibThreading/TestThreadPool.cpp

@@ -1,57 +0,0 @@
/*
 * Copyright (c) 2024, Braydn Moore <braydn.moore@uwaterloo.ca>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#include <AK/Time.h>
#include <LibCore/ElapsedTimer.h>
#include <LibTest/TestCase.h>
#include <LibThreading/ThreadPool.h>

using namespace AK::TimeLiterals;

TEST_CASE(thread_pool_deadlock)
{
    static constexpr auto RUN_TIMEOUT = 120_sec;
    static constexpr u64 NUM_RUNS = 1000;
    static constexpr u64 MAX_VALUE = 1 << 15;

    for (u64 i = 0; i < NUM_RUNS; ++i) {
        u64 expected_value = (MAX_VALUE * (MAX_VALUE + 1)) / 2;
        Atomic<u64> sum;
        // Heap-allocate the ThreadPool in case it deadlocks. Exiting in the
        // case of a deadlock will purposefully leak memory to avoid calling
        // the destructor and hanging the test.
        auto* thread_pool = new Threading::ThreadPool<u64>(
            [&sum](u64 current_val) {
                sum += current_val;
            });

        for (u64 j = 0; j <= MAX_VALUE; ++j) {
            thread_pool->submit(j);
        }

        auto join_thread = Threading::Thread::construct([thread_pool]() -> intptr_t {
            thread_pool->wait_for_all();
            delete thread_pool;
            return 0;
        });
        join_thread->start();

        auto timer = Core::ElapsedTimer::start_new(Core::TimerType::Precise);
        while (!join_thread->has_exited() && timer.elapsed_milliseconds() < RUN_TIMEOUT.to_milliseconds())
            ;

        EXPECT(join_thread->has_exited());
        // Exit, since the pool is deadlocked and we have no way of unblocking
        // it other than having the OS tear down the process.
        if (!join_thread->has_exited()) {
            return;
        }

        (void)join_thread->join();
        EXPECT_EQ(sum.load(), expected_value);
    }
}
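
Two details of the deleted test are easy to miss. The pool is heap-allocated and joined from a separate watchdog thread, so a deadlocked wait_for_all() only fails the EXPECT after RUN_TIMEOUT expires and then leaks the pool on purpose rather than hanging in its destructor. The pass condition is Gauss' closed form for 0 + 1 + ... + MAX_VALUE; a standalone check of that arithmetic, not part of the removed file:

#include <AK/Types.h>

// Sum of 0..MAX_VALUE via Gauss' formula, with MAX_VALUE = 1 << 15 = 32768.
static constexpr u64 MAX_VALUE = 1 << 15;
static constexpr u64 expected_value = (MAX_VALUE * (MAX_VALUE + 1)) / 2;
static_assert(expected_value == 536'887'296); // 32768 * 32769 / 2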

Userland/Libraries/LibThreading/ThreadPool.h

@@ -1,154 +0,0 @@
/*
 * Copyright (c) 2024, Ali Mohammad Pur <mpfard@serenityos.org>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */

#pragma once

#include <AK/Concepts.h>
#include <AK/Noncopyable.h>
#include <AK/Queue.h>
#include <LibCore/System.h>
#include <LibThreading/ConditionVariable.h>
#include <LibThreading/MutexProtected.h>
#include <LibThreading/Thread.h>

namespace Threading {

template<typename Pool>
struct ThreadPoolLooper {
    IterationDecision next(Pool& pool, bool wait)
    {
        Optional<typename Pool::Work> entry;
        while (true) {
            pool.m_busy_count++;
            entry = pool.m_work_queue.with_locked([&](auto& queue) -> Optional<typename Pool::Work> {
                if (queue.is_empty())
                    return {};
                return queue.dequeue();
            });
            if (entry.has_value())
                break;

            pool.m_busy_count--;
            if (pool.m_should_exit)
                return IterationDecision::Break;
            if (!wait)
                return IterationDecision::Continue;

            pool.m_mutex.lock();
            // Broadcast on m_work_done here since it is possible the
            // wait_for_all loop missed the previous broadcast when work was
            // actually done. Without this broadcast the ThreadPool could
            // deadlock: there is no remaining work to be done, so this thread
            // never resumes, and the wait_for_all loop never wakes as there is
            // no more work to be completed.
            pool.m_work_done.broadcast();
            pool.m_work_available.wait();
            pool.m_mutex.unlock();
        }

        pool.m_handler(entry.release_value());
        pool.m_busy_count--;
        pool.m_work_done.signal();
        return IterationDecision::Continue;
    }
};

template<typename TWork, template<typename> class Looper = ThreadPoolLooper>
class ThreadPool {
    AK_MAKE_NONCOPYABLE(ThreadPool);
    AK_MAKE_NONMOVABLE(ThreadPool);

public:
    using Work = TWork;
    friend struct ThreadPoolLooper<ThreadPool>;

    ThreadPool(Optional<size_t> concurrency = {})
    requires(IsFunction<Work>)
        : m_handler([](Work work) { return work(); })
        , m_work_available(m_mutex)
        , m_work_done(m_mutex)
    {
        initialize_workers(concurrency.value_or(Core::System::hardware_concurrency()));
    }

    explicit ThreadPool(Function<void(Work)> handler, Optional<size_t> concurrency = {})
        : m_handler(move(handler))
        , m_work_available(m_mutex)
        , m_work_done(m_mutex)
    {
        initialize_workers(concurrency.value_or(Core::System::hardware_concurrency()));
    }

    ~ThreadPool()
    {
        m_should_exit.store(true, AK::MemoryOrder::memory_order_release);
        for (auto& worker : m_workers) {
            while (!worker->has_exited()) {
                m_work_available.broadcast();
            }
            (void)worker->join();
        }
    }

    void submit(Work work)
    {
        m_work_queue.with_locked([&](auto& queue) {
            queue.enqueue({ move(work) });
        });
        m_work_available.broadcast();
    }

    void wait_for_all()
    {
        {
            MutexLocker lock(m_mutex);
            m_work_done.wait_while([this]() {
                return m_work_queue.with_locked([](auto& queue) {
                    return !queue.is_empty();
                });
            });
        }
        {
            MutexLocker lock(m_mutex);
            m_work_done.wait_while([this] {
                return m_busy_count.load(AK::MemoryOrder::memory_order_acquire) > 0;
            });
        }
    }

private:
    void initialize_workers(size_t concurrency)
    {
        for (size_t i = 0; i < concurrency; ++i) {
            m_workers.append(Thread::construct([this]() -> intptr_t {
                Looper<ThreadPool> thread_looper;
                for (; !m_should_exit;) {
                    auto result = thread_looper.next(*this, true);
                    if (result == IterationDecision::Break)
                        break;
                }
                return 0;
            },
                "ThreadPool worker"sv));
        }

        for (auto& worker : m_workers)
            worker->start();
    }

    Vector<NonnullRefPtr<Thread>> m_workers;
    MutexProtected<Queue<Work>> m_work_queue;
    Function<void(Work)> m_handler;
    Mutex m_mutex;
    ConditionVariable m_work_available;
    ConditionVariable m_work_done;

    Atomic<bool> m_should_exit { false };
    Atomic<size_t> m_busy_count { 0 };
};

}
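
The least obvious piece of the removed looper is the m_work_done.broadcast() before blocking on m_work_available. A worker signals m_work_done after running the handler without holding m_mutex, so wait_for_all() can evaluate its predicate, decide to keep waiting, and miss that signal, deadlocking even though all work is finished; the extra broadcast, taken under the mutex, wakes such a waiter the next time a worker goes idle. A more conventional way to avoid the lost wakeup, shown here only as a hypothetical sketch and not as how the deleted code worked, is to publish the state change and signal while holding the same mutex the waiter uses:

// Hypothetical rewrite of the tail of ThreadPoolLooper::next(). Because the
// waiter in wait_for_all() checks m_busy_count and blocks while holding
// m_mutex, updating the count and broadcasting under that same mutex means the
// signal cannot land in the gap between the waiter's predicate check and its
// wait, so no wakeup is lost.
pool.m_handler(entry.release_value());

pool.m_mutex.lock();
pool.m_busy_count--;          // the state change wait_for_all()'s predicate reads
pool.m_work_done.broadcast(); // delivered while no waiter can be mid-check
pool.m_mutex.unlock();

return IterationDecision::Continue;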