Skip to content

Commit

Permalink
Optimize connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkusSintonen committed Jun 13, 2024
1 parent e987df2 commit 0694653
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 66 deletions.
40 changes: 21 additions & 19 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
those connections to be handled separately.
"""
closing_connections = []
idling_count = 0

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
Expand All @@ -249,27 +250,25 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
elif connection.is_idle():
if idling_count < self._max_keepalive_connections:
idling_count += 1
continue
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in list(self._requests):
if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# There are three cases for how we may be able to handle the request:
#
Expand All @@ -286,15 +285,18 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idling_connection = next(
(c for c in self._connections if c.is_idle()), None
)
if idling_connection is not None:
# log: "closing idle connection"
self._connections.remove(idling_connection)
closing_connections.append(idling_connection)
# log: "creating new connection"
new_connection = self.create_connection(origin)
self._connections.append(new_connection)
pool_request.assign_to_connection(new_connection)

return closing_connections

Expand Down
31 changes: 25 additions & 6 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
import logging
import random
import ssl
import time
from types import TracebackType
Expand Down Expand Up @@ -56,10 +57,12 @@ def __init__(
origin: Origin,
stream: AsyncNetworkStream,
keepalive_expiry: Optional[float] = None,
socket_poll_interval_between: Tuple[float, float] = (1, 3),
) -> None:
self._origin = origin
self._network_stream = stream
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._keepalive_expiry = keepalive_expiry
self._socket_poll_interval_between = socket_poll_interval_between
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = AsyncLock()
Expand All @@ -68,6 +71,8 @@ def __init__(
our_role=h11.CLIENT,
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
)
# Assuming we were just connected
self._network_stream_used_at = time.monotonic()

async def handle_async_request(self, request: Request) -> Response:
if not self.can_handle_request(request.url.origin):
Expand Down Expand Up @@ -173,6 +178,7 @@ async def _send_event(
bytes_to_send = self._h11_state.send(event)
if bytes_to_send is not None:
await self._network_stream.write(bytes_to_send, timeout=timeout)
self._network_stream_used_at = time.monotonic()

# Receiving the response...

Expand Down Expand Up @@ -224,6 +230,7 @@ async def _receive_event(
data = await self._network_stream.read(
self.READ_NUM_BYTES, timeout=timeout
)
self._network_stream_used_at = time.monotonic()

# If we feed this case through h11 we'll raise an exception like:
#
Expand Down Expand Up @@ -281,16 +288,28 @@ def is_available(self) -> bool:
def has_expired(self) -> bool:
now = time.monotonic()
keepalive_expired = self._expire_at is not None and now > self._expire_at
if keepalive_expired:
return True

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
# Checking the readable status is relatively expensive so check it at a lower frequency.
if (now - self._network_stream_used_at) > self._socket_poll_interval():
self._network_stream_used_at = now
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if server_disconnected:
return True

return False

return keepalive_expired or server_disconnected
def _socket_poll_interval(self) -> float:
# Randomize to avoid polling for all the connections at once
low, high = self._socket_poll_interval_between
return random.uniform(low, high)

def is_idle(self) -> bool:
    """True while this connection sits in the keep-alive IDLE state."""
    return HTTPConnectionState.IDLE == self._state
Expand Down
40 changes: 21 additions & 19 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
those connections to be handled separately.
"""
closing_connections = []
idling_count = 0

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
Expand All @@ -249,27 +250,25 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
elif connection.is_idle():
if idling_count < self._max_keepalive_connections:
idling_count += 1
continue
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [request for request in self._requests if request.is_queued()]
for pool_request in queued_requests:
for pool_request in list(self._requests):
if not pool_request.is_queued():
continue

origin = pool_request.request.url.origin
available_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# There are three cases for how we may be able to handle the request:
#
Expand All @@ -286,15 +285,18 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idling_connection = next(
(c for c in self._connections if c.is_idle()), None
)
if idling_connection is not None:
# log: "closing idle connection"
self._connections.remove(idling_connection)
closing_connections.append(idling_connection)
# log: "creating new connection"
new_connection = self.create_connection(origin)
self._connections.append(new_connection)
pool_request.assign_to_connection(new_connection)

return closing_connections

Expand Down
31 changes: 25 additions & 6 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import enum
import logging
import random
import ssl
import time
from types import TracebackType
Expand Down Expand Up @@ -56,10 +57,12 @@ def __init__(
origin: Origin,
stream: NetworkStream,
keepalive_expiry: Optional[float] = None,
socket_poll_interval_between: Tuple[float, float] = (1, 3),
) -> None:
self._origin = origin
self._network_stream = stream
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._keepalive_expiry = keepalive_expiry
self._socket_poll_interval_between = socket_poll_interval_between
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = Lock()
Expand All @@ -68,6 +71,8 @@ def __init__(
our_role=h11.CLIENT,
max_incomplete_event_size=self.MAX_INCOMPLETE_EVENT_SIZE,
)
# Assuming we were just connected
self._network_stream_used_at = time.monotonic()

def handle_request(self, request: Request) -> Response:
if not self.can_handle_request(request.url.origin):
Expand Down Expand Up @@ -173,6 +178,7 @@ def _send_event(
bytes_to_send = self._h11_state.send(event)
if bytes_to_send is not None:
self._network_stream.write(bytes_to_send, timeout=timeout)
self._network_stream_used_at = time.monotonic()

# Receiving the response...

Expand Down Expand Up @@ -224,6 +230,7 @@ def _receive_event(
data = self._network_stream.read(
self.READ_NUM_BYTES, timeout=timeout
)
self._network_stream_used_at = time.monotonic()

# If we feed this case through h11 we'll raise an exception like:
#
Expand Down Expand Up @@ -281,16 +288,28 @@ def is_available(self) -> bool:
def has_expired(self) -> bool:
now = time.monotonic()
keepalive_expired = self._expire_at is not None and now > self._expire_at
if keepalive_expired:
return True

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
# Checking the readable status is relatively expensive so check it at a lower frequency.
if (now - self._network_stream_used_at) > self._socket_poll_interval():
self._network_stream_used_at = now
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if server_disconnected:
return True

return False

return keepalive_expired or server_disconnected
def _socket_poll_interval(self) -> float:
# Randomize to avoid polling for all the connections at once
low, high = self._socket_poll_interval_between
return random.uniform(low, high)

def is_idle(self) -> bool:
    """True while this connection sits in the keep-alive IDLE state."""
    return HTTPConnectionState.IDLE == self._state
Expand Down
12 changes: 6 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
# Docs
mkdocs==1.6.0
mkdocs-autorefs==1.0.1
mkdocs-material==9.5.20
mkdocs-material==9.5.25
mkdocs-material-extensions==1.3.1
mkdocstrings[python-legacy]==0.25.0
mkdocstrings[python-legacy]==0.25.1
jinja2==3.1.4

# Packaging
build==1.2.1
twine==5.0.0
twine==5.1.0

# Tests & Linting
coverage[toml]==7.5.0
ruff==0.4.2
coverage[toml]==7.5.3
ruff==0.4.7
mypy==1.10.0
trio-typing==0.10.0
pytest==8.2.0
pytest==8.2.1
pytest-httpbin==2.0.0
pytest-trio==0.8.0
werkzeug<2.1 # See: https://github.com/psf/httpbin/issues/35
Loading

0 comments on commit 0694653

Please sign in to comment.