Scheduler: Move thread unblocking out of a lambda to help make things more readable

We also use a switch to explicitly make sure we handle all cases properly.
This commit is contained in:
Robin Burchell 2019-07-18 14:10:28 +02:00 committed by Andreas Kling
parent d19136ce07
commit 4da2521606
Notes: sideshowbarker 2024-07-19 13:10:40 +09:00
2 changed files with 120 additions and 123 deletions

View file

@ -51,6 +51,123 @@ void Scheduler::beep()
s_beep_timeout = g_uptime + 100;
}
// Called by the scheduler on threads that are blocked for some reason.
// Make a decision as to whether to unblock them or not.
void Thread::consider_unblock(time_t now_sec, long now_usec)
{
auto& process = this->process();
switch (state()) {
case Thread::__Begin_Blocked_States__:
case Thread::__End_Blocked_States__:
ASSERT_NOT_REACHED();
[[fallthrough]];
case Thread::Invalid:
case Thread::Runnable:
case Thread::Running:
case Thread::Dead:
case Thread::Stopped:
case Thread::BlockedLurking:
case Thread::BlockedSignal:
/* don't know, don't care */
return;
case Thread::BlockedSleep:
if (wakeup_time() <= g_uptime)
unblock();
return;
case Thread::BlockedWait:
process.for_each_child([&](Process& child) {
if (waitee_pid() != -1 && waitee_pid() != child.pid())
return IterationDecision::Continue;
bool child_exited = child.is_dead();
bool child_stopped = child.main_thread().state() == Thread::State::Stopped;
bool wait_finished = ((m_wait_options & WEXITED) && child_exited)
|| ((m_wait_options & WSTOPPED) && child_stopped);
if (!wait_finished)
return IterationDecision::Continue;
m_waitee_pid = child.pid();
unblock();
return IterationDecision::Break;
});
return;
case Thread::BlockedRead:
ASSERT(m_blocked_description);
// FIXME: Block until the amount of data wanted is available.
if (m_blocked_description->can_read())
unblock();
return;
case Thread::BlockedWrite:
ASSERT(m_blocked_description != -1);
if (m_blocked_description->can_write())
unblock();
return;
case Thread::BlockedConnect: {
auto& description = *m_blocked_description;
auto& socket = *description.socket();
if (socket.is_connected())
unblock();
return;
}
case Thread::BlockedReceive: {
auto& description = *m_blocked_description;
auto& socket = *description.socket();
// FIXME: Block until the amount of data wanted is available.
bool timed_out = now_sec > socket.receive_deadline().tv_sec || (now_sec == socket.receive_deadline().tv_sec && now_usec >= socket.receive_deadline().tv_usec);
if (timed_out || description.can_read())
unblock();
return;
}
case Thread::BlockedAccept: {
auto& description = *m_blocked_description;
auto& socket = *description.socket();
if (socket.can_accept())
unblock();
return;
}
case Thread::BlockedSelect:
if (m_select_has_timeout) {
if (now_sec > m_select_timeout.tv_sec || (now_sec == m_select_timeout.tv_sec && now_usec >= m_select_timeout.tv_usec)) {
unblock();
return;
}
}
for (int fd : m_select_read_fds) {
if (process.m_fds[fd].description->can_read()) {
unblock();
return;
}
}
for (int fd : m_select_write_fds) {
if (process.m_fds[fd].description->can_write()) {
unblock();
return;
}
}
return;
case Thread::BlockedCondition:
if (m_block_until_condition()) {
m_block_until_condition = nullptr;
unblock();
}
return;
case Thread::Skip1SchedulerPass:
set_state(Thread::Skip0SchedulerPasses);
return;
case Thread::Skip0SchedulerPasses:
set_state(Thread::Runnable);
return;
case Thread::Dying:
ASSERT(g_finalizer);
if (g_finalizer->state() == Thread::BlockedLurking)
g_finalizer->unblock();
return;
}
}
bool Scheduler::pick_next()
{
ASSERT_INTERRUPTS_DISABLED();
@ -68,134 +185,13 @@ bool Scheduler::pick_next()
struct timeval now;
kgettimeofday(now);
auto now_sec = now.tv_sec;
auto now_usec = now.tv_usec;
// Check and unblock threads whose wait conditions have been met.
Thread::for_each_nonrunnable([&](Thread& thread) {
auto& process = thread.process();
if (thread.state() == Thread::BlockedSleep) {
if (thread.wakeup_time() <= g_uptime)
thread.unblock();
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedWait) {
process.for_each_child([&](Process& child) {
if (thread.waitee_pid() != -1 && thread.waitee_pid() != child.pid())
return IterationDecision::Continue;
bool child_exited = child.is_dead();
bool child_stopped = child.main_thread().state() == Thread::State::Stopped;
bool wait_finished = ((thread.m_wait_options & WEXITED) && child_exited)
|| ((thread.m_wait_options & WSTOPPED) && child_stopped);
if (!wait_finished)
return IterationDecision::Continue;
thread.m_waitee_pid = child.pid();
thread.unblock();
return IterationDecision::Break;
});
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedRead) {
ASSERT(thread.m_blocked_description);
// FIXME: Block until the amount of data wanted is available.
if (thread.m_blocked_description->can_read())
thread.unblock();
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedWrite) {
ASSERT(thread.m_blocked_description != -1);
if (thread.m_blocked_description->can_write())
thread.unblock();
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedConnect) {
auto& description = *thread.m_blocked_description;
auto& socket = *description.socket();
if (socket.is_connected())
thread.unblock();
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedReceive) {
auto& description = *thread.m_blocked_description;
auto& socket = *description.socket();
// FIXME: Block until the amount of data wanted is available.
bool timed_out = now_sec > socket.receive_deadline().tv_sec || (now_sec == socket.receive_deadline().tv_sec && now_usec >= socket.receive_deadline().tv_usec);
if (timed_out || description.can_read()) {
thread.unblock();
return IterationDecision::Continue;
}
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedAccept) {
auto& description = *thread.m_blocked_description;
auto& socket = *description.socket();
if (socket.can_accept()) {
thread.unblock();
return IterationDecision::Continue;
}
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedSelect) {
if (thread.m_select_has_timeout) {
if (now_sec > thread.m_select_timeout.tv_sec || (now_sec == thread.m_select_timeout.tv_sec && now_usec >= thread.m_select_timeout.tv_usec)) {
thread.unblock();
return IterationDecision::Continue;
}
}
for (int fd : thread.m_select_read_fds) {
if (process.m_fds[fd].description->can_read()) {
thread.unblock();
return IterationDecision::Continue;
}
}
for (int fd : thread.m_select_write_fds) {
if (process.m_fds[fd].description->can_write()) {
thread.unblock();
return IterationDecision::Continue;
}
}
return IterationDecision::Continue;
}
if (thread.state() == Thread::BlockedCondition) {
if (thread.m_block_until_condition()) {
thread.m_block_until_condition = nullptr;
thread.unblock();
}
return IterationDecision::Continue;
}
if (thread.state() == Thread::Skip1SchedulerPass) {
thread.set_state(Thread::Skip0SchedulerPasses);
return IterationDecision::Continue;
}
if (thread.state() == Thread::Skip0SchedulerPasses) {
thread.set_state(Thread::Runnable);
return IterationDecision::Continue;
}
if (thread.state() == Thread::Dying) {
ASSERT(g_finalizer);
if (g_finalizer->state() == Thread::BlockedLurking)
g_finalizer->unblock();
return IterationDecision::Continue;
}
thread.consider_unblock(now_sec, now_usec);
return IterationDecision::Continue;
});

View file

@ -120,6 +120,7 @@ public:
void set_state(State);
void send_signal(u8 signal, Process* sender);
void consider_unblock(time_t now_sec, long now_usec);
ShouldUnblockThread dispatch_one_pending_signal();
ShouldUnblockThread dispatch_signal(u8 signal);