feat(providers): retry all upstream 5xx like 429

Generalize retryable_upstream_status to HTTP 500-599 plus 429.
Improve execute_with_retry log label with actual status.
Preserve OpenAI InternalServerError 5xx in map_error after overload check.
Unify native stream send: log via _raise_for_status then optional async aclose.
Add _maybe_await_aclose for httpx doubles in tests.
Rename openai compat retry tests to 5xx; parameterize native/limiter/mapping tests.
This commit is contained in:
Alishahryar1
2026-05-10 19:56:14 -07:00
parent 21ff2137ef
commit 41f2bc71a0
7 changed files with 153 additions and 70 deletions
+16 -8
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
import inspect
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal
@@ -38,6 +39,16 @@ from providers.rate_limit import GlobalRateLimiter
StreamChunkMode = Literal["line", "event"]
async def _maybe_await_aclose(response: Any) -> None:
"""Call ``aclose`` on httpx-like responses; ignore non-async test doubles."""
close = getattr(response, "aclose", None)
if not callable(close):
return
result = close()
if inspect.isawaitable(result):
await result
def _model_list_json(response: httpx.Response, *, provider_name: str) -> Any:
response.raise_for_status()
try:
@@ -96,7 +107,7 @@ class AnthropicMessagesTransport(BaseProvider):
payload = _model_list_json(response, provider_name=self._provider_name)
return self._extract_model_infos_from_model_list_payload(payload)
finally:
await response.aclose()
await _maybe_await_aclose(response)
async def _send_model_list_request(self) -> httpx.Response:
"""Query the provider endpoint that advertises available model ids."""
@@ -360,17 +371,14 @@ class AnthropicMessagesTransport(BaseProvider):
try:
async def _validated_stream_send() -> httpx.Response:
"""Send request; raise inside retry loop on 429/503 so limiter can backoff."""
"""Send request; retries apply to 429/5xx raises after structured logging."""
send_response = await self._send_stream_request(body)
if send_response.status_code in (429, 503):
await send_response.aclose()
send_response.raise_for_status()
if send_response.status_code != 200:
try:
await self._raise_for_status(send_response, req_tag=req_tag)
finally:
if not send_response.is_closed:
await send_response.aclose()
await _maybe_await_aclose(send_response)
return send_response
response = await self._global_rate_limiter.execute_with_retry(
@@ -409,7 +417,7 @@ class AnthropicMessagesTransport(BaseProvider):
error_message = self._get_error_message(error, request_id)
if response is not None and not response.is_closed:
await response.aclose()
await _maybe_await_aclose(response)
trace_event(
stage="provider",
@@ -441,4 +449,4 @@ class AnthropicMessagesTransport(BaseProvider):
return
finally:
if response is not None and not response.is_closed:
await response.aclose()
await _maybe_await_aclose(response)
+7 -7
View File
@@ -51,15 +51,15 @@ def map_error(
if isinstance(e, openai.InternalServerError):
raw_message = str(e)
sdk_status = getattr(e, "status_code", None)
if sdk_status == 503:
stable = APIError("_", status_code=503)
return APIError(
get_user_facing_error_message(stable),
status_code=503,
raw_error=str(e),
)
if "overloaded" in raw_message.lower() or "capacity" in raw_message.lower():
return OverloadedError(message, raw_error=raw_message)
if isinstance(sdk_status, int) and 500 <= sdk_status <= 599:
stable = APIError("_", status_code=sdk_status)
return APIError(
get_user_facing_error_message(stable),
status_code=sdk_status,
raw_error=str(e),
)
return APIError(message, status_code=500, raw_error=str(e))
if isinstance(e, openai.APIError):
return APIError(
+14 -9
View File
@@ -17,23 +17,28 @@ from core.trace import trace_event
T = TypeVar("T")
def _upstream_http_retryable(code: int) -> bool:
"""True for rate limit / upstream server failures that should backoff-retry."""
return code == 429 or 500 <= code <= 599
def retryable_upstream_status(exc: BaseException) -> int | None:
    """Return the HTTP-like status of ``exc`` when it qualifies for a backoff retry.

    ``429`` plus any upstream ``5xx`` use the same exponential backoff and scoped
    limiter blocking semantics as today's rate-limit path.

    Args:
        exc: Exception raised by an upstream call.

    Returns:
        * ``429`` for an ``openai.RateLimitError``;
        * the response status for an ``httpx.HTTPStatusError`` whose status
          is 429 or in 500-599;
        * the ``status_code`` of any other ``openai.APIError`` when that
          attribute is an ``int`` in 500-599;
        * ``None`` for everything else (the error is not retried).
    """
    if isinstance(exc, openai.RateLimitError):
        return 429
    if isinstance(exc, httpx.HTTPStatusError):
        status = exc.response.status_code
        return status if _upstream_http_retryable(status) else None
    if isinstance(exc, openai.APIError):
        # status_code is read defensively: the attribute may be missing or
        # not an int, and such errors are treated as non-retryable.
        status = getattr(exc, "status_code", None)
        if isinstance(status, int) and 500 <= status <= 599:
            return status
    return None
@@ -48,7 +53,7 @@ class GlobalRateLimiter:
may be open simultaneously, independent of the sliding window.
Proactive limits - throttles requests to stay within API limits.
Reactive limits - pauses all requests when a 429 or 503 retry backoff is active.
Reactive limits - pauses all requests when a 429 or 5xx retry backoff is active.
Concurrency limit - caps simultaneously open streams.
"""
@@ -151,7 +156,7 @@ class GlobalRateLimiter:
Returns:
True if was reactively blocked and waited, False otherwise.
"""
# 1. Reactive check: Wait if someone hit a 429
# 1. Reactive check: Wait if someone hit a reactive backoff (429/5xx retries)
waited_reactively = False
now = time.monotonic()
if now < self._blocked_until:
@@ -228,7 +233,7 @@ class GlobalRateLimiter:
"""Execute an async callable with rate limiting and retry on transient limits.
Waits for the proactive limiter before each attempt. On ``429`` (rate limit)
or ``503`` (service unavailable), applies exponential backoff with jitter
or upstream ``5xx`` server errors, applies exponential backoff with jitter
and sets the reactive block before retrying.
Args:
@@ -260,7 +265,7 @@ class GlobalRateLimiter:
label = (
"Rate limited (429)"
if status == 429
else "Upstream unavailable (503)"
else f"Upstream server error ({status})"
)
last_exc = e
if attempt >= max_retries:
@@ -1,4 +1,4 @@
"""Native Anthropic transport: HTTP 429 and 503 are retried inside execute_with_retry."""
"""Native Anthropic transport: HTTP 429 and upstream 5xx are retried inside execute_with_retry."""
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch
@@ -80,9 +80,12 @@ async def test_native_stream_retries_on_http_429_then_streams(provider_config):
GlobalRateLimiter.reset_instance()
@pytest.mark.parametrize("status_code", [500, 502, 503, 504])
@pytest.mark.asyncio
async def test_native_stream_retries_on_http_503_then_streams(provider_config):
"""First response 503 (closed), second 200 streams; send is called twice."""
async def test_native_stream_retries_on_http_5xx_then_streams(
provider_config, status_code
):
"""First response is retryable 5xx (closed); second 200 streams; send twice."""
GlobalRateLimiter.reset_instance()
try:
provider = NativeProvider(provider_config)
@@ -94,14 +97,14 @@ async def test_native_stream_retries_on_http_503_then_streams(provider_config):
"",
]
ok_response = FakeResponse(lines=ok_lines)
unavailable = FakeResponse(status_code=503, text="Service Unavailable")
bad = FakeResponse(status_code=status_code, text="upstream error")
send_calls = {"n": 0}
async def send_side_effect(*_a, **_kw):
send_calls["n"] += 1
if send_calls["n"] == 1:
return unavailable
return bad
return ok_response
with (
@@ -120,7 +123,7 @@ async def test_native_stream_retries_on_http_503_then_streams(provider_config):
events = [e async for e in provider.stream_response(req)]
assert send_calls["n"] == 2
assert unavailable.is_closed
assert bad.is_closed
assert ok_response.is_closed
assert events == [
"event: message_start\n",
@@ -131,9 +134,18 @@ async def test_native_stream_retries_on_http_503_then_streams(provider_config):
GlobalRateLimiter.reset_instance()
@pytest.mark.parametrize(
("status_code", "substr"),
[
(500, "Provider API request failed"),
(502, "Provider is currently overloaded"),
(503, "Provider is currently overloaded"),
(504, "Provider is currently overloaded"),
],
)
@pytest.mark.asyncio
async def test_native_stream_503_retry_exhausted(provider_config):
"""Repeated HTTP 503 exhausts execute_with_retry; emits overloaded-style message."""
async def test_native_stream_5xx_retry_exhausted(provider_config, status_code, substr):
"""Repeated upstream 5xx exhausts execute_with_retry; user message matches mapping."""
GlobalRateLimiter.reset_instance()
try:
@@ -156,7 +168,7 @@ async def test_native_stream_503_retry_exhausted(provider_config):
provider = NativeProvider(provider_config)
req = MockRequest()
unavailable = FakeResponse(status_code=503, text="Service Unavailable")
bad = FakeResponse(status_code=status_code, text="upstream error")
with (
patch.object(
@@ -166,25 +178,25 @@ async def test_native_stream_503_retry_exhausted(provider_config):
provider._client,
"send",
new_callable=AsyncMock,
return_value=unavailable,
return_value=bad,
) as mock_send,
patch("asyncio.sleep", new_callable=AsyncMock),
):
events = [e async for e in provider.stream_response(req)]
assert mock_send.await_count == 4
assert unavailable.is_closed
assert bad.is_closed
assert_canonical_stream_error_envelope(
events,
user_message_substr="Provider is currently overloaded",
user_message_substr=substr,
)
finally:
GlobalRateLimiter.reset_instance()
@pytest.mark.asyncio
async def test_non_429_http_error_not_retried(provider_config):
"""HTTP 500 from upstream is not retried; single send."""
async def test_non_retryable_4xx_http_error_not_retried(provider_config):
"""HTTP 400 from upstream is not retried; single send (passthrough limiter)."""
GlobalRateLimiter.reset_instance()
try:
@@ -203,7 +215,7 @@ async def test_non_429_http_error_not_retried(provider_config):
provider = NativeProvider(provider_config)
req = MockRequest()
err = FakeResponse(status_code=500, text="Internal Server Error")
err = FakeResponse(status_code=400, text="Bad Request")
with (
patch.object(
@@ -221,7 +233,7 @@ async def test_non_429_http_error_not_retried(provider_config):
mock_send.assert_awaited_once()
assert err.is_closed
assert_canonical_stream_error_envelope(
events, user_message_substr="Provider API request failed"
events, user_message_substr="Invalid request sent to provider"
)
finally:
GlobalRateLimiter.reset_instance()
+19 -7
View File
@@ -80,7 +80,7 @@ class TestMapError:
assert result.status_code == 529
def test_internal_server_error_generic(self):
"""InternalServerError without keywords -> APIError(500)."""
"""InternalServerError without keywords maps to APIError preserving 5xx."""
exc = _make_openai_error(
openai.InternalServerError, message="Unknown error", status_code=500
)
@@ -88,17 +88,29 @@ class TestMapError:
assert isinstance(result, APIError)
assert result.status_code == 500
def test_internal_server_error_503_maps_to_api_error_with_status(self):
"""InternalServerError carrying HTTP 503 retains 503 for stable user messaging."""
@pytest.mark.parametrize(
("status_code", "expect_substr"),
[
(500, "provider api request failed"),
(502, "temporarily unavailable"),
(503, "temporarily unavailable"),
(504, "temporarily unavailable"),
(599, "provider api request failed"),
],
)
def test_internal_server_error_preserves_5xx_status_for_messaging(
self, status_code, expect_substr
):
"""InternalServerError carrying HTTP 5xx retains status for stable user messaging."""
exc = _make_openai_error(
openai.InternalServerError,
message="<html>503</html>",
status_code=503,
message=f"upstream {status_code}",
status_code=status_code,
)
result = map_error(exc)
assert isinstance(result, APIError)
assert result.status_code == 503
assert "temporarily unavailable" in result.message.lower()
assert result.status_code == status_code
assert expect_substr in result.message.lower()
def test_generic_api_error(self):
"""openai.APIError -> APIError with original status_code."""
@@ -1,4 +1,4 @@
"""OpenAI-compat transports: HTTP 503 uses the same execute_with_retry path as 429."""
"""OpenAI-compat transports: upstream 5xx uses the same execute_with_retry path as 429."""
from unittest.mock import AsyncMock, MagicMock, patch
@@ -13,16 +13,17 @@ from providers.rate_limit import GlobalRateLimiter
from tests.providers.test_nvidia_nim import MockRequest
def _internal_503() -> openai.InternalServerError:
def _internal_5xx(code: int) -> openai.InternalServerError:
return openai.InternalServerError(
"unavailable",
response=Response(503, request=Request("POST", "http://x")),
response=Response(code, request=Request("POST", "http://x")),
body={},
)
@pytest.mark.parametrize("status_code", [500, 502, 503, 504])
@pytest.mark.asyncio
async def test_nim_stream_retries_on_openai_503_then_streams():
async def test_nim_stream_retries_on_openai_5xx_then_streams(status_code):
GlobalRateLimiter.reset_instance()
try:
config = ProviderConfig(
@@ -57,7 +58,7 @@ async def test_nim_stream_retries_on_openai_503_then_streams():
) as mock_create,
patch("asyncio.sleep", new_callable=AsyncMock),
):
mock_create.side_effect = [_internal_503(), mock_stream()]
mock_create.side_effect = [_internal_5xx(status_code), mock_stream()]
events = [e async for e in provider.stream_response(req)]
assert mock_create.await_count == 2
@@ -66,8 +67,20 @@ async def test_nim_stream_retries_on_openai_503_then_streams():
GlobalRateLimiter.reset_instance()
@pytest.mark.parametrize(
("status_code", "expect_substr"),
[
(500, "provider api request failed"),
(502, "temporarily unavailable"),
(503, "temporarily unavailable"),
(504, "temporarily unavailable"),
],
)
@pytest.mark.asyncio
async def test_nim_stream_openai_503_exhausted_emits_user_message():
async def test_nim_stream_openai_5xx_exhausted_emits_user_message(
status_code,
expect_substr,
):
GlobalRateLimiter.reset_instance()
try:
config = ProviderConfig(
@@ -90,11 +103,11 @@ async def test_nim_stream_openai_503_exhausted_emits_user_message():
) as mock_create,
patch("asyncio.sleep", new_callable=AsyncMock),
):
mock_create.side_effect = _internal_503()
mock_create.side_effect = _internal_5xx(status_code)
events = [e async for e in provider.stream_response(req)]
assert mock_create.await_count == 4
blob = "".join(events)
assert "temporarily unavailable" in blob.lower()
assert expect_substr in blob.lower()
finally:
GlobalRateLimiter.reset_instance()
+48 -15
View File
@@ -278,21 +278,22 @@ class TestProviderRateLimiter:
assert result == "ok"
assert call_count == 2
@pytest.mark.parametrize("status_code", [500, 502, 503, 504])
@pytest.mark.asyncio
async def test_execute_with_retry_succeeds_on_openai_internal_server_error_503(
self,
async def test_execute_with_retry_succeeds_on_openai_internal_server_error_5xx(
self, status_code
):
"""503 as openai.InternalServerError then success."""
"""5xx as openai.InternalServerError then success."""
import openai
from httpx import Request, Response
GlobalRateLimiter.reset_instance()
limiter = GlobalRateLimiter.get_instance(rate_limit=100, rate_window=60)
def make_503():
def make_upstream_error():
return openai.InternalServerError(
"unavailable",
response=Response(503, request=Request("POST", "http://x")),
response=Response(status_code, request=Request("POST", "http://x")),
body={},
)
@@ -302,7 +303,7 @@ class TestProviderRateLimiter:
nonlocal call_count
call_count += 1
if call_count == 1:
raise make_503()
raise make_upstream_error()
return "ok"
result = await limiter.execute_with_retry(
@@ -311,9 +312,10 @@ class TestProviderRateLimiter:
assert result == "ok"
assert call_count == 2
@pytest.mark.parametrize("status_code", [500, 502, 503, 504])
@pytest.mark.asyncio
async def test_execute_with_retry_succeeds_on_httpx_503(self):
"""HTTP 503 as httpx.HTTPStatusError then success."""
async def test_execute_with_retry_succeeds_on_httpx_5xx(self, status_code):
"""HTTP 5xx as httpx.HTTPStatusError then success."""
import httpx
from httpx import Request, Response
@@ -325,9 +327,11 @@ class TestProviderRateLimiter:
nonlocal call_count
call_count += 1
if call_count == 1:
r = Response(503, request=Request("POST", "http://x"), text="busy")
r = Response(
status_code, request=Request("POST", "http://x"), text="error"
)
raise httpx.HTTPStatusError(
"Service Unavailable", request=r.request, response=r
"Server Error", request=r.request, response=r
)
return "ok"
@@ -337,9 +341,10 @@ class TestProviderRateLimiter:
assert result == "ok"
assert call_count == 2
@pytest.mark.parametrize("status_code", [500, 502, 503, 504])
@pytest.mark.asyncio
async def test_execute_with_retry_exhaust_openai_503_raises(self):
"""When all 503 retries exhausted (openai), last InternalServerError is raised."""
async def test_execute_with_retry_exhaust_openai_5xx_raises(self, status_code):
"""When all 5xx retries exhausted (OpenAI SDK), last InternalServerError is raised."""
import openai
from httpx import Request, Response
@@ -348,18 +353,46 @@ class TestProviderRateLimiter:
exc = openai.InternalServerError(
"unavailable",
response=Response(503, request=Request("POST", "http://x")),
response=Response(status_code, request=Request("POST", "http://x")),
body={},
)
async def always_503():
async def always_fail():
raise exc
with pytest.raises(openai.InternalServerError):
await limiter.execute_with_retry(
always_503, max_retries=2, base_delay=0.01, max_delay=0.1, jitter=0
always_fail, max_retries=2, base_delay=0.01, max_delay=0.1, jitter=0
)
@pytest.mark.asyncio
async def test_execute_with_retry_httpx_400_raises_immediately(self):
    """A non-retryable 4xx propagates after a single attempt (no retry)."""
    import httpx
    from httpx import Request, Response

    GlobalRateLimiter.reset_instance()
    limiter = GlobalRateLimiter.get_instance(rate_limit=100, rate_window=60)

    attempts = 0

    async def bad_request():
        nonlocal attempts
        attempts += 1
        response = Response(
            400, request=Request("POST", "http://x"), text="bad request"
        )
        raise httpx.HTTPStatusError(
            "Bad Request", request=response.request, response=response
        )

    with pytest.raises(httpx.HTTPStatusError):
        await limiter.execute_with_retry(
            bad_request, max_retries=2, base_delay=0.01, max_delay=0.1, jitter=0
        )

    # max_retries=2 would allow further calls; exactly one proves no retry happened.
    assert attempts == 1
@pytest.mark.asyncio
async def test_max_concurrency_zero_raises(self):
"""max_concurrency <= 0 raises ValueError."""