Add NVIDIA NIM CLI smoke matrix and tool schema aliasing

This commit is contained in:
Alishahryar1
2026-05-09 14:25:50 -07:00
parent 07b30aae31
commit 07497c7ed8
16 changed files with 1729 additions and 16 deletions
+2
View File
@@ -49,6 +49,8 @@ FCC_SMOKE_MODEL_LLAMACPP=
FCC_SMOKE_MODEL_OLLAMA= FCC_SMOKE_MODEL_OLLAMA=
FCC_SMOKE_MODEL_KIMI= FCC_SMOKE_MODEL_KIMI=
FCC_SMOKE_MODEL_WAFER= FCC_SMOKE_MODEL_WAFER=
FCC_SMOKE_NIM_MODELS=
FCC_SMOKE_NIM_EXTRA_MODELS=
# Thinking output # Thinking output
+10
View File
@@ -12,10 +12,12 @@ from providers.defaults import NVIDIA_NIM_DEFAULT_BASE
from providers.openai_compat import OpenAIChatTransport from providers.openai_compat import OpenAIChatTransport
from .request import ( from .request import (
body_without_nim_tool_argument_aliases,
build_request_body, build_request_body,
clone_body_without_chat_template, clone_body_without_chat_template,
clone_body_without_reasoning_budget, clone_body_without_reasoning_budget,
clone_body_without_reasoning_content, clone_body_without_reasoning_content,
nim_tool_argument_aliases_from_body,
) )
@@ -41,6 +43,14 @@ class NvidiaNimProvider(OpenAIChatTransport):
thinking_enabled=self._is_thinking_enabled(request, thinking_enabled), thinking_enabled=self._is_thinking_enabled(request, thinking_enabled),
) )
def _prepare_create_body(self, body: dict[str, Any]) -> dict[str, Any]:
"""Strip private request metadata before calling NVIDIA NIM."""
return body_without_nim_tool_argument_aliases(body)
def _tool_argument_aliases(self, body: dict[str, Any]) -> dict[str, dict[str, str]]:
"""Return NIM tool argument aliases captured while building this request."""
return nim_tool_argument_aliases_from_body(body)
def _get_retry_request_body(self, error: Exception, body: dict) -> dict | None: def _get_retry_request_body(self, error: Exception, body: dict) -> dict | None:
"""Retry once with a downgraded body when NIM rejects a known field.""" """Retry once with a downgraded body when NIM rejects a known field."""
status_code = getattr(error, "status_code", None) status_code = getattr(error, "status_code", None)
+167
View File
@@ -34,6 +34,9 @@ _SCHEMA_LIST_KEYS = frozenset({"allOf", "anyOf", "oneOf", "prefixItems"})
_SCHEMA_MAP_KEYS = frozenset( _SCHEMA_MAP_KEYS = frozenset(
{"properties", "patternProperties", "$defs", "definitions", "dependentSchemas"} {"properties", "patternProperties", "$defs", "definitions", "dependentSchemas"}
) )
NIM_TOOL_ARGUMENT_ALIASES_KEY = "_fcc_nim_tool_argument_aliases"
_NIM_TOOL_PARAMETER_ALIAS_PREFIX = "_fcc_arg_"
_NIM_UNSAFE_TOOL_PARAMETER_NAMES = frozenset({"type"})
def _clone_strip_extra_body( def _clone_strip_extra_body(
@@ -123,12 +126,135 @@ def _sanitize_nim_schema_node(value: Any) -> tuple[bool, Any]:
return True, value return True, value
def _needs_nim_tool_parameter_alias(name: str) -> bool:
return name in _NIM_UNSAFE_TOOL_PARAMETER_NAMES
def _make_nim_tool_parameter_alias(name: str, reserved: set[str]) -> str:
safe_tail = "".join(
character if character.isalnum() or character == "_" else "_"
for character in name
).strip("_")
if not safe_tail:
safe_tail = "arg"
candidate = f"{_NIM_TOOL_PARAMETER_ALIAS_PREFIX}{safe_tail}"
alias = candidate
suffix = 2
while alias in reserved:
alias = f"{candidate}_{suffix}"
suffix += 1
reserved.add(alias)
return alias
def _collect_nim_tool_property_names(value: Any) -> set[str]:
names: set[str] = set()
if isinstance(value, dict):
properties = value.get("properties")
if isinstance(properties, dict):
for property_name, property_schema in properties.items():
if isinstance(property_name, str):
names.add(property_name)
names.update(_collect_nim_tool_property_names(property_schema))
for key, item in value.items():
if key != "properties":
names.update(_collect_nim_tool_property_names(item))
elif isinstance(value, list):
for item in value:
names.update(_collect_nim_tool_property_names(item))
return names
def _alias_nim_schema_property_names(
value: Any,
*,
reserved: set[str],
alias_to_original: dict[str, str],
original_to_alias: dict[str, str],
) -> Any:
if isinstance(value, list):
return [
_alias_nim_schema_property_names(
item,
reserved=reserved,
alias_to_original=alias_to_original,
original_to_alias=original_to_alias,
)
for item in value
]
if not isinstance(value, dict):
return value
local_aliases: dict[str, str] = {}
aliased_value: dict[str, Any] = {}
properties = value.get("properties")
if isinstance(properties, dict):
aliased_properties: dict[str, Any] = {}
for property_name, property_schema in properties.items():
aliased_schema = _alias_nim_schema_property_names(
property_schema,
reserved=reserved,
alias_to_original=alias_to_original,
original_to_alias=original_to_alias,
)
if isinstance(property_name, str) and _needs_nim_tool_parameter_alias(
property_name
):
alias = original_to_alias.get(property_name)
if alias is None:
alias = _make_nim_tool_parameter_alias(property_name, reserved)
alias_to_original[alias] = property_name
original_to_alias[property_name] = alias
local_aliases[property_name] = alias
aliased_properties[alias] = aliased_schema
else:
aliased_properties[property_name] = aliased_schema
aliased_value["properties"] = aliased_properties
for key, item in value.items():
if key == "properties":
continue
if key == "required" and isinstance(item, list):
aliased_value[key] = [
local_aliases.get(required_item, required_item)
if isinstance(required_item, str)
else required_item
for required_item in item
]
continue
aliased_value[key] = _alias_nim_schema_property_names(
item,
reserved=reserved,
alias_to_original=alias_to_original,
original_to_alias=original_to_alias,
)
return aliased_value
def _alias_nim_tool_parameters(
parameters: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, str]]:
alias_to_original: dict[str, str] = {}
original_to_alias: dict[str, str] = {}
reserved = _collect_nim_tool_property_names(parameters)
aliased_parameters = _alias_nim_schema_property_names(
parameters,
reserved=reserved,
alias_to_original=alias_to_original,
original_to_alias=original_to_alias,
)
if not alias_to_original:
return parameters, {}
return aliased_parameters, alias_to_original
def _sanitize_nim_tool_schemas(body: dict[str, Any]) -> None: def _sanitize_nim_tool_schemas(body: dict[str, Any]) -> None:
"""Sanitize only tool parameter schemas, preserving tool calls/history.""" """Sanitize only tool parameter schemas, preserving tool calls/history."""
tools = body.get("tools") tools = body.get("tools")
if not isinstance(tools, list): if not isinstance(tools, list):
return return
tool_argument_aliases: dict[str, dict[str, str]] = {}
sanitized_tools: list[Any] = [] sanitized_tools: list[Any] = []
for tool in tools: for tool in tools:
if not isinstance(tool, dict): if not isinstance(tool, dict):
@@ -141,11 +267,52 @@ def _sanitize_nim_tool_schemas(body: dict[str, Any]) -> None:
parameters = function.get("parameters") parameters = function.get("parameters")
if isinstance(parameters, dict): if isinstance(parameters, dict):
_, sanitized_parameters = _sanitize_nim_schema_node(parameters) _, sanitized_parameters = _sanitize_nim_schema_node(parameters)
sanitized_parameters, argument_aliases = _alias_nim_tool_parameters(
sanitized_parameters
)
sanitized_function["parameters"] = sanitized_parameters sanitized_function["parameters"] = sanitized_parameters
tool_name = function.get("name")
if argument_aliases and isinstance(tool_name, str) and tool_name:
tool_argument_aliases[tool_name] = argument_aliases
sanitized_tool["function"] = sanitized_function sanitized_tool["function"] = sanitized_function
sanitized_tools.append(sanitized_tool) sanitized_tools.append(sanitized_tool)
body["tools"] = sanitized_tools body["tools"] = sanitized_tools
if tool_argument_aliases:
body[NIM_TOOL_ARGUMENT_ALIASES_KEY] = tool_argument_aliases
else:
body.pop(NIM_TOOL_ARGUMENT_ALIASES_KEY, None)
def nim_tool_argument_aliases_from_body(
body: dict[str, Any],
) -> dict[str, dict[str, str]]:
"""Return validated private NIM tool argument aliases from a built body."""
raw_aliases = body.get(NIM_TOOL_ARGUMENT_ALIASES_KEY)
if not isinstance(raw_aliases, dict):
return {}
aliases: dict[str, dict[str, str]] = {}
for tool_name, tool_aliases in raw_aliases.items():
if not isinstance(tool_name, str) or not isinstance(tool_aliases, dict):
continue
sanitized_aliases = {
alias: original
for alias, original in tool_aliases.items()
if isinstance(alias, str) and isinstance(original, str)
}
if sanitized_aliases:
aliases[tool_name] = sanitized_aliases
return aliases
def body_without_nim_tool_argument_aliases(body: dict[str, Any]) -> dict[str, Any]:
"""Return a request body with private alias metadata stripped before upstream I/O."""
if NIM_TOOL_ARGUMENT_ALIASES_KEY not in body:
return body
upstream_body = dict(body)
upstream_body.pop(NIM_TOOL_ARGUMENT_ALIASES_KEY, None)
return upstream_body
def _set_extra( def _set_extra(
+126 -7
View File
@@ -128,11 +128,20 @@ class OpenAIChatTransport(BaseProvider):
"""Return a modified request body for one retry, or None.""" """Return a modified request body for one retry, or None."""
return None return None
def _prepare_create_body(self, body: dict[str, Any]) -> dict[str, Any]:
"""Return the body passed to the upstream OpenAI-compatible client."""
return body
def _tool_argument_aliases(self, body: dict[str, Any]) -> dict[str, dict[str, str]]:
"""Return provider-specific per-tool argument aliases for this request."""
return {}
async def _create_stream(self, body: dict) -> tuple[Any, dict]: async def _create_stream(self, body: dict) -> tuple[Any, dict]:
"""Create a streaming chat completion, optionally retrying once.""" """Create a streaming chat completion, optionally retrying once."""
try: try:
create_body = self._prepare_create_body(body)
stream = await self._global_rate_limiter.execute_with_retry( stream = await self._global_rate_limiter.execute_with_retry(
self._client.chat.completions.create, **body, stream=True self._client.chat.completions.create, **create_body, stream=True
) )
return stream, body return stream, body
except Exception as error: except Exception as error:
@@ -140,13 +149,49 @@ class OpenAIChatTransport(BaseProvider):
if retry_body is None: if retry_body is None:
raise raise
create_retry_body = self._prepare_create_body(retry_body)
stream = await self._global_rate_limiter.execute_with_retry( stream = await self._global_rate_limiter.execute_with_retry(
self._client.chat.completions.create, **retry_body, stream=True self._client.chat.completions.create, **create_retry_body, stream=True
) )
return stream, retry_body return stream, retry_body
def _restore_aliased_tool_arguments(
self, argument_json: str, aliases: dict[str, str]
) -> str | None:
try:
parsed = json.loads(argument_json)
except json.JSONDecodeError:
return None
if not isinstance(parsed, dict):
return argument_json
restored = self._restore_aliased_tool_argument_value(parsed, aliases)
return json.dumps(restored)
def _restore_aliased_tool_argument_value(
self, value: Any, aliases: dict[str, str]
) -> Any:
if isinstance(value, dict):
return {
aliases.get(key, key): self._restore_aliased_tool_argument_value(
item, aliases
)
for key, item in value.items()
}
if isinstance(value, list):
return [
self._restore_aliased_tool_argument_value(item, aliases)
for item in value
]
return value
def _emit_tool_arg_delta( def _emit_tool_arg_delta(
self, sse: SSEBuilder, tc_index: int, args: str self,
sse: SSEBuilder,
tc_index: int,
args: str,
*,
tool_argument_aliases: dict[str, dict[str, str]] | None = None,
tool_argument_alias_buffers: dict[int, str] | None = None,
) -> Iterator[str]: ) -> Iterator[str]:
"""Emit one argument fragment for a started tool block (Task buffer or raw JSON).""" """Emit one argument fragment for a started tool block (Task buffer or raw JSON)."""
if not args: if not args:
@@ -159,9 +204,34 @@ class OpenAIChatTransport(BaseProvider):
if parsed is not None: if parsed is not None:
yield sse.emit_tool_delta(tc_index, json.dumps(parsed)) yield sse.emit_tool_delta(tc_index, json.dumps(parsed))
return return
aliases = (
tool_argument_aliases.get(state.name, {}) if tool_argument_aliases else {}
)
if aliases:
if tool_argument_alias_buffers is None:
restored = self._restore_aliased_tool_arguments(args, aliases)
if restored is not None:
yield sse.emit_tool_delta(tc_index, restored)
return
buffered_args = tool_argument_alias_buffers.get(tc_index, "") + args
restored = self._restore_aliased_tool_arguments(buffered_args, aliases)
if restored is None:
tool_argument_alias_buffers[tc_index] = buffered_args
return
tool_argument_alias_buffers.pop(tc_index, None)
yield sse.emit_tool_delta(tc_index, restored)
return
yield sse.emit_tool_delta(tc_index, args) yield sse.emit_tool_delta(tc_index, args)
def _process_tool_call(self, tc: dict, sse: SSEBuilder) -> Iterator[str]: def _process_tool_call(
self,
tc: dict,
sse: SSEBuilder,
*,
tool_argument_aliases: dict[str, dict[str, str]] | None = None,
tool_argument_alias_buffers: dict[int, str] | None = None,
) -> Iterator[str]:
"""Process a single tool call delta and yield SSE events.""" """Process a single tool call delta and yield SSE events."""
tc_index = tc.get("index", 0) tc_index = tc.get("index", 0)
if tc_index < 0: if tc_index < 0:
@@ -193,7 +263,13 @@ class OpenAIChatTransport(BaseProvider):
if state.pre_start_args: if state.pre_start_args:
pre = state.pre_start_args pre = state.pre_start_args
state.pre_start_args = "" state.pre_start_args = ""
yield from self._emit_tool_arg_delta(sse, tc_index, pre) yield from self._emit_tool_arg_delta(
sse,
tc_index,
pre,
tool_argument_aliases=tool_argument_aliases,
tool_argument_alias_buffers=tool_argument_alias_buffers,
)
state = sse.blocks.tool_states.get(tc_index) state = sse.blocks.tool_states.get(tc_index)
if not arguments: if not arguments:
@@ -204,13 +280,43 @@ class OpenAIChatTransport(BaseProvider):
state.pre_start_args += arguments state.pre_start_args += arguments
return return
yield from self._emit_tool_arg_delta(sse, tc_index, arguments) yield from self._emit_tool_arg_delta(
sse,
tc_index,
arguments,
tool_argument_aliases=tool_argument_aliases,
tool_argument_alias_buffers=tool_argument_alias_buffers,
)
def _flush_task_arg_buffers(self, sse: SSEBuilder) -> Iterator[str]: def _flush_task_arg_buffers(self, sse: SSEBuilder) -> Iterator[str]:
"""Emit buffered Task args as a single JSON delta (best-effort).""" """Emit buffered Task args as a single JSON delta (best-effort)."""
for tool_index, out in sse.blocks.flush_task_arg_buffers(): for tool_index, out in sse.blocks.flush_task_arg_buffers():
yield sse.emit_tool_delta(tool_index, out) yield sse.emit_tool_delta(tool_index, out)
def _flush_tool_argument_alias_buffers(
self,
sse: SSEBuilder,
tool_argument_aliases: dict[str, dict[str, str]],
tool_argument_alias_buffers: dict[int, str],
) -> Iterator[str]:
"""Emit remaining aliased tool args without losing data on malformed JSON."""
for tool_index, buffered_args in list(tool_argument_alias_buffers.items()):
if not buffered_args:
tool_argument_alias_buffers.pop(tool_index, None)
continue
state = sse.blocks.tool_states.get(tool_index)
if state is None or state.name == "Task":
continue
aliases = tool_argument_aliases.get(state.name, {})
if not aliases:
continue
restored = self._restore_aliased_tool_arguments(buffered_args, aliases)
yield sse.emit_tool_delta(
tool_index,
restored if restored is not None else buffered_args,
)
tool_argument_alias_buffers.pop(tool_index, None)
async def stream_response( async def stream_response(
self, self,
request: Any, request: Any,
@@ -262,10 +368,13 @@ class OpenAIChatTransport(BaseProvider):
heuristic_parser = HeuristicToolParser() heuristic_parser = HeuristicToolParser()
finish_reason = None finish_reason = None
usage_info = None usage_info = None
tool_argument_aliases: dict[str, dict[str, str]] = {}
tool_argument_alias_buffers: dict[int, str] = {}
async with self._global_rate_limiter.concurrency_slot(): async with self._global_rate_limiter.concurrency_slot():
try: try:
stream, body = await self._create_stream(body) stream, body = await self._create_stream(body)
tool_argument_aliases = self._tool_argument_aliases(body)
async for chunk in stream: async for chunk in stream:
if getattr(chunk, "usage", None): if getattr(chunk, "usage", None):
usage_info = chunk.usage usage_info = chunk.usage
@@ -335,7 +444,12 @@ class OpenAIChatTransport(BaseProvider):
"arguments": tc.function.arguments, "arguments": tc.function.arguments,
}, },
} }
for event in self._process_tool_call(tc_info, sse): for event in self._process_tool_call(
tc_info,
sse,
tool_argument_aliases=tool_argument_aliases,
tool_argument_alias_buffers=tool_argument_alias_buffers,
):
yield event yield event
except asyncio.CancelledError, GeneratorExit: except asyncio.CancelledError, GeneratorExit:
@@ -409,6 +523,11 @@ class OpenAIChatTransport(BaseProvider):
yield event yield event
yield sse.emit_text_delta(" ") yield sse.emit_text_delta(" ")
for event in self._flush_tool_argument_alias_buffers(
sse, tool_argument_aliases, tool_argument_alias_buffers
):
yield event
for event in self._flush_task_arg_buffers(sse): for event in self._flush_task_arg_buffers(sse):
yield event yield event
+21 -4
View File
@@ -58,10 +58,11 @@ Default targets do not send real bot messages or load voice backends:
| `llamacpp` | local `/models` plus native `/messages` through proxy | running llama-server | | `llamacpp` | local `/models` plus native `/messages` through proxy | running llama-server |
| `ollama` | local `/api/tags` plus native Anthropic messages through proxy | running Ollama server | | `ollama` | local `/api/tags` plus native Anthropic messages through proxy | running Ollama server |
Side-effectful targets are opt-in: Heavy/side-effectful targets are opt-in:
| Target | Product scenarios | Required environment | | Target | Product scenarios | Required environment |
| --- | --- | --- | | --- | --- | --- |
| `nvidia_nim_cli` | Claude Code CLI feature matrix across NIM models | `NVIDIA_NIM_API_KEY`, Claude CLI |
| `telegram` | getMe, send, edit, delete, optional manual inbound | token and chat/user ID | | `telegram` | getMe, send, edit, delete, optional manual inbound | token and chat/user ID |
| `discord` | channel access, send, edit, delete, optional manual inbound | token and channel ID | | `discord` | channel access, send, edit, delete, optional manual inbound | token and channel ID |
| `voice` | generated WAV through local Whisper or NVIDIA NIM transcription | `VOICE_NOTE_ENABLED=true`, `FCC_SMOKE_RUN_VOICE=1` | | `voice` | generated WAV through local Whisper or NVIDIA NIM transcription | `VOICE_NOTE_ENABLED=true`, `FCC_SMOKE_RUN_VOICE=1` |
@@ -88,6 +89,13 @@ $env:FCC_SMOKE_RUN_VOICE = "1"
uv run pytest smoke/product -n 0 -s --tb=short uv run pytest smoke/product -n 0 -s --tb=short
``` ```
```powershell
$env:FCC_LIVE_SMOKE = "1"
$env:FCC_SMOKE_TARGETS = "nvidia_nim_cli"
$env:FCC_SMOKE_NIM_MODELS = "z-ai/glm-5.1,moonshotai/kimi-k2.6,minimaxai/minimax-m2.7,nvidia/nemotron-3-super-120b-a12b,deepseek-ai/deepseek-v4-pro,deepseek-ai/deepseek-v4-flash"
uv run pytest smoke/product -n 0 -s --tb=short
```
```powershell ```powershell
$env:FCC_LIVE_SMOKE = "1" $env:FCC_LIVE_SMOKE = "1"
$env:FCC_SMOKE_TARGETS = "messaging,config,extensibility" $env:FCC_SMOKE_TARGETS = "messaging,config,extensibility"
@@ -106,6 +114,10 @@ uv run pytest smoke/product -n 0 -s --tb=short
`FCC_SMOKE_MODEL_LLAMACPP`, `FCC_SMOKE_MODEL_OLLAMA`: optional per-provider `FCC_SMOKE_MODEL_LLAMACPP`, `FCC_SMOKE_MODEL_OLLAMA`: optional per-provider
smoke model overrides. Values may include the provider prefix or just the model smoke model overrides. Values may include the provider prefix or just the model
name for that provider. name for that provider.
- `FCC_SMOKE_NIM_MODELS`: optional comma-separated NVIDIA NIM CLI matrix models
that replace the default characterization set.
- `FCC_SMOKE_NIM_EXTRA_MODELS`: optional comma-separated NVIDIA NIM CLI matrix
models appended to the default or replacement set.
- `FCC_SMOKE_TIMEOUT_S`: per-request/subprocess timeout, default `45`. - `FCC_SMOKE_TIMEOUT_S`: per-request/subprocess timeout, default `45`.
- `FCC_SMOKE_CLAUDE_BIN`: Claude CLI executable name, default `claude`. - `FCC_SMOKE_CLAUDE_BIN`: Claude CLI executable name, default `claude`.
- `FCC_SMOKE_TELEGRAM_CHAT_ID`: Telegram chat/user ID for send/edit/delete. - `FCC_SMOKE_TELEGRAM_CHAT_ID`: Telegram chat/user ID for send/edit/delete.
@@ -129,10 +141,15 @@ names contain `KEY`, `TOKEN`, `SECRET`, `WEBHOOK`, or `AUTH`.
opt-in flag is absent. opt-in flag is absent.
- `upstream_unavailable`: a real provider, bot API, or local model server is not - `upstream_unavailable`: a real provider, bot API, or local model server is not
reachable. reachable.
- `probe_timeout`: the smoke driver reached the target, but the CLI/probe did
not complete within the smoke timeout.
- `product_failure`: the app accepted the scenario but returned the wrong shape, - `product_failure`: the app accepted the scenario but returned the wrong shape,
crashed, leaked state, or violated the product contract. crashed, leaked state, or violated the product contract.
- `harness_bug`: the smoke test or driver made an invalid assumption. - `harness_bug`: the smoke test or driver made an invalid assumption.
- `target_disabled`: skipped because `FCC_SMOKE_TARGETS` intentionally selected
a different target.
`product_failure` and `harness_bug` are failures. `missing_env` and `product_failure` and `harness_bug` are failures. `missing_env`,
`upstream_unavailable` are skips except when the user explicitly selected a `upstream_unavailable`, and `probe_timeout` are skips except when the user
provider in `FCC_SMOKE_PROVIDER_MATRIX`; selected-but-missing providers fail. explicitly selected a provider in `FCC_SMOKE_PROVIDER_MATRIX`;
selected-but-missing providers fail.
+1 -1
View File
@@ -411,7 +411,7 @@ CAPABILITY_CONTRACTS: tuple[CapabilityContract, ...] = (
"stream-json events and session id mapping", "stream-json events and session id mapping",
"stderr/error event and process cleanup", "stderr/error event and process cleanup",
("tests/cli/test_cli.py",), ("tests/cli/test_cli.py",),
("test_claude_cli_prompt_when_available",), ("test_claude_cli_prompt_when_available", "test_nvidia_nim_cli_matrix_e2e"),
), ),
CapabilityContract( CapabilityContract(
"extensibility", "extensibility",
+5 -3
View File
@@ -72,10 +72,11 @@ FEATURE_INVENTORY: tuple[FeatureCoverage, ...] = (
( (
"test_api_basic_conversation_e2e", "test_api_basic_conversation_e2e",
"test_claude_cli_adaptive_thinking_e2e", "test_claude_cli_adaptive_thinking_e2e",
"test_nvidia_nim_cli_matrix_e2e",
"test_vscode_protocol_e2e", "test_vscode_protocol_e2e",
"test_jetbrains_protocol_e2e", "test_jetbrains_protocol_e2e",
), ),
("api", "cli", "clients"), ("api", "cli", "clients", "nvidia_nim_cli"),
("configured provider", "FCC_SMOKE_CLAUDE_BIN for real Claude CLI"), ("configured provider", "FCC_SMOKE_CLAUDE_BIN for real Claude CLI"),
"skip real CLI when binary is absent; configured providers must pass", "skip real CLI when binary is absent; configured providers must pass",
), ),
@@ -384,9 +385,10 @@ FEATURE_INVENTORY: tuple[FeatureCoverage, ...] = (
( (
"test_claude_cli_adaptive_thinking_e2e", "test_claude_cli_adaptive_thinking_e2e",
"test_claude_cli_multiturn_tool_protocol_e2e", "test_claude_cli_multiturn_tool_protocol_e2e",
"test_nvidia_nim_cli_matrix_e2e",
), ),
("cli",), ("cli", "nvidia_nim_cli"),
("FCC_SMOKE_CLAUDE_BIN", "configured provider"), ("FCC_SMOKE_CLAUDE_BIN", "configured provider", "NVIDIA_NIM_API_KEY"),
"skip only when Claude CLI binary is absent", "skip only when Claude CLI binary is absent",
), ),
FeatureCoverage( FeatureCoverage(
+59 -1
View File
@@ -28,9 +28,11 @@ DEFAULT_TARGETS = frozenset(
} }
) )
SIDE_EFFECT_TARGETS = frozenset({"discord", "telegram", "voice"}) SIDE_EFFECT_TARGETS = frozenset({"discord", "telegram", "voice"})
ALL_TARGETS = DEFAULT_TARGETS | SIDE_EFFECT_TARGETS OPT_IN_TARGETS = frozenset({"nvidia_nim_cli"})
ALL_TARGETS = DEFAULT_TARGETS | SIDE_EFFECT_TARGETS | OPT_IN_TARGETS
TARGET_ALIASES = { TARGET_ALIASES = {
"contract": "api", "contract": "api",
"nim_cli": "nvidia_nim_cli",
"optimizations": "api", "optimizations": "api",
"thinking": "providers", "thinking": "providers",
"vscode": "clients", "vscode": "clients",
@@ -47,6 +49,15 @@ PROVIDER_SMOKE_DEFAULT_MODELS: dict[str, str] = {
"wafer": "wafer/DeepSeek-V4-Pro", "wafer": "wafer/DeepSeek-V4-Pro",
} }
NVIDIA_NIM_CLI_DEFAULT_MODELS: tuple[str, ...] = (
"z-ai/glm-5.1",
"moonshotai/kimi-k2.6",
"minimaxai/minimax-m2.7",
"nvidia/nemotron-3-super-120b-a12b",
"deepseek-ai/deepseek-v4-pro",
"deepseek-ai/deepseek-v4-flash",
)
TARGET_REQUIRED_ENV: dict[str, tuple[str, ...]] = { TARGET_REQUIRED_ENV: dict[str, tuple[str, ...]] = {
"api": (), "api": (),
@@ -62,6 +73,10 @@ TARGET_REQUIRED_ENV: dict[str, tuple[str, ...]] = {
"lmstudio": ("LM_STUDIO_BASE_URL with a running LM Studio server",), "lmstudio": ("LM_STUDIO_BASE_URL with a running LM Studio server",),
"llamacpp": ("LLAMACPP_BASE_URL with a running llama-server",), "llamacpp": ("LLAMACPP_BASE_URL with a running llama-server",),
"ollama": ("OLLAMA_BASE_URL with a running Ollama server",), "ollama": ("OLLAMA_BASE_URL with a running Ollama server",),
"nvidia_nim_cli": (
"NVIDIA_NIM_API_KEY",
"FCC_SMOKE_CLAUDE_BIN or claude on PATH",
),
"telegram": ( "telegram": (
"TELEGRAM_BOT_TOKEN", "TELEGRAM_BOT_TOKEN",
"ALLOWED_TELEGRAM_USER_ID or FCC_SMOKE_TELEGRAM_CHAT_ID", "ALLOWED_TELEGRAM_USER_ID or FCC_SMOKE_TELEGRAM_CHAT_ID",
@@ -161,6 +176,13 @@ class SmokeConfig:
) )
return models return models
def nvidia_nim_cli_models(self) -> list[ProviderModel]:
"""Return the NVIDIA NIM models for Claude Code CLI characterization."""
return [
ProviderModel(provider="nvidia_nim", full_model=full_model, source=source)
for full_model, source in nvidia_nim_cli_model_refs().items()
]
def _include_provider_in_smoke( def _include_provider_in_smoke(
self, provider: str, mapped_providers: set[str] self, provider: str, mapped_providers: set[str]
) -> bool: ) -> bool:
@@ -197,6 +219,12 @@ def _parse_csv(raw: str | None) -> frozenset[str]:
return frozenset(part.strip() for part in raw.split(",") if part.strip()) return frozenset(part.strip() for part in raw.split(",") if part.strip())
def _parse_csv_ordered(raw: str | None) -> tuple[str, ...]:
if not raw:
return ()
return tuple(part.strip() for part in raw.split(",") if part.strip())
def _parse_targets(raw: str | None) -> frozenset[str]: def _parse_targets(raw: str | None) -> frozenset[str]:
if not raw: if not raw:
return DEFAULT_TARGETS return DEFAULT_TARGETS
@@ -237,6 +265,36 @@ def _normalize_provider_model(provider: str, raw_model: str) -> str:
return f"{provider}/{model}" return f"{provider}/{model}"
def nvidia_nim_cli_model_refs(
env: Mapping[str, str] | None = None,
) -> dict[str, str]:
"""Return normalized NIM CLI matrix model refs in deterministic order.
Values are returned as ``full_model -> source`` so callers can preserve both
de-duplicated order and provenance in reports.
"""
source = env if env is not None else os.environ
explicit_models = _parse_csv_ordered(source.get("FCC_SMOKE_NIM_MODELS"))
extra_models = _parse_csv_ordered(source.get("FCC_SMOKE_NIM_EXTRA_MODELS"))
if "FCC_SMOKE_NIM_MODELS" in source and not explicit_models:
raise ValueError("FCC_SMOKE_NIM_MODELS must list at least one model")
models: list[tuple[str, str]] = []
base_models = explicit_models or NVIDIA_NIM_CLI_DEFAULT_MODELS
base_source = (
"FCC_SMOKE_NIM_MODELS" if explicit_models else "nvidia_nim_cli_default"
)
models.extend((model, base_source) for model in base_models)
models.extend((model, "FCC_SMOKE_NIM_EXTRA_MODELS") for model in extra_models)
normalized: dict[str, str] = {}
for raw_model, model_source in models:
full_model = _normalize_provider_model("nvidia_nim", raw_model)
normalized.setdefault(full_model, model_source)
return normalized
def auth_headers(token: str | None = None) -> dict[str, str]: def auth_headers(token: str | None = None) -> dict[str, str]:
settings = get_settings() settings = get_settings()
resolved = token if token is not None else settings.anthropic_auth_token resolved = token if token is not None else settings.anthropic_auth_token
+350
View File
@@ -0,0 +1,350 @@
"""Claude Code CLI characterization helpers for NVIDIA NIM smoke tests."""
from __future__ import annotations
import json
import os
import re
import subprocess
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
from smoke.lib.config import SmokeConfig, redacted
from smoke.lib.server import RunningServer
REGRESSION_CLASSIFICATIONS = frozenset({"harness_bug", "product_failure"})
_HTTP_REGRESSION_PATTERNS = (
r'POST /v1/messages[^"\n]* HTTP/1\.1" 4(?!01|03|04|08|09)\d\d',
r'POST /v1/messages[^"\n]* HTTP/1\.1" 5\d\d',
)
_UPSTREAM_UNAVAILABLE_MARKERS = (
"upstream_unavailable",
"readtimeout",
"connecterror",
"connection refused",
"timed out",
"rate limit",
"429",
"overloaded",
"capacity",
"upstream provider",
)
_MISSING_ENV_MARKERS = (
"api key",
"not logged in",
"authentication",
"permission denied",
)
@dataclass(frozen=True, slots=True)
class ClaudeCliRun:
command: tuple[str, ...]
returncode: int | None
stdout: str
stderr: str
duration_s: float
timed_out: bool = False
@property
def combined_output(self) -> str:
return f"{self.stdout}\n{self.stderr}"
@dataclass(frozen=True, slots=True)
class NimCliMatrixOutcome:
model: str
full_model: str
source: str
feature: str
outcome: str
classification: str
duration_s: float
cli_returncode: int | None
token_evidence: dict[str, Any]
request_count: int
log_path: str
stdout_excerpt: str
stderr_excerpt: str
log_excerpt: str
def run_claude_cli(
*,
claude_bin: str,
server: RunningServer,
config: SmokeConfig,
cwd: Path,
prompt: str,
tools: str | None,
extra_args: tuple[str, ...] = (),
session_id: str | None = None,
resume_session_id: str | None = None,
no_session_persistence: bool = True,
) -> ClaudeCliRun:
"""Run Claude Code CLI against the local smoke proxy."""
cwd.mkdir(parents=True, exist_ok=True)
cmd: list[str] = [claude_bin, "--bare"]
if resume_session_id:
cmd.extend(["--resume", resume_session_id])
if session_id:
cmd.extend(["--session-id", session_id])
cmd.extend(
[
"--output-format",
"stream-json",
"--include-partial-messages",
"--verbose",
"--permission-mode",
"bypassPermissions",
"--dangerously-skip-permissions",
"--model",
"sonnet",
]
)
if no_session_persistence:
cmd.append("--no-session-persistence")
if tools is not None:
cmd.extend(["--tools", tools])
if tools:
cmd.extend(["--allowedTools", tools])
cmd.extend(extra_args)
cmd.extend(["-p", prompt])
env = os.environ.copy()
env["ANTHROPIC_BASE_URL"] = server.base_url
env["ANTHROPIC_API_URL"] = f"{server.base_url}/v1"
env.setdefault("ANTHROPIC_API_KEY", "sk-smoke-proxy")
if config.settings.anthropic_auth_token:
env["ANTHROPIC_AUTH_TOKEN"] = config.settings.anthropic_auth_token
env["TERM"] = "dumb"
env["NO_COLOR"] = "1"
env["PYTHONIOENCODING"] = "utf-8"
started = time.monotonic()
try:
result = subprocess.run(
cmd,
cwd=cwd,
env=env,
capture_output=True,
text=True,
timeout=config.timeout_s,
check=False,
)
except subprocess.TimeoutExpired as exc:
return ClaudeCliRun(
command=tuple(cmd),
returncode=None,
stdout=_coerce_timeout_text(exc.stdout),
stderr=_coerce_timeout_text(exc.stderr),
duration_s=time.monotonic() - started,
timed_out=True,
)
return ClaudeCliRun(
command=tuple(cmd),
returncode=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
duration_s=time.monotonic() - started,
)
def read_log_offset(log_path: Path) -> int:
"""Return the current text length of a smoke server log."""
if not log_path.is_file():
return 0
return len(log_path.read_text(encoding="utf-8", errors="replace"))
def read_log_delta(log_path: Path, offset: int) -> str:
"""Return smoke server log text written after ``offset``."""
if not log_path.is_file():
return ""
text = log_path.read_text(encoding="utf-8", errors="replace")
return text[offset:]
def token_evidence(
*,
feature: str,
marker: str,
run: ClaudeCliRun,
log_delta: str,
) -> dict[str, Any]:
"""Collect compact evidence for a CLI feature probe."""
combined = f"{run.combined_output}\n{log_delta}"
lower = combined.lower()
return {
"feature": feature,
"marker_present": bool(marker and marker in combined),
"thinking_delta_count": combined.count("thinking_delta"),
"tool_use_count": combined.count('"tool_use"'),
"tool_result_count": combined.count('"tool_result"'),
"task_tool_count": combined.count('"name": "Task"')
+ combined.count('"name":"Task"'),
"run_in_background_false": "run_in_background" in combined and "false" in lower,
"compact_boundary": "compact_boundary" in combined,
"compact_metadata": "compact_metadata" in combined,
"http_422": 'HTTP/1.1" 422' in combined,
"http_500": bool(re.search(r'HTTP/1\.1" 5\d\d', combined)),
"timed_out": run.timed_out,
}
def classify_probe(
*,
run: ClaudeCliRun,
log_delta: str,
marker: str,
requires_tool_result: bool = False,
requires_task: bool = False,
requires_compact: bool = False,
) -> tuple[str, str]:
"""Classify a probe without failing compatibility characterization failures."""
combined = f"{run.combined_output}\n{log_delta}"
lower = combined.lower()
if _has_proxy_regression(log_delta):
return "failed", "product_failure"
if run.returncode != 0 and any(
marker_text in lower for marker_text in _MISSING_ENV_MARKERS
):
return "skipped", "missing_env"
if run.timed_out:
return "failed", "probe_timeout"
marker_ok = not marker or marker in combined
tool_ok = not requires_tool_result or '"tool_result"' in combined
task_ok = not requires_task or (
('"name": "Task"' in combined or '"name":"Task"' in combined)
and "run_in_background" in combined
and "false" in lower
)
compact_ok = not requires_compact or (
"compact_boundary" in combined
or "compact_metadata" in combined
or "/compact" in combined
or "compact" in lower
)
cli_ok = run.returncode == 0
if cli_ok and marker_ok and tool_ok and task_ok and compact_ok:
return "passed", "passed"
if any(marker_text in lower for marker_text in _UPSTREAM_UNAVAILABLE_MARKERS):
return "failed", "upstream_unavailable"
if not _has_proxy_request(log_delta):
return "failed", "harness_bug"
return "failed", "model_feature_failure"
def make_outcome(
*,
model: str,
full_model: str,
source: str,
feature: str,
marker: str,
run: ClaudeCliRun,
log_delta: str,
log_path: Path,
requires_tool_result: bool = False,
requires_task: bool = False,
requires_compact: bool = False,
) -> NimCliMatrixOutcome:
"""Build one report outcome from a CLI run and its server log delta."""
outcome, classification = classify_probe(
run=run,
log_delta=log_delta,
marker=marker,
requires_tool_result=requires_tool_result,
requires_task=requires_task,
requires_compact=requires_compact,
)
evidence = token_evidence(
feature=feature,
marker=marker,
run=run,
log_delta=log_delta,
)
return NimCliMatrixOutcome(
model=model,
full_model=full_model,
source=source,
feature=feature,
outcome=outcome,
classification=classification,
duration_s=round(run.duration_s, 3),
cli_returncode=run.returncode,
token_evidence=evidence,
request_count=_request_count(log_delta),
log_path=str(log_path),
stdout_excerpt=_excerpt(run.stdout),
stderr_excerpt=_excerpt(run.stderr),
log_excerpt=_excerpt(log_delta),
)
def write_matrix_report(
config: SmokeConfig,
outcomes: list[NimCliMatrixOutcome],
) -> Path:
"""Write the NVIDIA NIM CLI compatibility matrix report."""
config.results_dir.mkdir(parents=True, exist_ok=True)
path = (
config.results_dir
/ f"nvidia-nim-cli-matrix-{config.worker_id}-{int(time.time())}.json"
)
payload = {
"started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"worker_id": config.worker_id,
"target": "nvidia_nim_cli",
"models": sorted({outcome.full_model for outcome in outcomes}),
"outcomes": [asdict(outcome) for outcome in outcomes],
}
path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8")
return path
def regression_failures(outcomes: list[NimCliMatrixOutcome]) -> list[str]:
"""Return report lines for classifications that should fail pytest."""
return [
f"{outcome.full_model} {outcome.feature}: {outcome.classification}"
for outcome in outcomes
if outcome.classification in REGRESSION_CLASSIFICATIONS
]
def _has_proxy_regression(log_delta: str) -> bool:
if "CREATE_MESSAGE_ERROR" in log_delta:
return True
return any(re.search(pattern, log_delta) for pattern in _HTTP_REGRESSION_PATTERNS)
def _has_proxy_request(log_delta: str) -> bool:
return "POST /v1/messages" in log_delta or "API_REQUEST:" in log_delta
def _request_count(log_delta: str) -> int:
access_log_count = log_delta.count("POST /v1/messages")
service_log_count = log_delta.count("API_REQUEST:")
return max(access_log_count, service_log_count)
def _excerpt(value: str, *, max_chars: int = 2400) -> str:
if len(value) <= max_chars:
return redacted(value)
return redacted(value[-max_chars:])
def _coerce_timeout_text(value: str | bytes | None) -> str:
if value is None:
return ""
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
return value
+2
View File
@@ -69,6 +69,8 @@ def classify_outcome(*, nodeid: str, outcome: str, detail: str) -> str:
text = f"{nodeid}\n{detail}".lower() text = f"{nodeid}\n{detail}".lower()
if outcome == "skipped": if outcome == "skipped":
if "smoke target disabled" in text:
return "target_disabled"
if any( if any(
marker in text marker in text
for marker in ( for marker in (
@@ -0,0 +1,325 @@
from __future__ import annotations
import json
import shutil
import uuid
from pathlib import Path
import pytest
from smoke.lib.config import ProviderModel, SmokeConfig
from smoke.lib.e2e import SmokeServerDriver
from smoke.lib.nvidia_nim_cli import (
ClaudeCliRun,
NimCliMatrixOutcome,
make_outcome,
read_log_delta,
read_log_offset,
regression_failures,
run_claude_cli,
write_matrix_report,
)
from smoke.lib.server import RunningServer
pytestmark = [pytest.mark.live, pytest.mark.smoke_target("nvidia_nim_cli")]
def test_nvidia_nim_cli_matrix_e2e(smoke_config: SmokeConfig, tmp_path: Path) -> None:
if not smoke_config.has_provider_configuration("nvidia_nim"):
pytest.skip("missing_env: NVIDIA_NIM_API_KEY is not configured")
claude_bin = shutil.which(smoke_config.claude_bin)
if not claude_bin:
pytest.skip(f"missing_env: Claude CLI not found: {smoke_config.claude_bin}")
provider_models = smoke_config.nvidia_nim_cli_models()
if not provider_models:
pytest.skip("missing_env: no NVIDIA NIM CLI smoke models configured")
outcomes: list[NimCliMatrixOutcome] = []
for provider_model in provider_models:
with SmokeServerDriver(
smoke_config,
name=f"product-nvidia-nim-cli-{_slug(provider_model.model_name)}",
env_overrides={
"MODEL": provider_model.full_model,
"MESSAGING_PLATFORM": "none",
"ENABLE_MODEL_THINKING": "true",
"LOG_RAW_API_PAYLOADS": "true",
"LOG_RAW_SSE_EVENTS": "true",
},
).run() as server:
model_dir = tmp_path / _slug(provider_model.model_name)
outcomes.extend(
[
_basic_text(
claude_bin, server, smoke_config, provider_model, model_dir
),
_thinking(
claude_bin, server, smoke_config, provider_model, model_dir
),
_tool_use_roundtrip(
claude_bin, server, smoke_config, provider_model, model_dir
),
_interleaved_thinking_tool(
claude_bin, server, smoke_config, provider_model, model_dir
),
_subagent_task(
claude_bin, server, smoke_config, provider_model, model_dir
),
_compact_command(
claude_bin, server, smoke_config, provider_model, model_dir
),
]
)
report_path = write_matrix_report(smoke_config, outcomes)
failures = regression_failures(outcomes)
assert not failures, (
f"NVIDIA NIM CLI matrix regressions written to {report_path}:\n"
+ "\n".join(failures)
)
def _basic_text(
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
model_dir: Path,
) -> NimCliMatrixOutcome:
marker = _marker("BASIC")
return _run_probe(
claude_bin=claude_bin,
server=server,
smoke_config=smoke_config,
provider_model=provider_model,
workspace=model_dir / "basic_text",
feature="basic_text",
marker=marker,
prompt=f"Reply with exactly {marker} and no other text.",
tools="",
)
def _thinking(
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
model_dir: Path,
) -> NimCliMatrixOutcome:
marker = _marker("THINK")
return _run_probe(
claude_bin=claude_bin,
server=server,
smoke_config=smoke_config,
provider_model=provider_model,
workspace=model_dir / "thinking",
feature="thinking",
marker=marker,
prompt=(
"Think privately about the request, then reply with exactly "
f"{marker} and no other text."
),
tools="",
extra_args=("--effort", "high"),
)
def _tool_use_roundtrip(
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
model_dir: Path,
) -> NimCliMatrixOutcome:
marker = _marker("TOOL")
workspace = model_dir / "tool_use_roundtrip"
(workspace / "smoke-read.txt").parent.mkdir(parents=True, exist_ok=True)
(workspace / "smoke-read.txt").write_text(marker, encoding="utf-8")
return _run_probe(
claude_bin=claude_bin,
server=server,
smoke_config=smoke_config,
provider_model=provider_model,
workspace=workspace,
feature="tool_use_roundtrip",
marker=marker,
prompt=(
"Use the Read tool to read smoke-read.txt. Reply with exactly the "
"secret token from that file and no other text."
),
tools="Read",
requires_tool_result=True,
)
def _interleaved_thinking_tool(
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
model_dir: Path,
) -> NimCliMatrixOutcome:
marker = _marker("INTERLEAVED")
workspace = model_dir / "interleaved_thinking_tool"
(workspace / "smoke-interleaved.txt").parent.mkdir(parents=True, exist_ok=True)
(workspace / "smoke-interleaved.txt").write_text(marker, encoding="utf-8")
return _run_probe(
claude_bin=claude_bin,
server=server,
smoke_config=smoke_config,
provider_model=provider_model,
workspace=workspace,
feature="interleaved_thinking_tool",
marker=marker,
prompt=(
"Think privately, use Read on smoke-interleaved.txt, then reply with "
"exactly the secret token from that file and no other text."
),
tools="Read",
extra_args=("--effort", "high"),
requires_tool_result=True,
)
def _subagent_task(
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
model_dir: Path,
) -> NimCliMatrixOutcome:
marker = _marker("TASK")
workspace = model_dir / "subagent_task"
(workspace / "smoke-subagent.txt").parent.mkdir(parents=True, exist_ok=True)
(workspace / "smoke-subagent.txt").write_text(marker, encoding="utf-8")
agents = json.dumps(
{
"smoke_reader": {
"description": "Reads one requested file and returns its token.",
"prompt": (
"Read the requested file with Read and return only the token "
"inside it."
),
}
}
)
return _run_probe(
claude_bin=claude_bin,
server=server,
smoke_config=smoke_config,
provider_model=provider_model,
workspace=workspace,
feature="subagent_task",
marker=marker,
prompt=(
"Use the smoke_reader subagent with Task to read smoke-subagent.txt. "
"Reply with exactly the token the subagent returns and no other text."
),
tools="Task,Read",
extra_args=("--agents", agents),
requires_tool_result=True,
)
def _compact_command(
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
model_dir: Path,
) -> NimCliMatrixOutcome:
marker = _marker("COMPACT")
workspace = model_dir / "compact_command"
session_id = str(uuid.uuid4())
offset = read_log_offset(server.log_path)
first = run_claude_cli(
claude_bin=claude_bin,
server=server,
config=smoke_config,
cwd=workspace,
prompt=f"Remember this smoke token: {marker}. Reply with exactly {marker}.",
tools="",
session_id=session_id,
no_session_persistence=False,
)
second = run_claude_cli(
claude_bin=claude_bin,
server=server,
config=smoke_config,
cwd=workspace,
prompt=f"/compact preserve {marker}",
tools="",
resume_session_id=session_id,
no_session_persistence=False,
)
log_delta = read_log_delta(server.log_path, offset)
run = ClaudeCliRun(
command=(*first.command, "&&", *second.command),
returncode=second.returncode if first.returncode == 0 else first.returncode,
stdout=f"{first.stdout}\n{second.stdout}",
stderr=f"{first.stderr}\n{second.stderr}",
duration_s=first.duration_s + second.duration_s,
timed_out=first.timed_out or second.timed_out,
)
return make_outcome(
model=provider_model.model_name,
full_model=provider_model.full_model,
source=provider_model.source,
feature="compact_command",
marker="",
run=run,
log_delta=log_delta,
log_path=server.log_path,
requires_compact=True,
)
def _run_probe(
*,
claude_bin: str,
server: RunningServer,
smoke_config: SmokeConfig,
provider_model: ProviderModel,
workspace: Path,
feature: str,
marker: str,
prompt: str,
tools: str | None,
extra_args: tuple[str, ...] = (),
requires_tool_result: bool = False,
requires_task: bool = False,
) -> NimCliMatrixOutcome:
offset = read_log_offset(server.log_path)
run = run_claude_cli(
claude_bin=claude_bin,
server=server,
config=smoke_config,
cwd=workspace,
prompt=prompt,
tools=tools,
extra_args=extra_args,
)
log_delta = read_log_delta(server.log_path, offset)
return make_outcome(
model=provider_model.model_name,
full_model=provider_model.full_model,
source=provider_model.source,
feature=feature,
marker=marker,
run=run,
log_delta=log_delta,
log_path=server.log_path,
requires_tool_result=requires_tool_result,
requires_task=requires_task,
)
def _marker(prefix: str) -> str:
return f"FCC_NIM_{prefix}_{uuid.uuid4().hex[:8].upper()}"
def _slug(value: str) -> str:
return "".join(char if char.isalnum() else "-" for char in value).strip("-")
@@ -0,0 +1,196 @@
from __future__ import annotations
import json
from pathlib import Path
from config.settings import Settings
from smoke.lib.config import DEFAULT_TARGETS, SmokeConfig
from smoke.lib.nvidia_nim_cli import (
ClaudeCliRun,
make_outcome,
regression_failures,
write_matrix_report,
)
def _smoke_config(tmp_path: Path) -> SmokeConfig:
return SmokeConfig(
root=tmp_path,
results_dir=tmp_path / ".smoke-results",
live=False,
interactive=False,
targets=DEFAULT_TARGETS,
provider_matrix=frozenset(),
timeout_s=45.0,
prompt="Reply with exactly: FCC_SMOKE_PONG",
claude_bin="claude",
worker_id="test-worker",
settings=Settings.model_construct(anthropic_auth_token=""),
)
def test_nvidia_nim_cli_matrix_report_shape_and_redaction(
tmp_path: Path, monkeypatch
) -> None:
monkeypatch.setenv("NVIDIA_NIM_API_KEY", "secret-nim-key")
run = ClaudeCliRun(
command=("claude", "-p", "redacted"),
returncode=0,
stdout="FCC_NIM_BASIC secret-nim-key",
stderr="",
duration_s=1.25,
)
outcome = make_outcome(
model="z-ai/glm-5.1",
full_model="nvidia_nim/z-ai/glm-5.1",
source="nvidia_nim_cli_default",
feature="basic_text",
marker="FCC_NIM_BASIC",
run=run,
log_delta='POST /v1/messages HTTP/1.1" 200 OK secret-nim-key',
log_path=tmp_path / "server.log",
)
path = write_matrix_report(_smoke_config(tmp_path), [outcome])
payload = json.loads(path.read_text(encoding="utf-8"))
assert path.name.startswith("nvidia-nim-cli-matrix-test-worker-")
assert payload["target"] == "nvidia_nim_cli"
assert payload["models"] == ["nvidia_nim/z-ai/glm-5.1"]
saved = payload["outcomes"][0]
assert saved["feature"] == "basic_text"
assert saved["classification"] == "passed"
assert saved["request_count"] == 1
assert saved["token_evidence"]["marker_present"] is True
assert "secret-nim-key" not in path.read_text(encoding="utf-8")
def test_nvidia_nim_cli_matrix_regression_detection(tmp_path: Path) -> None:
run = ClaudeCliRun(
command=("claude", "-p", "x"),
returncode=0,
stdout="",
stderr="",
duration_s=0.1,
)
outcome = make_outcome(
model="z-ai/glm-5.1",
full_model="nvidia_nim/z-ai/glm-5.1",
source="nvidia_nim_cli_default",
feature="basic_text",
marker="FCC_NIM_BASIC",
run=run,
log_delta='POST /v1/messages HTTP/1.1" 500 Internal Server Error',
log_path=tmp_path / "server.log",
)
assert outcome.classification == "product_failure"
assert regression_failures([outcome]) == [
"nvidia_nim/z-ai/glm-5.1 basic_text: product_failure"
]
def test_nvidia_nim_cli_matrix_model_feature_failures_do_not_regress(
tmp_path: Path,
) -> None:
run = ClaudeCliRun(
command=("claude", "-p", "x"),
returncode=0,
stdout="ordinary answer",
stderr="",
duration_s=0.1,
)
outcome = make_outcome(
model="z-ai/glm-5.1",
full_model="nvidia_nim/z-ai/glm-5.1",
source="nvidia_nim_cli_default",
feature="tool_use_roundtrip",
marker="FCC_NIM_TOOL",
run=run,
log_delta='POST /v1/messages HTTP/1.1" 200 OK',
log_path=tmp_path / "server.log",
requires_tool_result=True,
)
assert outcome.classification == "model_feature_failure"
assert regression_failures([outcome]) == []
def test_nvidia_nim_cli_raw_payload_log_counts_as_proxy_request(
tmp_path: Path,
) -> None:
run = ClaudeCliRun(
command=("claude", "-p", "x"),
returncode=0,
stdout="ordinary answer",
stderr="",
duration_s=0.1,
)
outcome = make_outcome(
model="z-ai/glm-5.1",
full_model="nvidia_nim/z-ai/glm-5.1",
source="nvidia_nim_cli_default",
feature="subagent_task",
marker="FCC_NIM_TASK",
run=run,
log_delta="API_REQUEST: request_id=req_1 model=z-ai/glm-5.1 messages=2",
log_path=tmp_path / "server.log",
requires_task=True,
)
assert outcome.classification == "model_feature_failure"
assert outcome.request_count == 1
assert regression_failures([outcome]) == []
def test_nvidia_nim_cli_timeout_is_not_model_missing(
tmp_path: Path,
) -> None:
run = ClaudeCliRun(
command=("claude", "-p", "x"),
returncode=None,
stdout='{"type":"assistant","content":[{"type":"text","text":"FCC_NIM_TOOL"}]}',
stderr="",
duration_s=45.0,
timed_out=True,
)
outcome = make_outcome(
model="z-ai/glm-5.1",
full_model="nvidia_nim/z-ai/glm-5.1",
source="nvidia_nim_cli_default",
feature="tool_use_roundtrip",
marker="FCC_NIM_TOOL",
run=run,
log_delta="API_REQUEST: request_id=req_1 model=z-ai/glm-5.1 messages=2",
log_path=tmp_path / "server.log",
)
assert outcome.classification == "probe_timeout"
assert outcome.token_evidence["timed_out"] is True
assert regression_failures([outcome]) == []
def test_nvidia_nim_cli_success_beats_verbose_timeout_words(tmp_path: Path) -> None:
run = ClaudeCliRun(
command=("claude", "-p", "x"),
returncode=0,
stdout="FCC_NIM_THINK",
stderr="",
duration_s=0.1,
)
outcome = make_outcome(
model="z-ai/glm-5.1",
full_model="nvidia_nim/z-ai/glm-5.1",
source="nvidia_nim_cli_default",
feature="thinking",
marker="FCC_NIM_THINK",
run=run,
log_delta=(
"API_REQUEST: request_id=req_1 model=z-ai/glm-5.1 messages=1 "
"read_timeout_s=300"
),
log_path=tmp_path / "server.log",
)
assert outcome.classification == "passed"
assert outcome.request_count == 1
+75
View File
@@ -4,10 +4,14 @@ from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
from smoke.lib.config import ( from smoke.lib.config import (
ALL_TARGETS,
DEFAULT_TARGETS, DEFAULT_TARGETS,
NVIDIA_NIM_CLI_DEFAULT_MODELS,
OPT_IN_TARGETS,
PROVIDER_SMOKE_DEFAULT_MODELS, PROVIDER_SMOKE_DEFAULT_MODELS,
TARGET_REQUIRED_ENV, TARGET_REQUIRED_ENV,
SmokeConfig, SmokeConfig,
nvidia_nim_cli_model_refs,
) )
@@ -52,6 +56,13 @@ def test_ollama_is_default_smoke_target() -> None:
assert "ollama" in TARGET_REQUIRED_ENV assert "ollama" in TARGET_REQUIRED_ENV
def test_nvidia_nim_cli_is_opt_in_smoke_target() -> None:
assert "nvidia_nim_cli" not in DEFAULT_TARGETS
assert "nvidia_nim_cli" in OPT_IN_TARGETS
assert "nvidia_nim_cli" in ALL_TARGETS
assert "nvidia_nim_cli" in TARGET_REQUIRED_ENV
def test_ollama_provider_configuration_uses_base_url() -> None: def test_ollama_provider_configuration_uses_base_url() -> None:
config = _smoke_config() config = _smoke_config()
@@ -190,3 +201,67 @@ def test_provider_smoke_does_not_include_default_local_urls_when_unmapped(
config = _smoke_config(settings=_settings(model="nvidia_nim/test")) config = _smoke_config(settings=_settings(model="nvidia_nim/test"))
assert config.provider_smoke_models() == [] assert config.provider_smoke_models() == []
def test_nvidia_nim_cli_default_models_are_normalized() -> None:
refs = nvidia_nim_cli_model_refs({})
assert tuple(refs) == tuple(
f"nvidia_nim/{model}" for model in NVIDIA_NIM_CLI_DEFAULT_MODELS
)
assert "nvidia_nim/deepseek-ai/deepseek-v4-pro" in refs
assert "nvidia_nim/deepseek-ai/deepseek-v4-flash" in refs
assert set(refs.values()) == {"nvidia_nim_cli_default"}
def test_nvidia_nim_cli_models_override_and_append() -> None:
refs = nvidia_nim_cli_model_refs(
{
"FCC_SMOKE_NIM_MODELS": "z-ai/glm-5.1,nvidia_nim/custom/model",
"FCC_SMOKE_NIM_EXTRA_MODELS": "moonshotai/kimi-k2.6,z-ai/glm-5.1",
}
)
assert tuple(refs) == (
"nvidia_nim/z-ai/glm-5.1",
"nvidia_nim/custom/model",
"nvidia_nim/moonshotai/kimi-k2.6",
)
assert refs["nvidia_nim/z-ai/glm-5.1"] == "FCC_SMOKE_NIM_MODELS"
assert refs["nvidia_nim/moonshotai/kimi-k2.6"] == ("FCC_SMOKE_NIM_EXTRA_MODELS")
def test_nvidia_nim_cli_models_reject_empty_override() -> None:
try:
nvidia_nim_cli_model_refs({"FCC_SMOKE_NIM_MODELS": " , "})
except ValueError as exc:
assert "FCC_SMOKE_NIM_MODELS" in str(exc)
else:
raise AssertionError("expected empty NVIDIA NIM CLI model override to fail")
def test_nvidia_nim_cli_models_reject_wrong_provider_prefix() -> None:
try:
nvidia_nim_cli_model_refs({"FCC_SMOKE_NIM_MODELS": "open_router/model"})
except ValueError as exc:
assert "nvidia_nim" in str(exc)
else:
raise AssertionError("expected wrong provider prefix to fail")
def test_smoke_config_returns_nvidia_nim_cli_provider_models(monkeypatch) -> None:
monkeypatch.delenv("FCC_SMOKE_NIM_MODELS", raising=False)
monkeypatch.delenv("FCC_SMOKE_NIM_EXTRA_MODELS", raising=False)
config = _smoke_config(
settings=_settings(
model="nvidia_nim/z-ai/glm-5.1",
nvidia_nim_api_key="nim-key",
ollama_base_url="",
)
)
models = config.nvidia_nim_cli_models()
assert models[0].provider == "nvidia_nim"
assert models[0].full_model == "nvidia_nim/z-ai/glm-5.1"
assert models[0].source == "nvidia_nim_cli_default"
+11
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import json import json
from pathlib import Path from pathlib import Path
from smoke.lib.report import classify_outcome
from smoke.lib.report_summary import format_summary, summarize_reports from smoke.lib.report_summary import format_summary, summarize_reports
@@ -32,3 +33,13 @@ def test_smoke_report_summary_counts_regression_classes(tmp_path: Path) -> None:
assert summary.classifications["product_failure"] == 1 assert summary.classifications["product_failure"] == 1
assert summary.has_regression assert summary.has_regression
assert "status=regression" in format_summary(summary) assert "status=regression" in format_summary(summary)
def test_target_disabled_skip_is_not_missing_env() -> None:
classification = classify_outcome(
nodeid="smoke/product/test_api_product_live.py::test_api_basic_conversation_e2e",
outcome="skipped",
detail="Skipped: smoke target disabled: api",
)
assert classification == "target_disabled"
+230
View File
@@ -8,6 +8,7 @@ from httpx import Request, Response
from config.nim import NimSettings from config.nim import NimSettings
from providers.defaults import NVIDIA_NIM_DEFAULT_BASE from providers.defaults import NVIDIA_NIM_DEFAULT_BASE
from providers.nvidia_nim import NvidiaNimProvider from providers.nvidia_nim import NvidiaNimProvider
from providers.nvidia_nim.request import NIM_TOOL_ARGUMENT_ALIASES_KEY
# Mock data classes # Mock data classes
@@ -47,6 +48,46 @@ class MockRequest:
setattr(self, k, v) setattr(self, k, v)
def _input_json_deltas(events):
deltas = []
for event in events:
if "event: content_block_delta" not in event:
continue
for line in event.splitlines():
if not line.startswith("data: "):
continue
payload = json.loads(line[6:])
delta = payload.get("delta", {})
if delta.get("type") == "input_json_delta":
deltas.append(delta.get("partial_json", ""))
return deltas
def _tool_call_chunk(
*,
name,
arguments,
tool_id="call_1",
index=0,
finish_reason=None,
):
mock_tc = MagicMock()
mock_tc.index = index
mock_tc.id = tool_id
mock_tc.function.name = name
mock_tc.function.arguments = arguments
mock_chunk = MagicMock()
mock_chunk.choices = [
MagicMock(
delta=MagicMock(content=None, reasoning_content="", tool_calls=[mock_tc]),
finish_reason=finish_reason,
)
]
mock_chunk.usage = None
return mock_chunk
def _make_bad_request_error(message: str) -> openai.BadRequestError: def _make_bad_request_error(message: str) -> openai.BadRequestError:
response = Response( response = Response(
status_code=400, status_code=400,
@@ -434,6 +475,195 @@ async def test_tool_call_stream(nim_provider):
assert "search" in starts[0] assert "search" in starts[0]
@pytest.mark.asyncio
async def test_stream_response_restores_aliased_tool_arguments(nim_provider):
"""NIM-safe argument aliases are restored before Anthropic SSE emission."""
req = MockRequest(
tools=[
MockTool(
"Grep",
"Search file contents",
{
"type": "object",
"properties": {
"pattern": {"type": "string"},
"-A": {"type": "number"},
"type": {"type": "string"},
},
"required": ["pattern"],
},
)
]
)
mock_chunk = _tool_call_chunk(
name="Grep",
arguments=json.dumps({"pattern": "needle", "-A": 2, "_fcc_arg_type": "py"}),
)
async def mock_stream():
yield mock_chunk
with patch.object(
nim_provider._client.chat.completions, "create", new_callable=AsyncMock
) as mock_create:
mock_create.return_value = mock_stream()
events = [e async for e in nim_provider.stream_response(req)]
await_args = mock_create.await_args
assert await_args is not None
create_kwargs = await_args.kwargs
assert NIM_TOOL_ARGUMENT_ALIASES_KEY not in create_kwargs
properties = create_kwargs["tools"][0]["function"]["parameters"]["properties"]
assert "-A" in properties
assert "type" not in properties
assert "_fcc_arg_A" not in properties
assert "_fcc_arg_type" in properties
deltas = _input_json_deltas(events)
assert len(deltas) == 1
assert json.loads(deltas[0]) == {"pattern": "needle", "-A": 2, "type": "py"}
assert "_fcc_arg_type" not in deltas[0]
@pytest.mark.asyncio
async def test_stream_response_buffers_chunked_aliased_tool_arguments(nim_provider):
"""Chunked aliased args are emitted once as restored Claude Code args."""
req = MockRequest(
tools=[
MockTool(
"Grep",
"Search file contents",
{
"type": "object",
"properties": {
"pattern": {"type": "string"},
"type": {"type": "string"},
},
"required": ["pattern"],
},
)
]
)
first_chunk = _tool_call_chunk(
name="Grep",
arguments='{"pattern": "needle", ',
tool_id="call_chunked",
)
second_chunk = _tool_call_chunk(
name=None,
arguments='"_fcc_arg_type": "py"}',
tool_id="call_chunked",
)
async def mock_stream():
yield first_chunk
yield second_chunk
with patch.object(
nim_provider._client.chat.completions, "create", new_callable=AsyncMock
) as mock_create:
mock_create.return_value = mock_stream()
events = [e async for e in nim_provider.stream_response(req)]
deltas = _input_json_deltas(events)
assert len(deltas) == 1
assert json.loads(deltas[0]) == {"pattern": "needle", "type": "py"}
@pytest.mark.asyncio
async def test_stream_response_restores_nested_aliased_tool_arguments(nim_provider):
req = MockRequest(
tools=[
MockTool(
"NotionLike",
"Nested type schema",
{
"type": "object",
"properties": {
"parent": {
"type": "object",
"properties": {
"type": {"type": "string"},
"id": {"type": "string"},
},
"required": ["type", "id"],
}
},
"required": ["parent"],
},
)
]
)
mock_chunk = _tool_call_chunk(
name="NotionLike",
arguments=json.dumps(
{"parent": {"_fcc_arg_type": "page_id", "id": "page_123"}}
),
)
async def mock_stream():
yield mock_chunk
with patch.object(
nim_provider._client.chat.completions, "create", new_callable=AsyncMock
) as mock_create:
mock_create.return_value = mock_stream()
events = [e async for e in nim_provider.stream_response(req)]
deltas = _input_json_deltas(events)
assert len(deltas) == 1
assert json.loads(deltas[0]) == {"parent": {"type": "page_id", "id": "page_123"}}
@pytest.mark.asyncio
async def test_stream_response_task_tool_still_forces_background_false(nim_provider):
req = MockRequest(
tools=[
MockTool(
"Task",
"Run a subagent",
{
"type": "object",
"properties": {
"description": {"type": "string"},
"prompt": {"type": "string"},
"run_in_background": {"type": "boolean"},
},
"required": ["description", "prompt"],
},
)
]
)
mock_chunk = _tool_call_chunk(
name="Task",
arguments=json.dumps(
{
"description": "Inspect",
"prompt": "Read the marker",
"run_in_background": True,
}
),
tool_id="call_task",
)
async def mock_stream():
yield mock_chunk
with patch.object(
nim_provider._client.chat.completions, "create", new_callable=AsyncMock
) as mock_create:
mock_create.return_value = mock_stream()
events = [e async for e in nim_provider.stream_response(req)]
deltas = _input_json_deltas(events)
assert len(deltas) == 1
assert json.loads(deltas[0])["run_in_background"] is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stream_response_retries_without_reasoning_budget(nim_provider): async def test_stream_response_retries_without_reasoning_budget(nim_provider):
req = MockRequest() req = MockRequest()
+149
View File
@@ -1,6 +1,8 @@
"""Tests for providers/nvidia_nim/request.py.""" """Tests for providers/nvidia_nim/request.py."""
from copy import deepcopy
from types import SimpleNamespace from types import SimpleNamespace
from typing import Any
from unittest.mock import MagicMock from unittest.mock import MagicMock
import pytest import pytest
@@ -8,12 +10,36 @@ import pytest
from config.nim import NimSettings from config.nim import NimSettings
from core.anthropic import set_if_not_none from core.anthropic import set_if_not_none
from providers.nvidia_nim.request import ( from providers.nvidia_nim.request import (
NIM_TOOL_ARGUMENT_ALIASES_KEY,
_set_extra, _set_extra,
body_without_nim_tool_argument_aliases,
build_request_body, build_request_body,
clone_body_without_chat_template, clone_body_without_chat_template,
clone_body_without_reasoning_content, clone_body_without_reasoning_content,
nim_tool_argument_aliases_from_body,
) )
GREP_SCHEMA_FROM_SERVER_LOG: dict[str, Any] = {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "The regular expression"},
"path": {"type": "string", "description": "File or directory to search"},
"glob": {"type": "string", "description": "Glob to filter files"},
"output_mode": {
"type": "string",
"enum": ["content", "files_with_matches", "count"],
},
"-A": {"type": "number", "description": "Lines after match"},
"-B": {"type": "number", "description": "Lines before match"},
"-C": {"type": "number", "description": "Lines around match"},
"-i": {"type": "boolean", "description": "Case insensitive"},
"-n": {"type": "boolean", "description": "Show line numbers"},
"type": {"type": "string", "description": "File type to search"},
},
"additionalProperties": False,
"required": ["pattern"],
}
@pytest.fixture @pytest.fixture
def req(): def req():
@@ -121,6 +147,129 @@ class TestBuildRequestBody:
assert tool_schema["additionalProperties"] is False assert tool_schema["additionalProperties"] is False
assert tool_schema["properties"]["nested"]["additionalProperties"] is False assert tool_schema["properties"]["nested"]["additionalProperties"] is False
def test_grep_schema_type_parameter_is_aliased_without_mutating_request(self, req):
tool_schema = deepcopy(GREP_SCHEMA_FROM_SERVER_LOG)
tool_schema["properties"]["_fcc_arg_type"] = {
"type": "string",
"description": "Existing safe property that collides with the alias",
}
tool_schema["required"] = ["pattern", "-A", "_fcc_arg_type"]
original_schema = deepcopy(tool_schema)
req.tools = [
SimpleNamespace(
name="Grep",
description="Search file contents",
input_schema=tool_schema,
)
]
body = build_request_body(req, NimSettings(), thinking_enabled=False)
parameters = body["tools"][0]["function"]["parameters"]
properties = parameters["properties"]
aliases = body[NIM_TOOL_ARGUMENT_ALIASES_KEY]["Grep"]
assert "additionalProperties" not in parameters
assert properties["-A"] == original_schema["properties"]["-A"]
assert properties["-B"] == original_schema["properties"]["-B"]
assert properties["-C"] == original_schema["properties"]["-C"]
assert properties["-i"] == original_schema["properties"]["-i"]
assert properties["-n"] == original_schema["properties"]["-n"]
assert "type" not in properties
assert properties["pattern"] == original_schema["properties"]["pattern"]
assert properties["output_mode"]["enum"] == [
"content",
"files_with_matches",
"count",
]
assert (
properties["_fcc_arg_type"]
== original_schema["properties"]["_fcc_arg_type"]
)
assert aliases == {"_fcc_arg_type_2": "type"}
assert properties["_fcc_arg_type_2"] == original_schema["properties"]["type"]
assert "-A" in parameters["required"]
assert "_fcc_arg_type" in parameters["required"]
assert tool_schema == original_schema
def test_safe_tool_schema_does_not_add_alias_metadata(self, req):
tool_schema = {
"type": "object",
"properties": {
"pattern": {"type": "string"},
"path": {"type": "string"},
"output_mode": {"type": "string", "enum": ["content", "count"]},
},
"required": ["pattern"],
}
req.tools = [
SimpleNamespace(
name="Glob",
description="Find files",
input_schema=tool_schema,
)
]
body = build_request_body(req, NimSettings(), thinking_enabled=False)
assert NIM_TOOL_ARGUMENT_ALIASES_KEY not in body
parameters = body["tools"][0]["function"]["parameters"]
assert parameters["properties"] == tool_schema["properties"]
assert parameters["required"] == ["pattern"]
def test_nested_schema_keyword_properties_are_aliased_without_mutating_request(
self, req
):
tool_schema = {
"type": "object",
"properties": {
"parent": {
"type": "object",
"properties": {
"type": {"type": "string", "enum": ["page_id"]},
"id": {"type": "string"},
},
"required": ["type", "id"],
}
},
"required": ["parent"],
}
original_schema = deepcopy(tool_schema)
req.tools = [
SimpleNamespace(
name="NotionLike",
description="Nested type schema",
input_schema=tool_schema,
)
]
body = build_request_body(req, NimSettings(), thinking_enabled=False)
aliases = body[NIM_TOOL_ARGUMENT_ALIASES_KEY]["NotionLike"]
parent = body["tools"][0]["function"]["parameters"]["properties"]["parent"]
parent_properties = parent["properties"]
assert "type" not in parent_properties
assert parent_properties["_fcc_arg_type"] == {
"type": "string",
"enum": ["page_id"],
}
assert parent["required"] == ["_fcc_arg_type", "id"]
assert aliases == {"_fcc_arg_type": "type"}
assert tool_schema == original_schema
def test_private_alias_metadata_is_stripped_without_mutating_body(self):
body = {
"model": "test",
NIM_TOOL_ARGUMENT_ALIASES_KEY: {"Grep": {"_fcc_arg_A": "-A"}},
}
upstream_body = body_without_nim_tool_argument_aliases(body)
assert NIM_TOOL_ARGUMENT_ALIASES_KEY not in upstream_body
assert body[NIM_TOOL_ARGUMENT_ALIASES_KEY] == {"Grep": {"_fcc_arg_A": "-A"}}
assert nim_tool_argument_aliases_from_body(body) == {
"Grep": {"_fcc_arg_A": "-A"}
}
def test_reasoning_params_in_extra_body(self): def test_reasoning_params_in_extra_body(self):
req = MagicMock() req = MagicMock()
req.model = "test" req.model = "test"