Files
Alishahryar1 d501e5223a Fix live provider smoke defaults
Update live smoke model defaults for NIM, OpenRouter, and Gemini; normalize tool-call indexes; downgrade DeepSeek forced tool_choice; and add coverage for the provider smoke fixes.
2026-05-31 13:02:15 -07:00

788 lines
23 KiB
Python

"""Claude Code CLI characterization helpers for provider smoke matrices."""
from __future__ import annotations
import json
import os
import re
import subprocess
import time
import uuid
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
from smoke.lib.config import ProviderModel, SmokeConfig, redacted
from smoke.lib.server import RunningServer
# Classifications that regression_failures() reports as pytest failures.
REGRESSION_CLASSIFICATIONS = frozenset({"harness_bug", "product_failure"})
# Access-log regexes for POST /v1/messages responses treated as proxy
# regressions: any 4xx other than 401/403/404/408/409, and any 5xx.
# NOTE(review): a 429 on /v1/messages matches the first pattern, and
# classify_probe consults these before the 429 upstream markers below --
# confirm that precedence is intended.
_HTTP_REGRESSION_PATTERNS = (
    r'POST /v1/messages[^"\n]* HTTP/1\.1" 4(?!01|03|04|08|09)\d\d',
    r'POST /v1/messages[^"\n]* HTTP/1\.1" 5\d\d',
)
# Lowercase substrings; matched against lowercased CLI output plus log text to
# classify a failure as "upstream_unavailable".
_UPSTREAM_UNAVAILABLE_MARKERS = (
    "upstream_unavailable",
    "readtimeout",
    "connecterror",
    "connection refused",
    "timed out",
    "rate limit",
    "overloaded",
    "capacity",
    "upstream provider",
    "provider api request failed",
    "httpstatuserror",
)
# Regexes (searched case-insensitively) for several spellings of an HTTP 429.
_HTTP_429_PATTERNS = (
    r'HTTP/1\.[01]" 429\b',
    r"\bHTTP/1\.[01] 429\b",
    r"\bstatus_code=429\b",
    r"\bstatus[=:]\s*429\b",
    r"\b429 Too Many Requests\b",
)
# Lowercase substrings; when the CLI exits non-zero and one of these appears,
# the probe is classified as skipped / "missing_env".
_MISSING_ENV_MARKERS = (
    "api key",
    "not logged in",
    "authentication",
    "permission denied",
)
# Value passed to --mcp-config by the subagent probe (no MCP servers).
_EMPTY_MCP_CONFIG = '{"mcpServers":{}}'
# Value passed to --system-prompt by the subagent probe.
_SUBAGENT_SYSTEM_PROMPT = (
    "You are a deterministic smoke-test coordinator. Use Agent when asked to "
    "use a subagent."
)
@dataclass(frozen=True, slots=True)
class ClaudeCliRun:
command: tuple[str, ...]
returncode: int | None
stdout: str
stderr: str
duration_s: float
timed_out: bool = False
@property
def combined_output(self) -> str:
return f"{self.stdout}\n{self.stderr}"
@dataclass(frozen=True, slots=True)
class CliMatrixOutcome:
    """One row of the CLI compatibility matrix report (built by make_outcome)."""

    # Provider model identity, copied from ProviderModel by the probe runners.
    model: str
    full_model: str
    source: str
    # Probe name, e.g. "basic_text" or "compact_command".
    feature: str
    # Outcome / classification pair as returned by classify_probe.
    outcome: str
    classification: str
    # CLI run duration in seconds, rounded to 3 decimals by make_outcome.
    duration_s: float
    # CLI exit code; None when the run timed out.
    cli_returncode: int | None
    # Dict produced by token_evidence().
    token_evidence: dict[str, Any]
    # Request count derived from the server log delta.
    request_count: int
    log_path: str
    # Redacted, length-limited excerpts of CLI stdout/stderr and the log delta.
    stdout_excerpt: str
    stderr_excerpt: str
    log_excerpt: str
def run_claude_cli(
    *,
    claude_bin: str,
    server: RunningServer,
    config: SmokeConfig,
    cwd: Path,
    prompt: str,
    tools: str | None,
    bare: bool = True,
    pre_tool_args: tuple[str, ...] = (),
    extra_args: tuple[str, ...] = (),
    session_id: str | None = None,
    resume_session_id: str | None = None,
    no_session_persistence: bool = True,
) -> ClaudeCliRun:
    """Run Claude Code CLI against the local smoke proxy.

    ``cwd`` is created if missing. The child environment is a copy of the
    current one with the Anthropic endpoint variables pointed at ``server``,
    ``ANTHROPIC_API_KEY`` removed, and ``ANTHROPIC_AUTH_TOKEN`` set from
    ``config.settings`` (or removed when no token is configured).

    Returns a ``ClaudeCliRun``. If the process exceeds ``config.timeout_s``
    the result has ``returncode=None`` and ``timed_out=True`` and carries
    whatever partial output the timeout exception exposes.
    """
    cwd.mkdir(parents=True, exist_ok=True)
    cmd = list(
        _build_claude_cli_command(
            claude_bin=claude_bin,
            prompt=prompt,
            tools=tools,
            bare=bare,
            pre_tool_args=pre_tool_args,
            extra_args=extra_args,
            session_id=session_id,
            resume_session_id=resume_session_id,
            no_session_persistence=no_session_persistence,
        )
    )

    env = os.environ.copy()
    env["ANTHROPIC_BASE_URL"] = server.base_url
    env["ANTHROPIC_API_URL"] = f"{server.base_url}/v1"
    env.pop("ANTHROPIC_API_KEY", None)
    if config.settings.anthropic_auth_token:
        env["ANTHROPIC_AUTH_TOKEN"] = config.settings.anthropic_auth_token
    else:
        env.pop("ANTHROPIC_AUTH_TOKEN", None)
    env["TERM"] = "dumb"
    env["NO_COLOR"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"

    started = time.monotonic()
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            env=env,
            capture_output=True,
            text=True,
            # Decode explicitly as UTF-8 and tolerate bad bytes: relying on the
            # locale codec in strict mode can raise UnicodeDecodeError and abort
            # the whole matrix on unexpected CLI output.
            encoding="utf-8",
            errors="replace",
            timeout=config.timeout_s,
            check=False,
        )
    except subprocess.TimeoutExpired as exc:
        # Partial output on timeout may be bytes, str, or None.
        return ClaudeCliRun(
            command=tuple(cmd),
            returncode=None,
            stdout=_coerce_timeout_text(exc.stdout),
            stderr=_coerce_timeout_text(exc.stderr),
            duration_s=time.monotonic() - started,
            timed_out=True,
        )
    return ClaudeCliRun(
        command=tuple(cmd),
        returncode=result.returncode,
        stdout=result.stdout,
        stderr=result.stderr,
        duration_s=time.monotonic() - started,
    )
def _build_claude_cli_command(
    *,
    claude_bin: str,
    prompt: str,
    tools: str | None,
    bare: bool = True,
    pre_tool_args: tuple[str, ...] = (),
    extra_args: tuple[str, ...] = (),
    session_id: str | None = None,
    resume_session_id: str | None = None,
    no_session_persistence: bool = True,
) -> tuple[str, ...]:
    """Assemble the argv tuple for one Claude Code CLI invocation.

    Argument order: binary, optional ``--bare``, optional session flags, the
    fixed streaming/permission/model flags, optional
    ``--no-session-persistence``, ``pre_tool_args``, tool flags,
    ``extra_args``, and finally ``-p <prompt>``.
    """
    argv: list[str] = [claude_bin]
    if bare:
        argv += ["--bare"]
    if resume_session_id:
        argv += ["--resume", resume_session_id]
    if session_id:
        argv += ["--session-id", session_id]

    argv += [
        "--output-format",
        "stream-json",
        "--include-partial-messages",
        "--verbose",
        "--permission-mode",
        "bypassPermissions",
        "--dangerously-skip-permissions",
        "--model",
        "sonnet",
    ]
    if no_session_persistence:
        argv += ["--no-session-persistence"]

    argv += list(pre_tool_args)
    if tools is not None:
        # An empty string still emits "--tools ''"; only a non-empty value
        # additionally emits --allowedTools.
        argv += ["--tools", tools]
        if tools:
            argv += ["--allowedTools", tools]

    argv += list(extra_args)
    argv += ["-p", prompt]
    return tuple(argv)
def run_cli_feature_probes(
    *,
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> list[CliMatrixOutcome]:
    """Run every CLI feature probe in a fixed order and collect the outcomes."""
    probes = (
        _basic_text,
        _thinking,
        _tool_use_roundtrip,
        _interleaved_thinking_tool,
        _subagent_task,
        _compact_command,
    )
    outcomes: list[CliMatrixOutcome] = []
    for probe in probes:
        outcomes.append(
            probe(
                claude_bin,
                server,
                smoke_config,
                provider_model,
                model_dir,
                marker_prefix,
            )
        )
    return outcomes
def read_log_offset(log_path: Path) -> int:
    """Return the current decoded length of a smoke server log (0 if absent)."""
    if log_path.is_file():
        contents = log_path.read_text(encoding="utf-8", errors="replace")
        return len(contents)
    return 0
def read_log_delta(log_path: Path, offset: int) -> str:
    """Return the log text found after ``offset`` characters ("" if absent)."""
    if log_path.is_file():
        return log_path.read_text(encoding="utf-8", errors="replace")[offset:]
    return ""
def token_evidence(
    *,
    feature: str,
    marker: str,
    run: ClaudeCliRun,
    log_delta: str,
) -> dict[str, Any]:
    """Summarise substring/regex evidence found in CLI output and server log."""
    haystack = f"{run.combined_output}\n{log_delta}"
    haystack_lower = haystack.lower()

    task_calls = haystack.count('"name": "Task"') + haystack.count('"name":"Task"')
    foreground_flag = "run_in_background" in haystack and "false" in haystack_lower
    saw_5xx = re.search(r'HTTP/1\.1" 5\d\d', haystack) is not None

    evidence: dict[str, Any] = {}
    evidence["feature"] = feature
    evidence["marker_present"] = bool(marker and marker in haystack)
    evidence["thinking_delta_count"] = haystack.count("thinking_delta")
    evidence["tool_use_count"] = haystack.count('"tool_use"')
    evidence["tool_result_count"] = haystack.count('"tool_result"')
    # The catalog check looks only at the server log, not CLI output.
    evidence["agent_catalog_present"] = _tool_catalog_has(log_delta, "Agent")
    evidence["agent_tool_count"] = _agent_tool_count(haystack)
    evidence["agent_result_count"] = _agent_result_count(haystack)
    evidence["task_tool_count"] = task_calls
    evidence["run_in_background_false"] = foreground_flag
    evidence["compact_boundary"] = "compact_boundary" in haystack
    evidence["compact_metadata"] = "compact_metadata" in haystack
    evidence["http_422"] = 'HTTP/1.1" 422' in haystack
    evidence["http_500"] = saw_5xx
    evidence["timed_out"] = run.timed_out
    return evidence
def classify_probe(
    *,
    run: ClaudeCliRun,
    log_delta: str,
    marker: str,
    requires_tool_result: bool = False,
    requires_agent: bool = False,
    requires_task: bool = False,
    requires_compact: bool = False,
) -> tuple[str, str]:
    """Map a CLI run plus its log delta to an ``(outcome, classification)`` pair.

    Checks run in a fixed precedence: proxy regression, missing environment,
    timeout, missing Agent tool in the catalog, overall pass, upstream
    unavailability, no proxy request seen, and finally a model feature failure.
    """
    haystack = f"{run.combined_output}\n{log_delta}"
    haystack_lower = haystack.lower()

    if _has_proxy_regression(log_delta):
        return "failed", "product_failure"

    if run.returncode != 0:
        for needle in _MISSING_ENV_MARKERS:
            if needle in haystack_lower:
                return "skipped", "missing_env"

    if run.timed_out:
        return "failed", "probe_timeout"

    if requires_agent and not _tool_catalog_has(log_delta, "Agent"):
        return "failed", "harness_bug"

    saw_task_name = '"name": "Task"' in haystack or '"name":"Task"' in haystack
    requirements = (
        # CLI exited cleanly.
        run.returncode == 0,
        # Marker echoed back (when one is expected).
        not marker or marker in haystack,
        # A tool result was observed.
        not requires_tool_result or '"tool_result"' in haystack,
        # Agent tool was both invoked and produced a result.
        not requires_agent
        or (_agent_tool_count(haystack) > 0 and _agent_result_count(haystack) > 0),
        # Task tool was invoked in the foreground.
        not requires_task
        or (
            saw_task_name
            and "run_in_background" in haystack
            and "false" in haystack_lower
        ),
        # Some trace of compaction is present.
        not requires_compact
        or (
            "compact_boundary" in haystack
            or "compact_metadata" in haystack
            or "/compact" in haystack
            or "compact" in haystack_lower
        ),
    )
    if all(requirements):
        return "passed", "passed"

    if _has_upstream_unavailable_text(haystack):
        return "failed", "upstream_unavailable"
    if not _has_proxy_request(log_delta):
        return "failed", "harness_bug"
    return "failed", "model_feature_failure"
def make_outcome(
    *,
    model: str,
    full_model: str,
    source: str,
    feature: str,
    marker: str,
    run: ClaudeCliRun,
    log_delta: str,
    log_path: Path,
    requires_tool_result: bool = False,
    requires_agent: bool = False,
    requires_task: bool = False,
    requires_compact: bool = False,
) -> CliMatrixOutcome:
    """Classify a CLI run and package it, with evidence, as a report row."""
    verdict = classify_probe(
        run=run,
        log_delta=log_delta,
        marker=marker,
        requires_tool_result=requires_tool_result,
        requires_agent=requires_agent,
        requires_task=requires_task,
        requires_compact=requires_compact,
    )
    outcome, classification = verdict
    evidence = token_evidence(
        feature=feature, marker=marker, run=run, log_delta=log_delta
    )
    return CliMatrixOutcome(
        model=model,
        full_model=full_model,
        source=source,
        feature=feature,
        outcome=outcome,
        classification=classification,
        duration_s=round(run.duration_s, 3),
        cli_returncode=run.returncode,
        token_evidence=evidence,
        request_count=_request_count(log_delta),
        log_path=str(log_path),
        # Excerpts are redacted and length-limited by _excerpt.
        stdout_excerpt=_excerpt(run.stdout),
        stderr_excerpt=_excerpt(run.stderr),
        log_excerpt=_excerpt(log_delta),
    )
def write_matrix_report(
    config: SmokeConfig,
    outcomes: list[CliMatrixOutcome],
    *,
    target: str,
    filename_prefix: str,
) -> Path:
    """Write the matrix outcomes as a JSON report and return the file path.

    The file is created under ``config.results_dir`` (created if missing) and
    named ``<filename_prefix>-matrix-<worker_id>-<unix seconds>.json``.
    """
    config.results_dir.mkdir(parents=True, exist_ok=True)
    stamp = int(time.time())
    filename = f"{filename_prefix}-matrix-{config.worker_id}-{stamp}.json"
    report_path = config.results_dir / filename

    distinct_models = {row.full_model for row in outcomes}
    report = {
        "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "worker_id": config.worker_id,
        "target": target,
        "models": sorted(distinct_models),
        "outcomes": [asdict(row) for row in outcomes],
    }
    serialized = json.dumps(report, indent=2, sort_keys=True)
    report_path.write_text(serialized, encoding="utf-8")
    return report_path
def regression_failures(outcomes: list[CliMatrixOutcome]) -> list[str]:
    """Return one report line per outcome whose classification fails pytest."""
    failures: list[str] = []
    for row in outcomes:
        if row.classification not in REGRESSION_CLASSIFICATIONS:
            continue
        failures.append(f"{row.full_model} {row.feature}: {row.classification}")
    return failures
def _basic_text(
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> CliMatrixOutcome:
    """Probe plain text generation: the model must echo a fresh marker."""
    token = _marker(marker_prefix, "BASIC")
    instruction = f"Reply with exactly {token} and no other text."
    return _run_probe(
        feature="basic_text",
        marker=token,
        prompt=instruction,
        tools="",
        workspace=model_dir / "basic_text",
        provider_model=provider_model,
        smoke_config=smoke_config,
        server=server,
        claude_bin=claude_bin,
    )
def _thinking(
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> CliMatrixOutcome:
    """Probe a high-effort run: the model must still echo a fresh marker."""
    token = _marker(marker_prefix, "THINK")
    instruction = (
        "Think privately about the request, then reply with exactly "
        f"{token} and no other text."
    )
    return _run_probe(
        feature="thinking",
        marker=token,
        prompt=instruction,
        tools="",
        extra_args=("--effort", "high"),
        workspace=model_dir / "thinking",
        provider_model=provider_model,
        smoke_config=smoke_config,
        server=server,
        claude_bin=claude_bin,
    )
def _tool_use_roundtrip(
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> CliMatrixOutcome:
    """Probe a Read tool round trip: the marker lives only in a workspace file."""
    token = _marker(marker_prefix, "TOOL")
    probe_dir = model_dir / "tool_use_roundtrip"
    token_file = probe_dir / "smoke-read.txt"
    token_file.parent.mkdir(parents=True, exist_ok=True)
    token_file.write_text(token, encoding="utf-8")

    instruction = (
        "Use the Read tool to read smoke-read.txt. Reply with exactly the "
        "secret token from that file and no other text."
    )
    return _run_probe(
        feature="tool_use_roundtrip",
        marker=token,
        prompt=instruction,
        tools="Read",
        requires_tool_result=True,
        workspace=probe_dir,
        provider_model=provider_model,
        smoke_config=smoke_config,
        server=server,
        claude_bin=claude_bin,
    )
def _interleaved_thinking_tool(
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> CliMatrixOutcome:
    """Probe a high-effort run combined with a Read tool round trip."""
    token = _marker(marker_prefix, "INTERLEAVED")
    probe_dir = model_dir / "interleaved_thinking_tool"
    token_file = probe_dir / "smoke-interleaved.txt"
    token_file.parent.mkdir(parents=True, exist_ok=True)
    token_file.write_text(token, encoding="utf-8")

    instruction = (
        "Think privately, use Read on smoke-interleaved.txt, then reply with "
        "exactly the secret token from that file and no other text."
    )
    return _run_probe(
        feature="interleaved_thinking_tool",
        marker=token,
        prompt=instruction,
        tools="Read",
        extra_args=("--effort", "high"),
        requires_tool_result=True,
        workspace=probe_dir,
        provider_model=provider_model,
        smoke_config=smoke_config,
        server=server,
        claude_bin=claude_bin,
    )
def _subagent_task(
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> CliMatrixOutcome:
    """Probe delegation to a custom ``smoke_reader`` subagent via the Agent tool."""
    token = _marker(marker_prefix, "TASK")
    probe_dir = model_dir / "subagent_task"
    token_file = probe_dir / "smoke-subagent.txt"
    token_file.parent.mkdir(parents=True, exist_ok=True)
    token_file.write_text(token, encoding="utf-8")

    # Agent definition handed to the CLI through --agents as a JSON string.
    reader_agent = {
        "description": "Reads one requested file and returns its token.",
        "prompt": (
            "Read the requested file with Read and return only the token "
            "inside it."
        ),
        "tools": ["Read"],
        "permissionMode": "bypassPermissions",
        "background": False,
    }
    agents = json.dumps({"smoke_reader": reader_agent})
    options = _subagent_probe_options(agents)
    bare, tools, pre_tool_args, extra_args = options

    instruction = (
        "Use the smoke_reader subagent to read smoke-subagent.txt. After the "
        "first agent result, reply with exactly the token and stop. Do not "
        "call any other tools."
    )
    return _run_probe(
        feature="subagent_task",
        marker=token,
        prompt=instruction,
        tools=tools,
        bare=bare,
        pre_tool_args=pre_tool_args,
        extra_args=extra_args,
        requires_tool_result=True,
        requires_agent=True,
        workspace=probe_dir,
        provider_model=provider_model,
        smoke_config=smoke_config,
        server=server,
        claude_bin=claude_bin,
    )
def _subagent_probe_options(
    agents: str,
) -> tuple[bool, str, tuple[str, ...], tuple[str, ...]]:
    """Return ``(bare, tools, pre_tool_args, extra_args)`` for the subagent probe."""
    bare = False
    tools = "Agent,Read"
    pre_tool_args: tuple[str, ...] = (
        "--setting-sources",
        "local",
        "--strict-mcp-config",
        "--mcp-config",
        _EMPTY_MCP_CONFIG,
        "--system-prompt",
        _SUBAGENT_SYSTEM_PROMPT,
    )
    extra_args: tuple[str, ...] = ("--agents", agents)
    return bare, tools, pre_tool_args, extra_args
def _compact_command(
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    model_dir: Path,
    marker_prefix: str,
) -> CliMatrixOutcome:
    """Probe ``/compact``: seed a persisted session, then resume it to compact.

    Both CLI runs are merged into one synthetic ``ClaudeCliRun`` before
    classification. The outcome is built with an empty marker, so only the
    compact requirement (not the token) is checked.
    """
    token = _marker(marker_prefix, "COMPACT")
    session = str(uuid.uuid4())
    shared = {
        "claude_bin": claude_bin,
        "server": server,
        "config": smoke_config,
        "cwd": model_dir / "compact_command",
        "tools": "",
        "no_session_persistence": False,
    }

    log_start = read_log_offset(server.log_path)
    seed_run = run_claude_cli(
        prompt=f"Remember this smoke token: {token}. Reply with exactly {token}.",
        session_id=session,
        **shared,
    )
    compact_run = run_claude_cli(
        prompt=f"/compact preserve {token}",
        resume_session_id=session,
        **shared,
    )
    log_delta = read_log_delta(server.log_path, log_start)

    # Report the first run's exit code if it failed, otherwise the second's.
    if seed_run.returncode == 0:
        merged_code = compact_run.returncode
    else:
        merged_code = seed_run.returncode

    merged = ClaudeCliRun(
        command=(*seed_run.command, "&&", *compact_run.command),
        returncode=merged_code,
        stdout=f"{seed_run.stdout}\n{compact_run.stdout}",
        stderr=f"{seed_run.stderr}\n{compact_run.stderr}",
        duration_s=seed_run.duration_s + compact_run.duration_s,
        timed_out=seed_run.timed_out or compact_run.timed_out,
    )
    return make_outcome(
        feature="compact_command",
        marker="",
        run=merged,
        log_delta=log_delta,
        log_path=server.log_path,
        requires_compact=True,
        model=provider_model.model_name,
        full_model=provider_model.full_model,
        source=provider_model.source,
    )
def _run_probe(
    *,
    claude_bin: str,
    server: RunningServer,
    smoke_config: SmokeConfig,
    provider_model: ProviderModel,
    workspace: Path,
    feature: str,
    marker: str,
    prompt: str,
    tools: str | None,
    bare: bool = True,
    pre_tool_args: tuple[str, ...] = (),
    extra_args: tuple[str, ...] = (),
    requires_tool_result: bool = False,
    requires_agent: bool = False,
    requires_task: bool = False,
) -> CliMatrixOutcome:
    """Run one single-invocation probe and turn it into a report outcome.

    The server log length is sampled before the CLI runs so that only log text
    written during the run is handed to classification.
    """
    log_path = server.log_path
    log_start = read_log_offset(log_path)
    cli_run = run_claude_cli(
        claude_bin=claude_bin,
        server=server,
        config=smoke_config,
        cwd=workspace,
        prompt=prompt,
        tools=tools,
        bare=bare,
        pre_tool_args=pre_tool_args,
        extra_args=extra_args,
    )
    new_log_text = read_log_delta(server.log_path, log_start)
    return make_outcome(
        feature=feature,
        marker=marker,
        run=cli_run,
        log_delta=new_log_text,
        log_path=server.log_path,
        requires_tool_result=requires_tool_result,
        requires_agent=requires_agent,
        requires_task=requires_task,
        model=provider_model.model_name,
        full_model=provider_model.full_model,
        source=provider_model.source,
    )
def _has_proxy_regression(log_delta: str) -> bool:
    """Return True if the log shows CREATE_MESSAGE_ERROR or a regression status."""
    if "CREATE_MESSAGE_ERROR" in log_delta:
        return True
    for pattern in _HTTP_REGRESSION_PATTERNS:
        if re.search(pattern, log_delta):
            return True
    return False
def _has_proxy_request(log_delta: str) -> bool:
    """Return True if the log shows a /v1/messages request in any known format."""
    standalone_markers = (
        "POST /v1/messages",
        "API_REQUEST:",
        '"event": "api.request.received"',
    )
    if any(needle in log_delta for needle in standalone_markers):
        return True
    # Structured form: method and path are separate fields; both must appear.
    has_method = '"http_method": "POST"' in log_delta
    has_path = '"http_path": "/v1/messages"' in log_delta
    return has_method and has_path
def _tool_catalog_has(log_delta: str, tool_name: str) -> bool:
    """Return True if the first logged tool catalog names ``tool_name``."""
    catalog = _first_tool_catalog(log_delta)
    # Accept Python-repr and JSON (spaced or compact) spellings of the entry.
    spellings = (
        f"'name': '{tool_name}'",
        f'"name": "{tool_name}"',
        f'"name":"{tool_name}"',
    )
    return any(spelling in catalog for spelling in spellings)
def _first_tool_catalog(log_delta: str) -> str:
    """Return the tools-list slice of the first FULL_PAYLOAD log line, or "".

    The slice starts at the ``tools`` key and stops at the nearest following
    ``tool_choice`` / ``thinking`` key, or at end of line when neither follows.
    """
    end_keys = ("'tool_choice'", '"tool_choice"', "'thinking'", '"thinking"')
    for entry in log_delta.splitlines():
        if "FULL_PAYLOAD" not in entry:
            continue
        # Prefer the single-quoted (repr) key; fall back to the JSON key.
        begin = entry.find("'tools': [")
        if begin == -1:
            begin = entry.find('"tools": [')
        if begin == -1:
            continue
        positions = (entry.find(key, begin) for key in end_keys)
        stop = min((pos for pos in positions if pos != -1), default=len(entry))
        return entry[begin:stop]
    return ""
def _agent_tool_count(text: str) -> int:
    """Count Agent tool references in JSON form plus repr-style tool_use dicts."""
    json_hits = text.count('"name": "Agent"') + text.count('"name":"Agent"')
    # Repr form: the name must follow 'tool_use' without crossing "}" or newline.
    repr_hits = re.findall(
        r"'type': 'tool_use'[^}\n]+?'name': 'Agent'",
        text,
        flags=re.DOTALL,
    )
    return json_hits + len(repr_hits)
def _agent_result_count(text: str) -> int:
    """Count occurrences of the ``agentId`` key in its three logged spellings."""
    spellings = ("agentId:", '"agentId"', "'agentId'")
    return sum(text.count(spelling) for spelling in spellings)
def _has_upstream_unavailable_text(text: str) -> bool:
    """Return True if ``text`` contains an upstream-outage marker or an HTTP 429."""
    lowered = text.lower()
    for needle in _UPSTREAM_UNAVAILABLE_MARKERS:
        if needle in lowered:
            return True
    for pattern in _HTTP_429_PATTERNS:
        if re.search(pattern, text, flags=re.IGNORECASE):
            return True
    return False
def _request_count(log_delta: str) -> int:
    """Return the largest per-format count of logged message requests.

    The same request can be logged in several formats, so the counts are
    combined with ``max`` rather than summed.
    """
    request_markers = (
        "POST /v1/messages",
        "API_REQUEST:",
        '"event": "api.request.received"',
    )
    return max(log_delta.count(needle) for needle in request_markers)
def _marker(scope: str, prefix: str) -> str:
return f"FCC_{scope}_{prefix}_{uuid.uuid4().hex[:8].upper()}"
def _excerpt(value: str, *, max_chars: int = 2400) -> str:
    """Return a redacted excerpt holding at most the last ``max_chars`` chars.

    Redaction runs on the full text before truncation, so a secret straddling
    the cut point is still seen whole by the redactor. A non-positive
    ``max_chars`` yields an empty string.
    """
    if max_chars <= 0:
        # value[-0:] is the whole string, so a zero limit needs its own branch.
        return ""
    cleaned = redacted(value)
    if len(cleaned) <= max_chars:
        return cleaned
    return cleaned[-max_chars:]
def _coerce_timeout_text(value: str | bytes | None) -> str:
    """Normalise partial timeout output (bytes, str, or None) to ``str``."""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return "" if value is None else value