LibThreading: Fix deadlocks in ThreadPool

When adding tests for `ThreadPool`, a handful of deadlocks can be
observed when worker threads wait on `m_work_available`.

The first deadlock is in the destruction of `ThreadPool`, where it is
possible for a worker thread to still be in the process of acquiring
`m_mutex` when the destructor broadcasts on `m_work_available`. The
worker misses that wakeup and ends up waiting on `m_work_available`
forever, so the destructor hangs trying to join it. This is solved by
repeatedly broadcasting on `m_work_available` until the thread being
joined has exited.
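
To make the race concrete, here is a minimal sketch of the same shutdown pattern using standard C++ primitives rather than the LibThreading API (`Worker`, `should_exit` and `has_exited` are illustrative stand-ins for the pool's internals): a single broadcast can be missed if the worker is still between its exit check and its wait, whereas re-broadcasting until the worker has observably exited cannot be.

```cpp
// Sketch only: standard C++ primitives standing in for the LibThreading API.
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

struct Worker {
    std::mutex mutex;
    std::condition_variable work_available;
    std::atomic<bool> should_exit { false };
    std::atomic<bool> has_exited { false };

    void run()
    {
        while (!should_exit.load(std::memory_order_acquire)) {
            std::unique_lock lock(mutex);
            // A notification that arrives while this thread is still
            // acquiring the mutex (i.e. before wait() starts) is lost.
            work_available.wait(lock);
        }
        has_exited.store(true, std::memory_order_release);
    }
};

int main()
{
    Worker worker;
    std::thread thread([&] { worker.run(); });

    worker.should_exit.store(true, std::memory_order_release);
    // A single notify_all() here can be missed; retrying until the worker
    // has observably left its loop mirrors the fix in the destructor.
    while (!worker.has_exited.load(std::memory_order_acquire))
        worker.work_available.notify_all();
    thread.join();
}
```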

The second deadlock occurs when the final signal on `m_work_done` is
missed by the wait in `wait_for_all`. At that point all workers are in
the hot loop trying to pull work from the work queue, but since no work
remains they all end up waiting on `m_work_available`. Meanwhile
`wait_for_all` is waiting on `m_work_done`, which will never be
signalled again because every worker is parked on `m_work_available`.
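
The lost wakeup can be reproduced outside the pool with a small standalone program (standard C++ again; `g_queue` is an illustrative stand-in for the pool's work queue). Depending on scheduling it may hang exactly as described, because the final notify can land in the gap between the waiter's emptiness check and its wait:

```cpp
// Sketch only: can hang by design, reproducing the lost-wakeup window.
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

std::mutex g_mutex;
std::condition_variable g_work_done;
std::deque<int> g_queue { 1 };

int main()
{
    std::thread worker([] {
        {
            std::lock_guard lock(g_mutex);
            g_queue.pop_front(); // "complete" the last work item
        }
        g_work_done.notify_all(); // the final signal; easy to miss
    });

    while (true) {
        {
            std::lock_guard lock(g_mutex);
            if (g_queue.empty())
                break;
        }
        // Race window: if the worker completes and notifies right here,
        // the wait below never returns.
        std::unique_lock lock(g_mutex);
        g_work_done.wait(lock);
    }
    worker.join();
}
```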

Fixing this requires two changes. The first is that workers signal
`m_work_done` before waiting on `m_work_available`. The second is to
acquire `m_mutex` before checking the wait condition, which is exactly
what `wait_while` does.
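
A rough sketch of both changes, again with standard C++ primitives rather than the LibThreading API (`PoolState`, `worker_go_idle` and `wait_for_all_work` are illustrative names, and a single mutex guards the queue here, unlike the pool's separate queue lock). Note that `std::condition_variable::wait(lock, pred)` blocks while the predicate is false, the inverse sense of `wait_while`:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

struct PoolState {
    std::mutex mutex;
    std::condition_variable work_available;
    std::condition_variable work_done;
    std::deque<int> queue;
};

// Change 1: an idle worker signals work_done before parking on
// work_available, giving a waiter that missed the "real" completion
// signal another chance to wake up and re-check its condition.
void worker_go_idle(PoolState& pool)
{
    std::unique_lock lock(pool.mutex);
    pool.work_done.notify_all();
    pool.work_available.wait(lock);
}

// Change 2: the waiter evaluates its condition under the mutex via a
// predicate wait (the wait_while pattern) instead of doing an unlocked
// check followed by a bare wait(), so the final signal cannot slip into
// the gap between the two.
void wait_for_all_work(PoolState& pool)
{
    std::unique_lock lock(pool.mutex);
    pool.work_done.wait(lock, [&] { return pool.queue.empty(); });
}
```
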
Author: Braydn, 2024-08-01 22:23:05 -04:00 (committed by Ali Mohammad Pur)
commit a0fd7cf371, parent ed153a1720

@@ -36,6 +36,13 @@ struct ThreadPoolLooper {
                 return IterationDecision::Continue;

             pool.m_mutex.lock();
+            // broadcast on m_work_done here since it is possible the
+            // wait_for_all loop missed the previous broadcast when work was
+            // actually done. Without this broadcast the ThreadPool could
+            // deadlock as there is no remaining work to be done, so this thread
+            // never resumes and the wait_for_all loop never wakes as there is no
+            // more work to be completed.
+            pool.m_work_done.broadcast();
             pool.m_work_available.wait();
             pool.m_mutex.unlock();
         }
@@ -76,7 +83,9 @@ public:
     {
         m_should_exit.store(true, AK::MemoryOrder::memory_order_release);
         for (auto& worker : m_workers) {
-            m_work_available.broadcast();
+            while (!worker->has_exited()) {
+                m_work_available.broadcast();
+            }
             (void)worker->join();
         }
     }
@@ -91,18 +100,19 @@ public:
     void wait_for_all()
     {
         while (true) {
             if (m_work_queue.with_locked([](auto& queue) { return queue.is_empty(); }))
                 break;
-            m_mutex.lock();
-            m_work_done.wait();
-            m_mutex.unlock();
+            {
+                MutexLocker lock(m_mutex);
+                m_work_done.wait_while([this]() {
+                    return m_work_queue.with_locked([](auto& queue) {
+                        return !queue.is_empty();
+                    });
+                });
+            }
         }

         while (m_busy_count.load(AK::MemoryOrder::memory_order_acquire) > 0) {
-            m_mutex.lock();
-            m_work_done.wait();
-            m_mutex.unlock();
+            {
+                MutexLocker lock(m_mutex);
+                m_work_done.wait_while([this] {
+                    return m_busy_count.load(AK::MemoryOrder::memory_order_acquire) > 0;
+                });
+            }
         }