diff --git a/Ladybird/RequestServer/CMakeLists.txt b/Ladybird/RequestServer/CMakeLists.txt index 798d7715fae..1f3ae575f89 100644 --- a/Ladybird/RequestServer/CMakeLists.txt +++ b/Ladybird/RequestServer/CMakeLists.txt @@ -33,7 +33,7 @@ target_link_libraries(RequestServer PRIVATE requestserver) target_include_directories(requestserver PRIVATE ${SERENITY_SOURCE_DIR}/Userland/Services/) target_include_directories(requestserver PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/..) -target_link_libraries(requestserver PUBLIC LibCore LibMain LibCrypto LibFileSystem LibGemini LibHTTP LibIPC LibMain LibTLS LibWebView LibWebSocket LibURL) +target_link_libraries(requestserver PUBLIC LibCore LibMain LibCrypto LibFileSystem LibGemini LibHTTP LibIPC LibMain LibTLS LibWebView LibWebSocket LibURL LibThreading) if (${CMAKE_SYSTEM_NAME} MATCHES "SunOS") # Solaris has socket and networking related functions in two extra libraries target_link_libraries(requestserver PUBLIC nsl socket) diff --git a/Userland/Libraries/LibCore/ElapsedTimer.cpp b/Userland/Libraries/LibCore/ElapsedTimer.cpp index bc81d93be99..e4dd994759f 100644 --- a/Userland/Libraries/LibCore/ElapsedTimer.cpp +++ b/Userland/Libraries/LibCore/ElapsedTimer.cpp @@ -10,9 +10,9 @@ namespace Core { -ElapsedTimer ElapsedTimer::start_new() +ElapsedTimer ElapsedTimer::start_new(TimerType timer_type) { - ElapsedTimer timer; + ElapsedTimer timer(timer_type); timer.start(); return timer; } diff --git a/Userland/Libraries/LibCore/ElapsedTimer.h b/Userland/Libraries/LibCore/ElapsedTimer.h index 380e8bce049..716943cad90 100644 --- a/Userland/Libraries/LibCore/ElapsedTimer.h +++ b/Userland/Libraries/LibCore/ElapsedTimer.h @@ -17,7 +17,7 @@ enum class TimerType { class ElapsedTimer { public: - static ElapsedTimer start_new(); + static ElapsedTimer start_new(TimerType timer_type = TimerType::Coarse); ElapsedTimer(TimerType timer_type = TimerType::Coarse) : m_timer_type(timer_type) diff --git a/Userland/Libraries/LibTLS/TLSv12.cpp b/Userland/Libraries/LibTLS/TLSv12.cpp index 6e4b2cce6b3..6adefa5855c 100644 --- a/Userland/Libraries/LibTLS/TLSv12.cpp +++ b/Userland/Libraries/LibTLS/TLSv12.cpp @@ -569,7 +569,7 @@ DefaultRootCACertificates::DefaultRootCACertificates() DefaultRootCACertificates& DefaultRootCACertificates::the() { - static DefaultRootCACertificates s_the; + static thread_local DefaultRootCACertificates s_the; return s_the; } diff --git a/Userland/Services/RequestServer/CMakeLists.txt b/Userland/Services/RequestServer/CMakeLists.txt index 85711038409..34c533c18ce 100644 --- a/Userland/Services/RequestServer/CMakeLists.txt +++ b/Userland/Services/RequestServer/CMakeLists.txt @@ -26,4 +26,4 @@ set(GENERATED_SOURCES ) serenity_bin(RequestServer) -target_link_libraries(RequestServer PRIVATE LibCore LibCrypto LibIPC LibGemini LibHTTP LibMain LibTLS LibWebSocket LibURL) +target_link_libraries(RequestServer PRIVATE LibCore LibCrypto LibIPC LibGemini LibHTTP LibMain LibTLS LibWebSocket LibURL LibThreading) diff --git a/Userland/Services/RequestServer/ConnectionCache.cpp b/Userland/Services/RequestServer/ConnectionCache.cpp index 3c9c46985df..c7dfdd13538 100644 --- a/Userland/Services/RequestServer/ConnectionCache.cpp +++ b/Userland/Services/RequestServer/ConnectionCache.cpp @@ -11,9 +11,9 @@ namespace RequestServer::ConnectionCache { -HashMap>>>> g_tcp_connection_cache {}; -HashMap>>>> g_tls_connection_cache {}; -HashMap g_inferred_server_properties; +Threading::RWLockProtected>>>>> g_tcp_connection_cache {}; +Threading::RWLockProtected>>>>> g_tls_connection_cache {}; +Threading::RWLockProtected> g_inferred_server_properties; void request_did_finish(URL::URL const& url, Core::Socket const* socket) { @@ -26,8 +26,20 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) ConnectionKey partial_key { url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(), url.port_or_default() }; auto fire_off_next_job = [&](auto& cache) { - auto it = find_if(cache.begin(), cache.end(), [&](auto& connection) { return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port; }); - if (it == cache.end()) { + using CacheType = typename RemoveCVReference::ProtectedType; + auto [it, end] = cache.with_read_locked([&](auto const& cache) { + struct Result { + decltype(cache.begin()) it; + decltype(cache.end()) end; + }; + return Result { + find_if(cache.begin(), cache.end(), [&](auto& connection) { + return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port; + }), + cache.end(), + }; + }); + if (it == end) { dbgln("Request for URL {} finished, but we don't own that!", url); return; } @@ -38,11 +50,16 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) } auto& connection = *connection_it; - auto& properties = g_inferred_server_properties.ensure(partial_key.hostname); + if constexpr (REQUESTSERVER_DEBUG) { + connection->job_data->timing_info.performing_request = Duration::from_milliseconds(connection->job_data->timing_info.timer.elapsed_milliseconds()); + connection->job_data->timing_info.timer.start(); + } + + auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(partial_key.hostname); }); if (!connection->socket->is_open()) properties.requests_served_per_connection = min(properties.requests_served_per_connection, connection->max_queue_length + 1); - if (connection->request_queue.is_empty()) { + if (connection->request_queue.with_read_locked([](auto const& queue) { return queue.is_empty(); })) { // Immediately mark the connection as finished, as new jobs will never be run if they are queued // before the deferred_invoke() below runs otherwise. connection->has_started = false; @@ -59,30 +76,47 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) if (ptr->has_started) return; - dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket); - auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; }); - VERIFY(did_remove); - if (cache_entry.is_empty()) - cache.remove(key); + dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket.ptr()); + cache.with_write_locked([&](CacheType& cache) { + auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; }); + VERIFY(did_remove); + if (cache_entry.is_empty()) + cache.remove(key); + }); }); }; connection->removal_timer->start(); }); } else { + auto timer = Core::ElapsedTimer::start_new(); if (auto result = recreate_socket_if_needed(*connection, url); result.is_error()) { - dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error()); - connection->job_data.fail(Core::NetworkJob::Error::ConnectionFailed); + if constexpr (REQUESTSERVER_DEBUG) { + connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds()); + } + cache.with_read_locked([&](auto&) { + dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error()); + connection->job_data->fail(Core::NetworkJob::Error::ConnectionFailed); + }); return; } + if constexpr (REQUESTSERVER_DEBUG) { + connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds()); + } connection->has_started = true; - Core::deferred_invoke([&connection = *connection, url] { - dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection); - connection.timer.start(); - connection.current_url = url; - connection.job_data = connection.request_queue.take_first(); - connection.socket->set_notifications_enabled(true); - connection.job_data.start(*connection.socket); + Core::deferred_invoke([&connection = *connection, url, &cache] { + cache.with_read_locked([&](auto&) { + dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection); + connection.timer.start(); + connection.current_url = url; + connection.job_data = connection.request_queue.with_write_locked([](auto& queue) { return queue.take_first(); }); + if constexpr (REQUESTSERVER_DEBUG) { + connection.job_data->timing_info.waiting_in_queue = Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds() - connection.job_data->timing_info.performing_request.to_milliseconds()); + connection.job_data->timing_info.timer.start(); + } + connection.socket->set_notifications_enabled(true); + connection.job_data->start(*connection.socket); + }); }); } }; @@ -97,28 +131,37 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) void dump_jobs() { - dbgln("=========== TLS Connection Cache =========="); - for (auto& connection : g_tls_connection_cache) { - dbgln(" - {}:{}", connection.key.hostname, connection.key.port); - for (auto& entry : *connection.value) { - dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket); - dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); - dbgln(" Request Queue:"); - for (auto& job : entry->request_queue) - dbgln(" - {}", &job); + g_tls_connection_cache.with_read_locked([](auto& cache) { + dbgln("=========== TLS Connection Cache =========="); + for (auto& connection : cache) { + dbgln(" - {}:{}", connection.key.hostname, connection.key.port); + for (auto& entry : *connection.value) { + dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr()); + dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); + dbgln(" Request Queue:"); + entry->request_queue.for_each_locked([](auto const& job) { + dbgln(" - {}", &job); + }); + } } - } - dbgln("=========== TCP Connection Cache =========="); - for (auto& connection : g_tcp_connection_cache) { - dbgln(" - {}:{}", connection.key.hostname, connection.key.port); - for (auto& entry : *connection.value) { - dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket); - dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); - dbgln(" Request Queue:"); - for (auto& job : entry->request_queue) - dbgln(" - {}", &job); + }); + + g_tcp_connection_cache.with_read_locked([](auto& cache) { + dbgln("=========== TCP Connection Cache =========="); + for (auto& connection : cache) { + dbgln(" - {}:{}", connection.key.hostname, connection.key.port); + for (auto& entry : *connection.value) { + dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr()); + dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); + dbgln(" Request Queue:"); + entry->request_queue.for_each_locked([](auto const& job) { + dbgln(" - {}", &job); + }); + } } - } + }); } +size_t hits; +size_t misses; } diff --git a/Userland/Services/RequestServer/ConnectionCache.h b/Userland/Services/RequestServer/ConnectionCache.h index f812d91ad51..f9ad626da3c 100644 --- a/Userland/Services/RequestServer/ConnectionCache.h +++ b/Userland/Services/RequestServer/ConnectionCache.h @@ -16,6 +16,7 @@ #include #include #include +#include #include namespace RequestServer { @@ -53,42 +54,95 @@ struct Proxy { } }; +struct JobData { + Function start {}; + Function fail {}; + Function()> provide_client_certificates {}; + struct TimingInfo { +#if REQUESTSERVER_DEBUG + bool valid { true }; + Core::ElapsedTimer timer {}; + URL::URL url {}; + Duration waiting_in_queue {}; + Duration starting_connection {}; + Duration performing_request {}; +#endif + } timing_info {}; + + JobData(Function start, Function fail, Function()> provide_client_certificates, TimingInfo timing_info) + : start(move(start)) + , fail(move(fail)) + , provide_client_certificates(move(provide_client_certificates)) + , timing_info(move(timing_info)) + { + } + + JobData(JobData&& other) + : start(move(other.start)) + , fail(move(other.fail)) + , provide_client_certificates(move(other.provide_client_certificates)) + , timing_info(move(other.timing_info)) + { +#if REQUESTSERVER_DEBUG + other.timing_info.valid = false; +#endif + } + +#if REQUESTSERVER_DEBUG + ~JobData() + { + if (timing_info.valid) { + dbgln("JobData for {} timings:", timing_info.url); + dbgln(" - Waiting in queue: {}ms", timing_info.waiting_in_queue.to_milliseconds()); + dbgln(" - Starting connection: {}ms", timing_info.starting_connection.to_milliseconds()); + dbgln(" - Performing request: {}ms", timing_info.performing_request.to_milliseconds()); + } + } +#endif + + template + static JobData create(NonnullRefPtr job, [[maybe_unused]] URL::URL url) + { + return JobData { + [job](auto& socket) { job->start(socket); }, + [job](auto error) { job->fail(error); }, + [job] { + if constexpr (requires { job->on_certificate_requested; }) { + if (job->on_certificate_requested) + return job->on_certificate_requested(); + } else { + // "use" `job`, otherwise clang gets sad. + (void)job; + } + return Vector {}; + }, + { +#if REQUESTSERVER_DEBUG + .timer = Core::ElapsedTimer::start_new(Core::TimerType::Precise), + .url = move(url), + .waiting_in_queue = {}, + .starting_connection = {}, + .performing_request = {}, +#endif + }, + }; + } +}; + template struct Connection { - struct JobData { - Function start {}; - Function fail {}; - Function()> provide_client_certificates {}; - - template - static JobData create(NonnullRefPtr job) - { - return JobData { - .start = [job](auto& socket) { job->start(socket); }, - .fail = [job](auto error) { job->fail(error); }, - .provide_client_certificates = [job] { - if constexpr (requires { job->on_certificate_requested; }) { - if (job->on_certificate_requested) - return job->on_certificate_requested(); - } else { - // "use" `job`, otherwise clang gets sad. - (void)job; - } - return Vector {}; }, - }; - } - }; using QueueType = Vector; using SocketType = Socket; using StorageType = SocketStorageType; - NonnullOwnPtr> socket; - QueueType request_queue; + OwnPtr> socket; + Threading::RWLockProtected request_queue; NonnullRefPtr removal_timer; + Atomic is_being_started { false }; bool has_started { false }; URL::URL current_url {}; Core::ElapsedTimer timer {}; - JobData job_data {}; + Optional job_data {}; Proxy proxy {}; size_t max_queue_length { 0 }; }; @@ -117,15 +171,16 @@ struct InferredServerProperties { size_t requests_served_per_connection { NumericLimits::max() }; }; -extern HashMap>>>> g_tcp_connection_cache; -extern HashMap>>>> g_tls_connection_cache; -extern HashMap g_inferred_server_properties; +extern Threading::RWLockProtected>>>>> g_tcp_connection_cache; +extern Threading::RWLockProtected>>>>> g_tls_connection_cache; +extern Threading::RWLockProtected> g_inferred_server_properties; void request_did_finish(URL::URL const&, Core::Socket const*); void dump_jobs(); constexpr static size_t MaxConcurrentConnectionsPerURL = 4; -constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 10'000; +constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 20'000; +constexpr static size_t ConnectionCacheQueueHighWatermark = 4; template ErrorOr recreate_socket_if_needed(T& connection, URL::URL const& url) @@ -134,6 +189,7 @@ ErrorOr recreate_socket_if_needed(T& connection, URL::URL const& url) using SocketStorageType = typename T::StorageType; if (!connection.socket->is_open() || connection.socket->is_eof()) { + connection.socket = nullptr; // Create another socket for the connection. auto set_socket = [&](NonnullOwnPtr&& socket) -> ErrorOr { connection.socket = TRY(Core::BufferedSocket::create(move(socket))); @@ -151,52 +207,82 @@ ErrorOr recreate_socket_if_needed(T& connection, URL::URL const& url) else reason = Core::NetworkJob::Error::TransmissionFailed; - if (connection.job_data.fail) - connection.job_data.fail(reason); + if (connection.job_data->fail) + connection.job_data->fail(reason); }); options.set_certificate_provider([&connection]() -> Vector { - if (connection.job_data.provide_client_certificates) - return connection.job_data.provide_client_certificates(); + if (connection.job_data->provide_client_certificates) + return connection.job_data->provide_client_certificates(); return {}; }); TRY(set_socket(TRY((connection.proxy.template tunnel(url, move(options)))))); } else { TRY(set_socket(TRY((connection.proxy.template tunnel(url))))); } - dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket); + dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket.ptr()); } return {}; } -decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto job, Core::ProxyData proxy_data = {}) +extern size_t hits; +extern size_t misses; + +template +void start_connection(const URL::URL& url, auto job, auto& sockets_for_url, size_t index, Duration, Cache&); + +void ensure_connection(auto& cache, const URL::URL& url, auto job, Core::ProxyData proxy_data = {}) { - using CacheEntryType = RemoveCVReferencevalue)>; + using CacheEntryType = RemoveCVReference::ProtectedType>().begin()->value)>; auto hostname = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(); - auto& properties = g_inferred_server_properties.ensure(hostname); + auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(hostname); }); - auto& sockets_for_url = *cache.ensure({ move(hostname), url.port_or_default(), proxy_data }, [] { return make(); }); + auto& sockets_for_url = *cache.with_write_locked([&](auto& map) -> NonnullOwnPtr& { + return map.ensure({ move(hostname), url.port_or_default(), proxy_data }, [] { return make(); }); + }); - Proxy proxy { proxy_data }; - - using ReturnType = decltype(sockets_for_url[0].ptr()); // Find the connection with an empty queue; if none exist, we'll find the least backed-up connection later. // Note that servers that are known to serve a single request per connection (e.g. HTTP/1.0) usually have // issues with concurrent connections, so we'll only allow one connection per URL in that case to avoid issues. // This is a bit too aggressive, but there's no way to know if the server can handle concurrent connections // without trying it out first, and that's not worth the effort as HTTP/1.0 is a legacy protocol anyway. - auto it = sockets_for_url.find_if([&](auto& connection) { return properties.requests_served_per_connection < 2 || connection->request_queue.is_empty(); }); + auto it = sockets_for_url.find_if([&](auto const& connection) { + return properties.requests_served_per_connection < 2 + || connection->request_queue.with_read_locked([](auto const& queue) { return queue.size(); }) <= ConnectionCacheQueueHighWatermark; + }); auto did_add_new_connection = false; auto failed_to_find_a_socket = it.is_end(); - if (failed_to_find_a_socket && sockets_for_url.size() < ConnectionCache::MaxConcurrentConnectionsPerURL) { - using ConnectionType = RemoveCVReferencevalue->at(0))>; + + Proxy proxy { proxy_data }; + size_t index; + + auto timer = Core::ElapsedTimer::start_new(); + if (failed_to_find_a_socket && sockets_for_url.size() < MaxConcurrentConnectionsPerURL) { + using ConnectionType = RemoveCVReference().at(0))>; + auto& connection = cache.with_write_locked([&](auto&) -> ConnectionType& { + index = sockets_for_url.size(); + sockets_for_url.append(AK::make( + nullptr, + typename ConnectionType::QueueType {}, + Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr), + true)); + auto& connection = sockets_for_url.last(); + connection->proxy = move(proxy); + return *connection; + }); + ScopeGuard start_guard = [&] { + connection.is_being_started = false; + }; + dbgln_if(REQUESTSERVER_DEBUG, "I will start a connection ({}) for URL {}", &connection, url); + auto connection_result = proxy.tunnel(url); + misses++; if (connection_result.is_error()) { dbgln("ConnectionCache: Connection to {} failed: {}", url, connection_result.error()); Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); - return ReturnType { nullptr }; + return; } auto socket_result = Core::BufferedSocket::create(connection_result.release_value()); if (socket_result.is_error()) { @@ -204,25 +290,21 @@ decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto j Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); - return ReturnType { nullptr }; + return; } - sockets_for_url.append(make( - socket_result.release_value(), - typename ConnectionType::QueueType {}, - Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr))); - sockets_for_url.last()->proxy = move(proxy); did_add_new_connection = true; + connection.socket = socket_result.release_value(); } - size_t index; + + auto elapsed = Duration::from_milliseconds(timer.elapsed_milliseconds()); + if (failed_to_find_a_socket) { - if (did_add_new_connection) { - index = sockets_for_url.size() - 1; - } else { + if (!did_add_new_connection) { // Find the least backed-up connection (based on how many entries are in their request queue). index = 0; auto min_queue_size = (size_t)-1; for (auto it = sockets_for_url.begin(); it != sockets_for_url.end(); ++it) { - if (auto queue_size = (*it)->request_queue.size(); min_queue_size > queue_size) { + if (auto queue_size = (*it)->request_queue.with_read_locked([](auto const& queue) { return queue.size(); }); min_queue_size > queue_size) { index = it.index(); min_queue_size = queue_size; } @@ -230,39 +312,76 @@ decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto j } } else { index = it.index(); + hits++; } + + dbgln_if(REQUESTSERVER_DEBUG, "ConnectionCache: Hits: {}, Misses: {}", RequestServer::ConnectionCache::hits, RequestServer::ConnectionCache::misses); + start_connection(url, job, sockets_for_url, index, elapsed, cache); +} + +template +void start_connection(URL::URL const& url, auto job, auto& sockets_for_url, size_t index, Duration setup_time, Cache& cache) +{ if (sockets_for_url.is_empty()) { Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); - return ReturnType { nullptr }; + return; } auto& connection = *sockets_for_url[index]; + if (connection.is_being_started) { + // Someone else is creating the connection, queue the job and let them handle it. + dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket.ptr()); + auto size = connection.request_queue.with_write_locked([&](auto& queue) { + queue.append(JobData::create(job, url)); + return queue.size(); + }); + connection.max_queue_length = max(connection.max_queue_length, size); + return; + } + if (!connection.has_started) { connection.has_started = true; - Core::deferred_invoke([&connection, url, job] { + Core::deferred_invoke([&connection, &cache, url, job, setup_time] { + (void)setup_time; + auto job_data = JobData::create(job, url); + if constexpr (REQUESTSERVER_DEBUG) { + job_data.timing_info.waiting_in_queue = Duration::from_milliseconds(job_data.timing_info.timer.elapsed_milliseconds()); + job_data.timing_info.timer.start(); + } if (auto result = recreate_socket_if_needed(connection, url); result.is_error()) { - dbgln("ConnectionCache: request failed to start, failed to make a socket: {}", result.error()); + dbgln_if(REQUESTSERVER_DEBUG, "ConnectionCache: request failed to start, failed to make a socket: {}", result.error()); + if constexpr (REQUESTSERVER_DEBUG) { + job_data.timing_info.starting_connection += Duration::from_milliseconds(job_data.timing_info.timer.elapsed_milliseconds()) + setup_time; + job_data.timing_info.timer.start(); + } Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); } else { - dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket); - connection.removal_timer->stop(); - connection.timer.start(); - connection.current_url = url; - connection.job_data = decltype(connection.job_data)::create(job); - connection.socket->set_notifications_enabled(true); - connection.job_data.start(*connection.socket); + cache.with_write_locked([&](auto&) { + dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket.ptr()); + connection.job_data = move(job_data); + if constexpr (REQUESTSERVER_DEBUG) { + connection.job_data->timing_info.starting_connection += Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds()) + setup_time; + connection.job_data->timing_info.timer.start(); + } + connection.removal_timer->stop(); + connection.timer.start(); + connection.current_url = url; + connection.socket->set_notifications_enabled(true); + connection.job_data->start(*connection.socket); + }); } }); } else { - dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket); - connection.request_queue.append(decltype(connection.job_data)::create(job)); - connection.max_queue_length = max(connection.max_queue_length, connection.request_queue.size()); + dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket.ptr()); + auto size = connection.request_queue.with_write_locked([&](auto& queue) { + queue.append(JobData::create(job, url)); + return queue.size(); + }); + connection.max_queue_length = max(connection.max_queue_length, size); } - return &connection; } - } diff --git a/Userland/Services/RequestServer/ConnectionFromClient.cpp b/Userland/Services/RequestServer/ConnectionFromClient.cpp index 013147ef16e..53d78a7b681 100644 --- a/Userland/Services/RequestServer/ConnectionFromClient.cpp +++ b/Userland/Services/RequestServer/ConnectionFromClient.cpp @@ -26,121 +26,11 @@ static IDAllocator s_client_ids; ConnectionFromClient::ConnectionFromClient(NonnullOwnPtr socket) : IPC::ConnectionFromClient(*this, move(socket), s_client_ids.allocate()) + , m_thread_pool([this](Work work) { worker_do_work(move(work)); }) { s_connections.set(client_id(), *this); } -void ConnectionFromClient::die() -{ - auto client_id = this->client_id(); - s_connections.remove(client_id); - s_client_ids.deallocate(client_id); - - if (s_connections.is_empty()) - Core::EventLoop::current().quit(0); -} - -Messages::RequestServer::ConnectNewClientResponse ConnectionFromClient::connect_new_client() -{ - int socket_fds[2] {}; - if (auto err = Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds); err.is_error()) { - dbgln("Failed to create client socketpair: {}", err.error()); - return IPC::File {}; - } - - auto client_socket_or_error = Core::LocalSocket::adopt_fd(socket_fds[0]); - if (client_socket_or_error.is_error()) { - close(socket_fds[0]); - close(socket_fds[1]); - dbgln("Failed to adopt client socket: {}", client_socket_or_error.error()); - return IPC::File {}; - } - auto client_socket = client_socket_or_error.release_value(); - // Note: A ref is stored in the static s_connections map - auto client = adopt_ref(*new ConnectionFromClient(move(client_socket))); - - return IPC::File::adopt_fd(socket_fds[1]); -} - -Messages::RequestServer::IsSupportedProtocolResponse ConnectionFromClient::is_supported_protocol(ByteString const& protocol) -{ - bool supported = Protocol::find_by_name(protocol.to_lowercase()); - return supported; -} - -void ConnectionFromClient::start_request(i32 request_id, ByteString const& method, URL::URL const& url, HashMap const& request_headers, ByteBuffer const& request_body, Core::ProxyData const& proxy_data) -{ - if (!url.is_valid()) { - dbgln("StartRequest: Invalid URL requested: '{}'", url); - (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); - return; - } - - auto* protocol = Protocol::find_by_name(url.scheme().to_byte_string()); - if (!protocol) { - dbgln("StartRequest: No protocol handler for URL: '{}'", url); - (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); - return; - } - auto request = protocol->start_request(request_id, *this, method, url, request_headers, request_body, proxy_data); - if (!request) { - dbgln("StartRequest: Protocol handler failed to start request: '{}'", url); - (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); - return; - } - auto id = request->id(); - auto fd = request->request_fd(); - m_requests.set(id, move(request)); - (void)post_message(Messages::RequestClient::RequestStarted(request_id, IPC::File::adopt_fd(fd))); -} - -Messages::RequestServer::StopRequestResponse ConnectionFromClient::stop_request(i32 request_id) -{ - auto* request = const_cast(m_requests.get(request_id).value_or(nullptr)); - bool success = false; - if (request) { - request->stop(); - m_requests.remove(request_id); - success = true; - } - return success; -} - -void ConnectionFromClient::did_receive_headers(Badge, Request& request) -{ - auto response_headers = request.response_headers().clone().release_value_but_fixme_should_propagate_errors(); - async_headers_became_available(request.id(), move(response_headers), request.status_code()); -} - -void ConnectionFromClient::did_finish_request(Badge, Request& request, bool success) -{ - if (request.total_size().has_value()) - async_request_finished(request.id(), success, request.total_size().value()); - - m_requests.remove(request.id()); -} - -void ConnectionFromClient::did_progress_request(Badge, Request& request) -{ - async_request_progress(request.id(), request.total_size(), request.downloaded_size()); -} - -void ConnectionFromClient::did_request_certificates(Badge, Request& request) -{ - async_certificate_requested(request.id()); -} - -Messages::RequestServer::SetCertificateResponse ConnectionFromClient::set_certificate(i32 request_id, ByteString const& certificate, ByteString const& key) -{ - auto* request = const_cast(m_requests.get(request_id).value_or(nullptr)); - bool success = false; - if (request) { - request->set_certificate(certificate, key); - success = true; - } - return success; -} - class Job : public RefCounted , public Weakable { public: @@ -183,6 +73,195 @@ private: inline static HashMap> s_jobs {}; }; +template +IterationDecision ConnectionFromClient::Looper::next(Pool& pool, bool wait) +{ + bool should_exit = false; + auto timer = Core::Timer::create_repeating(100, [&] { + if (Threading::ThreadPoolLooper::next(pool, false) == IterationDecision::Break) { + event_loop.quit(0); + should_exit = true; + } + }); + + timer->start(); + if (!wait) { + event_loop.deferred_invoke([&] { + event_loop.quit(0); + }); + } + + event_loop.exec(); + + if (should_exit) + return IterationDecision::Break; + return IterationDecision::Continue; +} + +void ConnectionFromClient::worker_do_work(Work work) +{ + work.visit( + [&](StartRequest& start_request) { + auto* protocol = Protocol::find_by_name(start_request.url.scheme().to_byte_string()); + if (!protocol) { + dbgln("StartRequest: No protocol handler for URL: '{}'", start_request.url); + (void)post_message(Messages::RequestClient::RequestFinished(start_request.request_id, false, 0)); + return; + } + auto request = protocol->start_request(start_request.request_id, *this, start_request.method, start_request.url, start_request.request_headers, start_request.request_body, start_request.proxy_data); + if (!request) { + dbgln("StartRequest: Protocol handler failed to start request: '{}'", start_request.url); + (void)post_message(Messages::RequestClient::RequestFinished(start_request.request_id, false, 0)); + return; + } + auto id = request->id(); + auto fd = request->request_fd(); + m_requests.with_locked([&](auto& map) { map.set(id, move(request)); }); + (void)post_message(Messages::RequestClient::RequestStarted(start_request.request_id, IPC::File::adopt_fd(fd))); + }, + [&](EnsureConnection& ensure_connection) { + auto& url = ensure_connection.url; + auto& cache_level = ensure_connection.cache_level; + + if (cache_level == CacheLevel::ResolveOnly) { + Core::deferred_invoke([host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string()] { + dbgln("EnsureConnection: DNS-preload for {}", host); + auto resolved_host = Core::Socket::resolve_host(host, Core::Socket::SocketType::Stream); + if (resolved_host.is_error()) + dbgln("EnsureConnection: DNS-preload failed for {}", host); + }); + dbgln("EnsureConnection: DNS-preload for {} done", url); + return; + } + + auto job = Job::ensure(url); + dbgln("EnsureConnection: Pre-connect to {}", url); + auto do_preconnect = [&](auto& cache) { + ConnectionCache::ensure_connection(cache, url, job); + }; + + if (url.scheme() == "http"sv) + do_preconnect(ConnectionCache::g_tcp_connection_cache); + else if (url.scheme() == "https"sv) + do_preconnect(ConnectionCache::g_tls_connection_cache); + else + dbgln("EnsureConnection: Invalid URL scheme: '{}'", url.scheme()); + }, + [&](Empty) {}); +} + +void ConnectionFromClient::die() +{ + auto client_id = this->client_id(); + s_connections.remove(client_id); + s_client_ids.deallocate(client_id); + + if (s_connections.is_empty()) + Core::EventLoop::current().quit(0); +} + +Messages::RequestServer::ConnectNewClientResponse ConnectionFromClient::connect_new_client() +{ + int socket_fds[2] {}; + if (auto err = Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds); err.is_error()) { + dbgln("Failed to create client socketpair: {}", err.error()); + return IPC::File {}; + } + + auto client_socket_or_error = Core::LocalSocket::adopt_fd(socket_fds[0]); + if (client_socket_or_error.is_error()) { + close(socket_fds[0]); + close(socket_fds[1]); + dbgln("Failed to adopt client socket: {}", client_socket_or_error.error()); + return IPC::File {}; + } + auto client_socket = client_socket_or_error.release_value(); + // Note: A ref is stored in the static s_connections map + auto client = adopt_ref(*new ConnectionFromClient(move(client_socket))); + + return IPC::File::adopt_fd(socket_fds[1]); +} + +void ConnectionFromClient::enqueue(Work work) +{ + m_thread_pool.submit(move(work)); +} + +Messages::RequestServer::IsSupportedProtocolResponse ConnectionFromClient::is_supported_protocol(ByteString const& protocol) +{ + bool supported = Protocol::find_by_name(protocol.to_lowercase()); + return supported; +} + +void ConnectionFromClient::start_request(i32 request_id, ByteString const& method, URL::URL const& url, HashMap const& request_headers, ByteBuffer const& request_body, Core::ProxyData const& proxy_data) +{ + if (!url.is_valid()) { + dbgln("StartRequest: Invalid URL requested: '{}'", url); + (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); + return; + } + + enqueue(StartRequest { + .request_id = request_id, + .method = method, + .url = url, + .request_headers = request_headers, + .request_body = request_body, + .proxy_data = proxy_data, + }); +} + +Messages::RequestServer::StopRequestResponse ConnectionFromClient::stop_request(i32 request_id) +{ + return m_requests.with_locked([&](auto& map) { + auto* request = const_cast(map.get(request_id).value_or(nullptr)); + bool success = false; + if (request) { + request->stop(); + map.remove(request_id); + success = true; + } + return success; + }); +} + +void ConnectionFromClient::did_receive_headers(Badge, Request& request) +{ + auto response_headers = request.response_headers().clone().release_value_but_fixme_should_propagate_errors(); + async_headers_became_available(request.id(), move(response_headers), request.status_code()); +} + +void ConnectionFromClient::did_finish_request(Badge, Request& request, bool success) +{ + if (request.total_size().has_value()) + async_request_finished(request.id(), success, request.total_size().value()); + + m_requests.with_locked([&](auto& map) { map.remove(request.id()); }); +} + +void ConnectionFromClient::did_progress_request(Badge, Request& request) +{ + async_request_progress(request.id(), request.total_size(), request.downloaded_size()); +} + +void ConnectionFromClient::did_request_certificates(Badge, Request& request) +{ + async_certificate_requested(request.id()); +} + +Messages::RequestServer::SetCertificateResponse ConnectionFromClient::set_certificate(i32 request_id, ByteString const& certificate, ByteString const& key) +{ + return m_requests.with_locked([&](auto& map) { + auto* request = const_cast(map.get(request_id).value_or(nullptr)); + bool success = false; + if (request) { + request->set_certificate(certificate, key); + success = true; + } + return success; + }); +} + void ConnectionFromClient::ensure_connection(URL::URL const& url, ::RequestServer::CacheLevel const& cache_level) { if (!url.is_valid()) { @@ -190,30 +269,10 @@ void ConnectionFromClient::ensure_connection(URL::URL const& url, ::RequestServe return; } - if (cache_level == CacheLevel::ResolveOnly) { - return Core::deferred_invoke([host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string()] { - dbgln("EnsureConnection: DNS-preload for {}", host); - auto resolved_host = Core::Socket::resolve_host(host, Core::Socket::SocketType::Stream); - if (resolved_host.is_error()) - dbgln("EnsureConnection: DNS-preload failed for {}", host); - }); - } - - auto job = Job::ensure(url); - dbgln("EnsureConnection: Pre-connect to {}", url); - auto do_preconnect = [&](auto& cache) { - auto serialized_host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(); - auto it = cache.find({ serialized_host, url.port_or_default() }); - if (it == cache.end() || it->value->is_empty()) - ConnectionCache::get_or_create_connection(cache, url, job); - }; - - if (url.scheme() == "http"sv) - do_preconnect(ConnectionCache::g_tcp_connection_cache); - else if (url.scheme() == "https"sv) - do_preconnect(ConnectionCache::g_tls_connection_cache); - else - dbgln("EnsureConnection: Invalid URL scheme: '{}'", url.scheme()); + enqueue(EnsureConnection { + .url = url, + .cache_level = cache_level, + }); } static i32 s_next_websocket_id = 1; diff --git a/Userland/Services/RequestServer/ConnectionFromClient.h b/Userland/Services/RequestServer/ConnectionFromClient.h index 709d5b3b6e3..f1f398e594a 100644 --- a/Userland/Services/RequestServer/ConnectionFromClient.h +++ b/Userland/Services/RequestServer/ConnectionFromClient.h @@ -7,7 +7,10 @@ #pragma once #include +#include #include +#include +#include #include #include #include @@ -46,8 +49,36 @@ private: virtual void websocket_close(i32, u16, ByteString const&) override; virtual Messages::RequestServer::WebsocketSetCertificateResponse websocket_set_certificate(i32, ByteString const&, ByteString const&) override; - HashMap> m_requests; + struct StartRequest { + i32 request_id; + ByteString method; + URL::URL url; + HashMap request_headers; + ByteBuffer request_body; + Core::ProxyData proxy_data; + }; + + struct EnsureConnection { + URL::URL url; + CacheLevel cache_level; + }; + + using Work = Variant; + + void worker_do_work(Work); + + Threading::MutexProtected>> m_requests; HashMap> m_websockets; + + void enqueue(Work); + + template + struct Looper : public Threading::ThreadPoolLooper { + IterationDecision next(Pool& pool, bool wait); + Core::EventLoop event_loop; + }; + + Threading::ThreadPool m_thread_pool; }; } diff --git a/Userland/Services/RequestServer/GeminiProtocol.cpp b/Userland/Services/RequestServer/GeminiProtocol.cpp index 9c79d8ed3ee..eba6f7a7955 100644 --- a/Userland/Services/RequestServer/GeminiProtocol.cpp +++ b/Userland/Services/RequestServer/GeminiProtocol.cpp @@ -32,7 +32,7 @@ OwnPtr GeminiProtocol::start_request(i32 request_id, ConnectionFromClie protocol_request->set_request_fd(pipe_result.value().read_fd); Core::EventLoop::current().deferred_invoke([=] { - ConnectionCache::get_or_create_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); + ConnectionCache::ensure_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); }); return protocol_request; diff --git a/Userland/Services/RequestServer/HttpCommon.h b/Userland/Services/RequestServer/HttpCommon.h index 386991012a0..3323dc6274a 100644 --- a/Userland/Services/RequestServer/HttpCommon.h +++ b/Userland/Services/RequestServer/HttpCommon.h @@ -102,10 +102,12 @@ OwnPtr start_request(TBadgedProtocol&& protocol, i32 request_id, Connec auto protocol_request = TRequest::create_with_job(forward(protocol), client, (TJob&)*job, move(output_stream), request_id); protocol_request->set_request_fd(pipe_result.value().read_fd); - if constexpr (IsSame) - ConnectionCache::get_or_create_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); - else - ConnectionCache::get_or_create_connection(ConnectionCache::g_tcp_connection_cache, url, job, proxy_data); + Core::deferred_invoke([=] { + if constexpr (IsSame) + ConnectionCache::ensure_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); + else + ConnectionCache::ensure_connection(ConnectionCache::g_tcp_connection_cache, url, job, proxy_data); + }); return protocol_request; } diff --git a/Userland/Services/RequestServer/main.cpp b/Userland/Services/RequestServer/main.cpp index 17b2234f86b..823c296d1bd 100644 --- a/Userland/Services/RequestServer/main.cpp +++ b/Userland/Services/RequestServer/main.cpp @@ -20,18 +20,18 @@ ErrorOr serenity_main(Main::Arguments) { if constexpr (TLS_SSL_KEYLOG_DEBUG) - TRY(Core::System::pledge("stdio inet accept unix cpath wpath rpath sendfd recvfd sigaction")); + TRY(Core::System::pledge("stdio inet accept thread unix cpath wpath rpath sendfd recvfd sigaction")); else - TRY(Core::System::pledge("stdio inet accept unix rpath sendfd recvfd sigaction")); + TRY(Core::System::pledge("stdio inet accept thread unix rpath sendfd recvfd sigaction")); #ifdef SIGINFO signal(SIGINFO, [](int) { RequestServer::ConnectionCache::dump_jobs(); }); #endif if constexpr (TLS_SSL_KEYLOG_DEBUG) - TRY(Core::System::pledge("stdio inet accept unix cpath wpath rpath sendfd recvfd")); + TRY(Core::System::pledge("stdio inet accept thread unix cpath wpath rpath sendfd recvfd")); else - TRY(Core::System::pledge("stdio inet accept unix rpath sendfd recvfd")); + TRY(Core::System::pledge("stdio inet accept thread unix rpath sendfd recvfd")); // Ensure the certificates are read out here. // FIXME: Allow specifying extra certificates on the command line, or in other configuration. @@ -40,6 +40,7 @@ ErrorOr serenity_main(Main::Arguments) Core::EventLoop event_loop; // FIXME: Establish a connection to LookupServer and then drop "unix"? TRY(Core::System::unveil("/tmp/portal/lookup", "rw")); + TRY(Core::System::unveil("/etc/cacert.pem", "rw")); TRY(Core::System::unveil("/etc/timezone", "r")); if constexpr (TLS_SSL_KEYLOG_DEBUG) TRY(Core::System::unveil("/home/anon", "rwc"));