Optimize connection pool implementation #924

Closed
40 changes: 21 additions & 19 deletions httpcore/_async/connection_pool.py
@@ -238,6 +238,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
         those connections to be handled separately.
         """
         closing_connections = []
+        idling_count = 0
 
         # First we handle cleaning up any connections that are closed,
         # have expired their keep-alive, or surplus idle connections.
@@ -249,27 +250,25 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
                 # log: "closing expired connection"
                 self._connections.remove(connection)
                 closing_connections.append(connection)
-            elif (
-                connection.is_idle()
-                and len([connection.is_idle() for connection in self._connections])
-                > self._max_keepalive_connections
-            ):
+            elif connection.is_idle():
+                if idling_count < self._max_keepalive_connections:
+                    idling_count += 1
+                    continue
                 # log: "closing idle connection"
                 self._connections.remove(connection)
                 closing_connections.append(connection)
 
         # Assign queued requests to connections.
-        queued_requests = [request for request in self._requests if request.is_queued()]
-        for pool_request in queued_requests:
+        for pool_request in list(self._requests):
+            if not pool_request.is_queued():
+                continue
+
             origin = pool_request.request.url.origin
             available_connections = [
                 connection
                 for connection in self._connections
                 if connection.can_handle_request(origin) and connection.is_available()
             ]
-            idle_connections = [
-                connection for connection in self._connections if connection.is_idle()
-            ]
 
             # There are three cases for how we may be able to handle the request:
             #
@@ -286,15 +285,18 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
                 connection = self.create_connection(origin)
                 self._connections.append(connection)
                 pool_request.assign_to_connection(connection)
-            elif idle_connections:
-                # log: "closing idle connection"
-                connection = idle_connections[0]
-                self._connections.remove(connection)
-                closing_connections.append(connection)
-                # log: "creating new connection"
-                connection = self.create_connection(origin)
-                self._connections.append(connection)
-                pool_request.assign_to_connection(connection)
+            else:
+                idling_connection = next(
+                    (c for c in self._connections if c.is_idle()), None
+                )
+                if idling_connection is not None:
+                    # log: "closing idle connection"
+                    self._connections.remove(idling_connection)
+                    closing_connections.append(idling_connection)
+                # log: "creating new connection"
+                new_connection = self.create_connection(origin)
+                self._connections.append(new_connection)
+                pool_request.assign_to_connection(new_connection)
 
         return closing_connections
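The rewritten cleanup pass is worth a second look. In the old condition, len([connection.is_idle() for connection in self._connections]) builds one boolean per connection whether it is idle or not, so the length is always the total pool size rather than the idle count, and that list is rebuilt for every connection in the loop. A minimal standalone sketch of the difference (the FakeConnection class is hypothetical, for illustration only):

```python
# Hypothetical stand-in for a pooled connection, for illustration only.
class FakeConnection:
    def __init__(self, idle: bool) -> None:
        self._idle = idle

    def is_idle(self) -> bool:
        return self._idle


# Three idle connections and three busy ones.
connections = [FakeConnection(idle=(i % 2 == 0)) for i in range(6)]

# Old expression: one boolean per connection, idle or not, so len() is
# always the total pool size (6 here, not 3).
old_count = len([connection.is_idle() for connection in connections])

# What the running idling_count in the new code effectively measures (3 here).
new_count = sum(1 for connection in connections if connection.is_idle())

print(old_count, new_count)  # 6 3
```

The running idling_count in the new code tracks the intended quantity and turns the cleanup pass from quadratic to linear in the pool size.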
31 changes: 25 additions & 6 deletions httpcore/_async/http11.py
@@ -1,5 +1,6 @@
 import enum
 import logging
+import random
 import ssl
 import time
 from types import TracebackType
@@ -56,10 +57,12 @@ def __init__(
         origin: Origin,
         stream: AsyncNetworkStream,
         keepalive_expiry: Optional[float] = None,
+        socket_poll_interval_between: Tuple[float, float] = (1, 3),
     ) -> None:
         self._origin = origin
         self._network_stream = stream
-        self._keepalive_expiry: Optional[float] = keepalive_expiry
+        self._keepalive_expiry = keepalive_expiry
+        self._socket_poll_interval_between = socket_poll_interval_between
         self._expire_at: Optional[float] = None
         self._state = HTTPConnectionState.NEW
         self._state_lock = AsyncLock()
@@ -68,6 +71,8 @@ def __init__(
             our_role=h11.CLIENT,
             max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
         )
+        # Assuming we were just connected
+        self._network_stream_used_at = time.monotonic()
 
     async def handle_async_request(self, request: Request) -> Response:
         if not self.can_handle_request(request.url.origin):
@@ -173,6 +178,7 @@ async def _send_event(
         bytes_to_send = self._h11_state.send(event)
         if bytes_to_send is not None:
             await self._network_stream.write(bytes_to_send, timeout=timeout)
+            self._network_stream_used_at = time.monotonic()
 
     # Receiving the response...
 
@@ -224,6 +230,7 @@ async def _receive_event(
                data = await self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )
+               self._network_stream_used_at = time.monotonic()
 
                # If we feed this case through h11 we'll raise an exception like:
                #
@@ -281,16 +288,28 @@ def is_available(self) -> bool:
     def has_expired(self) -> bool:
         now = time.monotonic()
         keepalive_expired = self._expire_at is not None and now > self._expire_at
+        if keepalive_expired:
+            return True
 
         # If the HTTP connection is idle but the socket is readable, then the
         # only valid state is that the socket is about to return b"", indicating
         # a server-initiated disconnect.
-        server_disconnected = (
-            self._state == HTTPConnectionState.IDLE
-            and self._network_stream.get_extra_info("is_readable")
-        )
+        # Checking the readable status is relatively expensive so check it at a lower frequency.
+        if (now - self._network_stream_used_at) > self._socket_poll_interval():
+            self._network_stream_used_at = now
+            server_disconnected = (
+                self._state == HTTPConnectionState.IDLE
+                and self._network_stream.get_extra_info("is_readable")
+            )
+            if server_disconnected:
+                return True
 
-        return keepalive_expired or server_disconnected
+        return False
+
+    def _socket_poll_interval(self) -> float:
+        # Randomize to avoid polling for all the connections at once
+        low, high = self._socket_poll_interval_between
+        return random.uniform(low, high)
 
     def is_idle(self) -> bool:
         return self._state == HTTPConnectionState.IDLE
Comment on lines +297 to +307

Contributor:

Actually, I'm not happy with the interval calculation.

Could we improve is_readable instead? FWIW, I noticed the anyio and sync backends use httpcore._utils.is_socket_readable while the trio backend uses its own. @MarkusSintonen, is there any benchmark difference when you switch the backend to trio?

For improving is_readable in get_extra_info:

  • Always assume it is readable, and turn it false on specific events such as receiving a socket close from the server.
  • Use a synchronized Event on readability status changes.

I didn't dig deep enough to check whether these cases are possible or not. They are only my opinions 🤷‍♂️

Contributor Author (@MarkusSintonen):

Trio also suffers badly from the constant socket polling.

Without intervalled socket polling: [benchmark screenshot: old_trio]

With intervalled socket polling: [benchmark screenshot: new_trio]

So with trio it's over 5x slower when constantly doing the socket polling.

Contributor Author (@MarkusSintonen, Jun 12, 2024):

> For improving is_readable in get_extra_info: Always assume it is readable and turn it to false by specified events such as received close socket from server.

I'm not aware of any way to get events about the socket getting closed. As far as I know, the only way to know is to use the socket. 🤔 But I agree is_readable could be better so it's not so heavyweight. We could make it flag-based, so on the networking side we just set a boolean flag when we detect a network error on usage. This has the downside of a greater probability of handing out already-broken connections from the pool. But as far as I know, this is how it's usually done.
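A rough sketch of that flag-based idea, purely illustrative: FlaggedStream is a hypothetical wrapper with a simplified stream interface, not httpcore API.

```python
# Illustrative sketch of a flag-based readability check, as described above.
# FlaggedStream is a hypothetical wrapper, not part of httpcore.
class FlaggedStream:
    def __init__(self, stream) -> None:
        self._stream = stream
        self._broken = False  # set on the first observed network error

    def write(self, data, timeout=None):
        try:
            self._stream.write(data, timeout=timeout)
        except Exception:
            self._broken = True  # remember the failure for later checks
            raise

    def read(self, max_bytes, timeout=None):
        try:
            data = self._stream.read(max_bytes, timeout=timeout)
        except Exception:
            self._broken = True
            raise
        if data == b"":  # the server closed the connection
            self._broken = True
        return data

    def get_extra_info(self, info):
        if info == "is_readable":
            # No socket poll at all: report the cached flag instead.
            return self._broken
        return self._stream.get_extra_info(info)
```

As the comment above notes, the trade-off is that a connection the server has quietly closed still looks healthy until its next use fails.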

Contributor Author (@MarkusSintonen):

Other options would be to move the intervalled polling into the network backends. Or get even more elaborate and run the socket polling at a specific interval via loop.call_later, which runs the actual poll via loop.run_in_executor to avoid any possible non-async socket IO in async land. Gets easily hairy 😄
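For what that "hairy" variant might look like, here is a loose asyncio sketch; PeriodicSocketPoller and blocking_is_readable are invented names, and nothing like this exists in httpcore.

```python
import asyncio
import select
import socket


def blocking_is_readable(sock: socket.socket) -> bool:
    # Hypothetical helper: a plain select()-based readability probe.
    readable, _, _ = select.select([sock], [], [], 0)
    return bool(readable)


class PeriodicSocketPoller:
    """Re-checks a socket at a fixed interval, off the event loop."""

    def __init__(self, sock: socket.socket, interval: float = 2.0) -> None:
        self._sock = sock
        self._interval = interval
        self.is_readable = False
        self._loop = asyncio.get_running_loop()  # must be created inside a running loop
        self._schedule()

    def _schedule(self) -> None:
        # call_later takes a plain synchronous callback...
        self._loop.call_later(self._interval, self._kick)

    def _kick(self) -> None:
        # ...so hop back into a task to await the executor.
        asyncio.ensure_future(self._poll())

    async def _poll(self) -> None:
        # Run the possibly blocking probe in a worker thread.
        self.is_readable = await self._loop.run_in_executor(
            None, blocking_is_readable, self._sock
        )
        self._schedule()
```

It does get hairy quickly: every connection now owns a background callback chain that needs a cancellation story when the connection closes.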


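The core trick in the new has_expired, an expensive check gated behind a randomized minimum interval, is generic enough to show in isolation. A standalone sketch, not httpcore API:

```python
import random
import time


class ThrottledCheck:
    """Run an expensive predicate at most once per randomized interval."""

    def __init__(self, check, interval_between=(1.0, 3.0)) -> None:
        self._check = check
        self._interval_between = interval_between
        self._last_ran_at = time.monotonic()

    def __call__(self) -> bool:
        now = time.monotonic()
        low, high = self._interval_between
        # Randomizing the interval spreads polls out over time, so a large
        # pool does not probe every socket in the same tick.
        if (now - self._last_ran_at) > random.uniform(low, high):
            self._last_ran_at = now
            return self._check()
        # Between polls, assume nothing has changed.
        return False


# Usage: wrap a costly readability probe once per connection.
probe = ThrottledCheck(lambda: False)  # stand-in predicate
print(probe())  # False; the real probe runs at most every 1 to 3 seconds
```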
40 changes: 21 additions & 19 deletions httpcore/_sync/connection_pool.py
@@ -238,6 +238,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
         those connections to be handled separately.
         """
         closing_connections = []
+        idling_count = 0
 
         # First we handle cleaning up any connections that are closed,
         # have expired their keep-alive, or surplus idle connections.
@@ -249,27 +250,25 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
                 # log: "closing expired connection"
                 self._connections.remove(connection)
                 closing_connections.append(connection)
-            elif (
-                connection.is_idle()
-                and len([connection.is_idle() for connection in self._connections])
-                > self._max_keepalive_connections
-            ):
+            elif connection.is_idle():
+                if idling_count < self._max_keepalive_connections:
+                    idling_count += 1
+                    continue
                 # log: "closing idle connection"
                 self._connections.remove(connection)
                 closing_connections.append(connection)
 
         # Assign queued requests to connections.
-        queued_requests = [request for request in self._requests if request.is_queued()]
-        for pool_request in queued_requests:
+        for pool_request in list(self._requests):
+            if not pool_request.is_queued():
+                continue
+
             origin = pool_request.request.url.origin
             available_connections = [
                 connection
                 for connection in self._connections
                 if connection.can_handle_request(origin) and connection.is_available()
             ]
-            idle_connections = [
-                connection for connection in self._connections if connection.is_idle()
-            ]
 
             # There are three cases for how we may be able to handle the request:
             #
@@ -286,15 +285,18 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
                 connection = self.create_connection(origin)
                 self._connections.append(connection)
                 pool_request.assign_to_connection(connection)
-            elif idle_connections:
-                # log: "closing idle connection"
-                connection = idle_connections[0]
-                self._connections.remove(connection)
-                closing_connections.append(connection)
-                # log: "creating new connection"
-                connection = self.create_connection(origin)
-                self._connections.append(connection)
-                pool_request.assign_to_connection(connection)
+            else:
+                idling_connection = next(
+                    (c for c in self._connections if c.is_idle()), None
+                )
+                if idling_connection is not None:
+                    # log: "closing idle connection"
+                    self._connections.remove(idling_connection)
+                    closing_connections.append(idling_connection)
+                # log: "creating new connection"
+                new_connection = self.create_connection(origin)
+                self._connections.append(new_connection)
+                pool_request.assign_to_connection(new_connection)
 
         return closing_connections

31 changes: 25 additions & 6 deletions httpcore/_sync/http11.py
@@ -1,5 +1,6 @@
 import enum
 import logging
+import random
 import ssl
 import time
 from types import TracebackType
@@ -56,10 +57,12 @@ def __init__(
         origin: Origin,
         stream: NetworkStream,
         keepalive_expiry: Optional[float] = None,
+        socket_poll_interval_between: Tuple[float, float] = (1, 3),
     ) -> None:
         self._origin = origin
         self._network_stream = stream
-        self._keepalive_expiry: Optional[float] = keepalive_expiry
+        self._keepalive_expiry = keepalive_expiry
+        self._socket_poll_interval_between = socket_poll_interval_between
         self._expire_at: Optional[float] = None
         self._state = HTTPConnectionState.NEW
         self._state_lock = Lock()
@@ -68,6 +71,8 @@ def __init__(
             our_role=h11.CLIENT,
             max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
        )
+        # Assuming we were just connected
+        self._network_stream_used_at = time.monotonic()
 
     def handle_request(self, request: Request) -> Response:
         if not self.can_handle_request(request.url.origin):
@@ -173,6 +178,7 @@ def _send_event(
         bytes_to_send = self._h11_state.send(event)
         if bytes_to_send is not None:
             self._network_stream.write(bytes_to_send, timeout=timeout)
+            self._network_stream_used_at = time.monotonic()
 
     # Receiving the response...
 
@@ -224,6 +230,7 @@ def _receive_event(
                data = self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )
+               self._network_stream_used_at = time.monotonic()
 
                # If we feed this case through h11 we'll raise an exception like:
                #
@@ -281,16 +288,28 @@ def is_available(self) -> bool:
     def has_expired(self) -> bool:
         now = time.monotonic()
         keepalive_expired = self._expire_at is not None and now > self._expire_at
+        if keepalive_expired:
+            return True
 
         # If the HTTP connection is idle but the socket is readable, then the
         # only valid state is that the socket is about to return b"", indicating
         # a server-initiated disconnect.
-        server_disconnected = (
-            self._state == HTTPConnectionState.IDLE
-            and self._network_stream.get_extra_info("is_readable")
-        )
+        # Checking the readable status is relatively expensive so check it at a lower frequency.
+        if (now - self._network_stream_used_at) > self._socket_poll_interval():
+            self._network_stream_used_at = now
+            server_disconnected = (
+                self._state == HTTPConnectionState.IDLE
+                and self._network_stream.get_extra_info("is_readable")
+            )
+            if server_disconnected:
+                return True
 
-        return keepalive_expired or server_disconnected
+        return False
+
+    def _socket_poll_interval(self) -> float:
+        # Randomize to avoid polling for all the connections at once
+        low, high = self._socket_poll_interval_between
+        return random.uniform(low, high)
 
     def is_idle(self) -> bool:
         return self._state == HTTPConnectionState.IDLE