feat(logging): structured TRACE events and end-to-end request correlation

Add core/trace.py with trace_event, traced_async_stream, and payload snapshots.
Merge TRACE fields into JSON logs; promote claude_session_id, http path/method.
Instrument API, messaging/CLI, and OpenAI-compat/native provider paths.
Harden log sink with enqueue and stdlib intercept re-entrancy guard.
Document behavior in .env.example and README; extend tests.
This commit is contained in:
Alishahryar1
2026-05-10 18:24:48 -07:00
parent 1e97dff214
commit 29e7714337
18 changed files with 646 additions and 140 deletions
+6 -1
View File
@@ -136,6 +136,11 @@ WEB_FETCH_ALLOWED_SCHEMES=http,https
WEB_FETCH_ALLOW_PRIVATE_NETWORKS=false
# Structured TRACE logs: lines with `"trace": true` merge ingress/routing/cli/provider/egress
# stages. Conversation text is logged in those payloads (verbatim). Values under keys named
# like ``api_key`` / ``authorization`` are redacted. Raw transport payloads still require
# the LOG_RAW_* toggles below.
#
# Verbose diagnostics (avoid logging raw prompts / SSE bodies in production)
DEBUG_PLATFORM_EDITS=false
DEBUG_SUBAGENT_STACK=false
@@ -144,7 +149,7 @@ LOG_RAW_API_PAYLOADS=false
LOG_RAW_SSE_EVENTS=false
# When true, log full exception text and tracebacks for unhandled errors (may leak request-derived data).
LOG_API_ERROR_TRACEBACKS=false
# When true, log message/transcription text previews in messaging adapters (may leak user content).
# When true, log message/transcription text previews in messaging adapters only (handler ingress always TRACEs verbatim text separately).
LOG_RAW_MESSAGING_CONTENT=false
# When true, log full Claude CLI stderr, non-JSON stdout lines, and parser error text.
LOG_RAW_CLI_DIAGNOSTICS=false
+2
View File
@@ -406,6 +406,8 @@ LOG_MESSAGING_ERROR_DETAILS=false
Raw logging flags can expose prompts, tool arguments, paths, and model output. Keep them off unless you are debugging locally.
Structured TRACE rows append fields such as `"trace": true`, `stage`, `event`, and `source` and include conversation context needed to follow Claude Code flows end-to-end. Dictionary keys resembling credentials (for example `api_key` / `authorization` values nested in structured payloads) are redacted; arbitrary prose you type into prompts may still appear verbatim.
### 6. Local Web Tools
```dotenv
+23 -8
View File
@@ -13,6 +13,7 @@ from starlette.types import Receive, Scope, Send
from config.logging_config import configure_logging
from config.settings import get_settings
from core.trace import extract_claude_session_id_from_headers, trace_event
from providers.exceptions import ProviderError
from .admin_routes import router as admin_router
@@ -95,6 +96,18 @@ def create_app(*, lifespan_enabled: bool = True) -> FastAPI:
app_kwargs["lifespan"] = lifespan
app = FastAPI(**app_kwargs)
@app.middleware("http")
async def trace_http_correlation(request: Request, call_next):
"""Attach HTTP identifiers and optional Claude session id to logs."""
claude_sid = extract_claude_session_id_from_headers(request.headers)
with logger.contextualize(
http_method=request.method,
http_path=request.url.path,
claude_session_id=claude_sid,
):
response = await call_next(request)
return response
# Register routes
app.include_router(admin_router)
app.include_router(router)
@@ -111,14 +124,16 @@ def create_app(*, lifespan_enabled: bool = True) -> FastAPI:
message_summary, tool_names = summarize_request_validation_body(body)
logger.debug(
"Request validation failed: path={} query={} error_locs={} error_types={} message_summary={} tool_names={}",
request.url.path,
str(request.url.query),
[list(error.get("loc", ())) for error in exc.errors()],
[str(error.get("type", "")) for error in exc.errors()],
message_summary,
tool_names,
trace_event(
stage="ingress",
event="server.request.validation_failed",
source="api",
path=request.url.path,
query=dict(request.query_params),
error_locs=[list(error.get("loc", ())) for error in exc.errors()],
error_types=[str(error.get("type", "")) for error in exc.errors()],
message_summary=message_summary,
tool_names=tool_names,
)
return await request_validation_exception_handler(request, exc)
+8
View File
@@ -5,6 +5,7 @@ from loguru import logger
from config.settings import Settings
from core.anthropic import get_token_count
from core.trace import trace_event
from providers.registry import ProviderRegistry
from . import dependencies
@@ -231,6 +232,7 @@ async def list_models(
_auth=Depends(require_api_key),
):
"""List the model ids this proxy advertises to Claude-compatible clients."""
trace_event(stage="ingress", event="api.models.list", source="api")
registry = getattr(request.app.state, "provider_registry", None)
provider_registry = registry if isinstance(registry, ProviderRegistry) else None
return _build_models_list_response(settings, provider_registry)
@@ -250,5 +252,11 @@ async def stop_cli(request: Request, _auth=Depends(require_api_key)):
raise HTTPException(status_code=503, detail="Messaging system not initialized")
count = await handler.stop_all_tasks()
trace_event(
stage="ingress",
event="api.cli.stop_via_handler",
source="api",
cancelled_nodes=count,
)
logger.info("STOP_CLI: source=handler cancelled_count={}", count)
return {"status": "stopped", "cancelled_count": count}
+66 -15
View File
@@ -14,6 +14,7 @@ from loguru import logger
from config.settings import Settings
from core.anthropic import get_token_count, get_user_facing_error_message
from core.anthropic.sse import ANTHROPIC_SSE_RESPONSE_HEADERS
from core.trace import api_messages_request_snapshot, trace_event, traced_async_stream
from providers.base import BaseProvider
from providers.exceptions import InvalidRequestError, ProviderError
@@ -118,7 +119,12 @@ class ClaudeProxyService:
input_tokens = self._token_counter(
routed.request.messages, routed.request.system, routed.request.tools
)
logger.info("Optimization: Handling Anthropic web server tool")
trace_event(
stage="routing",
event="api.optimization.web_server_tool",
source="api",
model=routed.request.model,
)
egress = WebFetchEgressPolicy(
allow_private_network_targets=self._settings.web_fetch_allow_private_networks,
allowed_schemes=self._settings.web_fetch_allowed_scheme_set(),
@@ -134,6 +140,12 @@ class ClaudeProxyService:
optimized = try_optimizations(routed.request, self._settings)
if optimized is not None:
trace_event(
stage="routing",
event="api.optimization.short_circuit",
source="api",
model=routed.request.model,
)
return optimized
logger.debug("No optimization matched, routing to provider")
@@ -143,29 +155,57 @@ class ClaudeProxyService:
thinking_enabled=routed.resolved.thinking_enabled,
)
request_id = f"req_{uuid.uuid4().hex[:12]}"
logger.info(
"API_REQUEST: request_id={} model={} messages={}",
request_id,
routed.request.model,
len(routed.request.messages),
trace_event(
stage="routing",
event="api.route.resolved",
source="api",
provider_id=routed.resolved.provider_id,
provider_model=routed.resolved.provider_model,
provider_model_ref=routed.resolved.provider_model_ref,
gateway_model=routed.request.model,
thinking_enabled=routed.resolved.thinking_enabled,
)
request_id = f"req_{uuid.uuid4().hex[:12]}"
with logger.contextualize(request_id=request_id):
trace_event(
stage="ingress",
event="api.request.received",
source="api",
message_count=len(routed.request.messages),
snapshot=api_messages_request_snapshot(routed.request),
)
if self._settings.log_raw_api_payloads:
logger.debug(
"FULL_PAYLOAD [{}]: {}", request_id, routed.request.model_dump()
)
input_tokens = self._token_counter(
routed.request.messages, routed.request.system, routed.request.tools
routed.request.messages,
routed.request.system,
routed.request.tools,
)
return anthropic_sse_streaming_response(
streamed = traced_async_stream(
provider.stream_response(
routed.request,
input_tokens=input_tokens,
request_id=request_id,
thinking_enabled=routed.resolved.thinking_enabled,
),
stage="egress",
source="api",
complete_event="api.response.stream_completed",
interrupted_event="api.response.stream_interrupted",
chunk_event=None,
extra={
"request_id": request_id,
"provider_id": routed.resolved.provider_id,
"gateway_model": routed.request.model,
},
)
return anthropic_sse_streaming_response(streamed)
except ProviderError:
raise
@@ -188,12 +228,23 @@ class ClaudeProxyService:
tokens = self._token_counter(
routed.request.messages, routed.request.system, routed.request.tools
)
logger.info(
"COUNT_TOKENS: request_id={} model={} messages={} input_tokens={}",
request_id,
routed.request.model,
len(routed.request.messages),
tokens,
trace_event(
stage="routing",
event="api.route.resolved",
source="api",
kind="count_tokens",
provider_id=routed.resolved.provider_id,
provider_model=routed.resolved.provider_model,
provider_model_ref=routed.resolved.provider_model_ref,
gateway_model=routed.request.model,
)
trace_event(
stage="ingress",
event="api.count_tokens.completed",
source="api",
message_count=len(routed.request.messages),
input_tokens=tokens,
snapshot=api_messages_request_snapshot(routed.request),
)
return TokenCountResponse(input_tokens=tokens)
except ProviderError:
-3
View File
@@ -56,8 +56,6 @@ class CLISessionManager:
self._real_to_temp: dict[str, str] = {}
self._lock = asyncio.Lock()
logger.info("CLISessionManager initialized")
async def get_or_create_session(
self, session_id: str | None = None
) -> tuple[CLISession, str, bool]:
@@ -87,7 +85,6 @@ class CLISessionManager:
log_raw_cli_diagnostics=self._log_raw_cli_diagnostics,
)
self._pending_sessions[temp_id] = new_session
logger.info(f"Created new session: {temp_id}")
return new_session, temp_id, True
+18 -2
View File
@@ -9,6 +9,8 @@ from typing import Any
from loguru import logger
from core.trace import trace_event
from .process_registry import register_pid, unregister_pid
# Cap stderr capture so a runaway child cannot exhaust memory; pipe is still drained.
@@ -136,7 +138,6 @@ class CLISession:
"--dangerously-skip-permissions",
"--verbose",
]
logger.info(f"Resuming Claude session {session_id}")
else:
cmd = [
self.claude_bin,
@@ -147,7 +148,6 @@ class CLISession:
"--dangerously-skip-permissions",
"--verbose",
]
logger.info("Starting new Claude session")
if self.allowed_dirs:
for d in self.allowed_dirs:
@@ -157,6 +157,22 @@ class CLISession:
settings_json = json.dumps({"plansDirectory": self.plans_directory})
cmd.extend(["--settings", settings_json])
trace_event(
stage="claude_cli",
event="claude_cli.process.launch",
source="claude_cli",
resume_session_id=(
session_id
if session_id and not session_id.startswith("pending_")
else None
),
fork_session=fork_session,
prompt=prompt,
cwd=self.workspace,
claude_binary=self.claude_bin,
cli_argv=cmd,
)
try:
self.process = await asyncio.create_subprocess_exec(
*cmd,
+33 -2
View File
@@ -9,14 +9,26 @@ included at top level for easy grep/filter.
import json
import logging
import re
import threading
from pathlib import Path
from loguru import logger
_configured = False
# Context keys we promote to top-level JSON for traceability
_CONTEXT_KEYS = ("request_id", "node_id", "chat_id")
# Loguru ``logger.bind()`` key used by structured TRACE payloads; ``core/trace.py``
# uses the identical string constant ``TRACE_PAYLOAD_BINDING``.
_TRACE_PAYLOAD_BINDING = "trace_payload"
# Context keys we promote to top-level JSON for traceability / grep
_CONTEXT_KEYS = (
"request_id",
"node_id",
"chat_id",
"claude_session_id",
"http_method",
"http_path",
)
_TELEGRAM_BOT_RE = re.compile(
r"(https?://api\.telegram\.org/)bot([0-9]+:[A-Za-z0-9_-]+)(/?)",
@@ -48,9 +60,16 @@ def _serialize_with_context(record) -> str:
"function": record["function"],
"line": record["line"],
}
trace_payload = extra.get(_TRACE_PAYLOAD_BINDING)
for key in _CONTEXT_KEYS:
if key in extra and extra[key] is not None:
out[key] = extra[key]
if isinstance(trace_payload, dict):
for tk, tv in trace_payload.items():
if tk in out:
continue
out[tk] = tv
out["trace"] = True
record["_json"] = json.dumps(out, default=str)
return "{_json}\n"
@@ -58,7 +77,16 @@ def _serialize_with_context(record) -> str:
class InterceptHandler(logging.Handler):
"""Redirect stdlib logging to loguru."""
def __init__(self) -> None:
super().__init__()
self._local = threading.local()
def emit(self, record: logging.LogRecord) -> None:
if getattr(self._local, "active", False):
# Avoid deadlock when nested stdlib records fire during a loguru emit.
return
self._local.active = True
try:
try:
level = logger.level(record.levelname).name
except ValueError:
@@ -72,6 +100,8 @@ class InterceptHandler(logging.Handler):
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
finally:
self._local.active = False
def configure_logging(
@@ -104,6 +134,7 @@ def configure_logging(
encoding="utf-8",
mode="a",
rotation="50 MB",
enqueue=True,
)
# Intercept stdlib logging: route all root logger output to loguru
-6
View File
@@ -187,12 +187,6 @@ class SSEBuilder:
event_str = format_sse_event(event_type, data)
if self._log_raw_events:
logger.debug("SSE_EVENT: {} - {}", event_type, event_str.strip())
else:
logger.debug(
"SSE_EVENT: event_type={} serialized_bytes={}",
event_type,
len(event_str.encode("utf-8")),
)
return event_str
def message_start(self) -> str:
+214
View File
@@ -0,0 +1,214 @@
"""Structured TRACE events for end-to-end request / CLI / provider logging.
Emitted lines are merged into JSON log rows by ``config.logging_config``.
Conversation and Claude Code prompts are logged verbatim unless values live under
sanitized credential keys (e.g. ``api_key``, ``authorization``).
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Mapping
from typing import Any
from loguru import logger
TRACE_PAYLOAD_BINDING = "trace_payload"
_SECRET_VALUE_KEYS = frozenset(
k.lower()
for k in (
"authorization",
"x-api-key",
"anthropic-auth-token",
"api_key",
"password",
"secret",
"token",
"bearer_token",
"openapi_token",
"nvidia-api-key",
)
)
def _sanitize_trace_value(obj: Any) -> Any:
"""Recursively copy JSON-like structures redacting credential-shaped keys."""
if isinstance(obj, Mapping):
out: dict[str, Any] = {}
for k, v in obj.items():
if str(k).lower() in _SECRET_VALUE_KEYS:
out[str(k)] = "<redacted>"
else:
out[str(k)] = _sanitize_trace_value(v)
return out
if isinstance(obj, tuple | list):
return [_sanitize_trace_value(x) for x in obj]
return obj
def trace_event(*, stage: str, event: str, source: str, **fields: Any) -> None:
"""Emit one structured TRACE row (merged into JSON by the log sink)."""
payload = _sanitize_trace_value(
{
"stage": stage,
"event": event,
"source": source,
**fields,
},
)
logger.bind(trace_payload=payload).info("TRACE {}", event)
def api_messages_request_snapshot(req: Any) -> dict[str, Any]:
"""Return a sanitized snapshot of an Anthropic ``MessagesRequest``-like body."""
if hasattr(req, "model_dump"):
data = req.model_dump(mode="python")
elif isinstance(req, Mapping):
data = dict(req)
else:
data = {}
snapshot: dict[str, Any] = {}
for key in (
"model",
"messages",
"system",
"tools",
"tool_choice",
"max_tokens",
"thinking",
"temperature",
"top_p",
"top_k",
"stop_sequences",
"metadata",
"stream",
"thinking_enabled",
):
if key in data and data[key] is not None:
snapshot[key] = data[key]
return _sanitize_trace_value(snapshot)
def extract_claude_session_id_from_headers(headers: Mapping[str, str]) -> str | None:
"""Best-effort session id forwarded by Claude Code / SDK via HTTP."""
lowered = {str(k).lower(): v for k, v in headers.items() if isinstance(v, str)}
for key in (
"anthropic-session-id",
"x-anthropic-session-id",
"claude-session-id",
"x-claude-session-id",
):
candidate = lowered.get(key)
if candidate:
return candidate
return None
async def traced_async_stream(
agen: AsyncIterator[str],
*,
stage: str,
source: str,
complete_event: str,
interrupted_event: str,
chunk_event: str | None = None,
chunk_interval: int = 250,
extra: Mapping[str, Any] | None = None,
) -> AsyncIterator[str]:
"""Emit TRACE rows when a text stream completes, fails, cancels, or periodically."""
common = dict(extra or {})
count = 0
nbytes = 0
interrupted = False
try:
async for chunk in agen:
count += 1
nbytes += len(chunk.encode("utf-8", errors="replace"))
if chunk_event and chunk_interval > 0 and count % chunk_interval == 0:
trace_event(
stage=stage,
event=chunk_event,
source=source,
stream_chunks_so_far=count,
stream_bytes_so_far=nbytes,
**common,
)
yield chunk
except asyncio.CancelledError:
interrupted = True
trace_event(
stage=stage,
event=interrupted_event,
source=source,
stream_chunks=count,
stream_bytes=nbytes,
outcome="cancelled",
**common,
)
raise
except BaseExceptionGroup as grp:
interrupted = True
trace_event(
stage=stage,
event=interrupted_event,
source=source,
stream_chunks=count,
stream_bytes=nbytes,
outcome="exception_group",
note=str(grp),
**common,
)
raise
except BaseException as exc:
interrupted = True
trace_event(
stage=stage,
event=interrupted_event,
source=source,
stream_chunks=count,
stream_bytes=nbytes,
outcome="error",
exc_type=type(exc).__name__,
**common,
)
raise
if not interrupted:
trace_event(
stage=stage,
event=complete_event,
source=source,
stream_chunks=count,
stream_bytes=nbytes,
outcome="ok",
**common,
)
def provider_chat_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]:
"""Sanitized OpenAI-compat chat body subset for traces (conversation text verbatim)."""
keys = ("model", "messages", "tools", "tool_choice", "temperature", "max_tokens")
snap = {k: body[k] for k in keys if k in body and body[k] is not None}
return _sanitize_trace_value(snap)
def provider_native_messages_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]:
"""Sanitized Anthropic Messages API body subset for traces."""
keys = (
"model",
"messages",
"system",
"tools",
"tool_choice",
"max_tokens",
"thinking",
"metadata",
"temperature",
"top_p",
"top_k",
"stop_sequences",
)
snap = {k: body[k] for k in keys if k in body and body[k] is not None}
return _sanitize_trace_value(snap)
+86 -28
View File
@@ -11,6 +11,7 @@ import asyncio
from loguru import logger
from core.anthropic import format_user_error_preview, get_user_facing_error_message
from core.trace import trace_event
from .cli_event_constants import STATUS_MESSAGE_PREFIXES
from .command_dispatcher import (
@@ -102,25 +103,16 @@ class ClaudeMessageHandler:
Determines if this is a new conversation or reply,
creates/extends the message tree, and queues for processing.
"""
raw = incoming.text or ""
if self._log_raw_messaging_content:
text_preview = raw[:80]
if len(raw) > 80:
text_preview += "..."
logger.info(
"HANDLER_ENTRY: chat_id={} message_id={} reply_to={} text_preview={!r}",
incoming.chat_id,
incoming.message_id,
incoming.reply_to_message_id,
text_preview,
)
else:
logger.info(
"HANDLER_ENTRY: chat_id={} message_id={} reply_to={} text_len={}",
incoming.chat_id,
incoming.message_id,
incoming.reply_to_message_id,
len(raw),
platform_name = getattr(self.platform, "name", "messaging")
trace_event(
stage="ingress",
event="turn.received",
source=platform_name,
chat_id=incoming.chat_id,
platform_message_id=incoming.message_id,
reply_to_message_id=incoming.reply_to_message_id,
thread_id=getattr(incoming, "message_thread_id", None),
message_text=incoming.text or "",
)
with logger.contextualize(
@@ -240,8 +232,16 @@ class ClaudeMessageHandler:
)
if was_queued and status_msg_id:
# Update status to show queue position
queue_size = self.tree_queue.get_queue_size(node_id)
trace_event(
stage="routing",
event="turn.queued",
source=getattr(self.platform, "name", "messaging"),
chat_id=incoming.chat_id,
platform_message_id=node_id,
status_message_id=status_msg_id,
queue_size=queue_size,
)
await self.platform.queue_edit_message(
incoming.chat_id,
status_msg_id,
@@ -343,10 +343,18 @@ class ClaudeMessageHandler:
last_status: str | None = None
parent_session_id = None
platform_nm = getattr(self.platform, "name", "messaging")
if tree and node.parent_id:
parent_session_id = tree.get_parent_session_id(node_id)
if parent_session_id:
logger.info(f"Will fork from parent session: {parent_session_id}")
trace_event(
stage="claude_cli",
event="claude_cli.fork.from_parent_session",
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
parent_session_id=parent_session_id,
)
editor = ThrottledTranscriptEditor(
platform=self.platform,
@@ -377,6 +385,33 @@ class ClaudeMessageHandler:
temp_session_id = session_or_temp_id
else:
captured_session_id = session_or_temp_id
sess_evt = (
"claude_cli.session.pending_created"
if is_new
else "claude_cli.session.reused"
)
trace_event(
stage="claude_cli",
event=sess_evt,
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
status_message_id=status_msg_id,
session_handle=str(session_or_temp_id),
parent_resume_session_id=parent_session_id,
fork_requested=bool(parent_session_id),
)
trace_event(
stage="claude_cli",
event="claude_cli.request.sent",
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
prompt=incoming.text,
fork_session_arg=bool(parent_session_id),
resume_session_arg=parent_session_id,
)
except RuntimeError as e:
error_message = get_user_facing_error_message(e)
transcript.apply({"type": "error", "message": error_message})
@@ -390,10 +425,15 @@ class ClaudeMessageHandler:
MessageState.ERROR,
error_message=error_message,
)
trace_event(
stage="claude_cli",
event="claude_cli.session.limit_reached",
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
)
return
logger.info(f"HANDLER: Starting CLI task processing for node {node_id}")
event_count = 0
async for event_data in cli_session.start_task(
incoming.text,
session_id=parent_session_id,
@@ -404,9 +444,6 @@ class ClaudeMessageHandler:
f"HANDLER: Non-dict event received: {type(event_data)}"
)
continue
event_count += 1
if event_count % 10 == 0:
logger.debug(f"HANDLER: Processed {event_count} events so far")
(
captured_session_id,
@@ -426,7 +463,6 @@ class ClaudeMessageHandler:
parsed_list = parse_cli_event(
event_data, log_raw_cli=self._log_raw_cli_diagnostics
)
logger.debug(f"HANDLER: Parsed {len(parsed_list)} events from CLI")
for parsed in parsed_list:
(
@@ -448,6 +484,13 @@ class ClaudeMessageHandler:
)
except asyncio.CancelledError:
trace_event(
stage="claude_cli",
event="turn.processor.cancelled",
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
)
logger.warning(f"HANDLER: Task cancelled for node {node_id}")
cancel_reason = None
if isinstance(node.context, dict):
@@ -466,6 +509,14 @@ class ClaudeMessageHandler:
node_id, MessageState.ERROR, error_message="Cancelled by user"
)
except Exception as e:
trace_event(
stage="claude_cli",
event="turn.processor.exception",
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
exc_type=type(e).__name__,
)
logger.error(
"HANDLER: Task failed with exception: {}",
format_exception_for_log(
@@ -480,7 +531,14 @@ class ClaudeMessageHandler:
node_id, error_msg, "Parent task failed"
)
finally:
logger.info(f"HANDLER: _process_node completed for node {node_id}")
trace_event(
stage="routing",
event="turn.processor.finished",
source=platform_nm,
chat_id=chat_id,
node_id=node_id,
claude_session_id=captured_session_id or temp_session_id,
)
# Free the session-manager slot. Session IDs are persisted in the tree and
# can be resumed later by ID; we don't need to keep a CLISession instance
# around after this node completes.
+27 -3
View File
@@ -7,6 +7,8 @@ from typing import Any
from loguru import logger
from core.trace import trace_event
from .cli_event_constants import TRANSCRIPT_EVENT_TYPES, get_status_for_event
from .platforms.base import SessionManagerInterface
from .safe_diagnostics import text_len_hint
@@ -34,6 +36,15 @@ async def handle_session_info_event(
return captured_session_id, temp_session_id
await cli_manager.register_real_session_id(temp_session_id, real_session_id)
trace_event(
stage="claude_cli",
event="claude_cli.session.registered",
source="claude_cli",
node_id=node_id,
temp_session_id=temp_session_id,
real_session_id=real_session_id,
tree_root_id=tree.root_id if tree else None,
)
if tree and real_session_id:
await tree.update_state(
node_id,
@@ -76,7 +87,13 @@ async def process_parsed_cli_event(
elif ptype == "complete":
if not had_transcript_events:
transcript.apply({"type": "text_chunk", "text": "Done."})
logger.info("HANDLER: Task complete, updating UI")
trace_event(
stage="claude_cli",
event="turn.completed",
source="cli_event",
node_id=node_id,
claude_session_id=captured_session_id,
)
await update_ui(format_status("", "Complete"), force=True)
if tree and captured_session_id:
await tree.update_state(
@@ -87,15 +104,22 @@ async def process_parsed_cli_event(
session_store.save_tree(tree.root_id, tree.to_dict())
elif ptype == "error":
error_msg = parsed.get("message", "Unknown error")
em = error_msg if isinstance(error_msg, str) else str(error_msg)
trace_event(
stage="claude_cli",
event="turn.failed",
source="cli_event",
node_id=node_id,
claude_session_id=captured_session_id,
cli_error_message=em,
)
if log_messaging_error_details:
logger.error("HANDLER: Error event received: {}", error_msg)
else:
em = error_msg if isinstance(error_msg, str) else str(error_msg)
logger.error(
"HANDLER: Error event received: message_chars={}",
text_len_hint(em),
)
logger.info("HANDLER: Updating UI with error status")
await update_ui(format_status("", "Error"), force=True)
if tree:
await propagate_error_to_children(node_id, error_msg, "Parent task failed")
+37 -13
View File
@@ -21,6 +21,7 @@ from core.anthropic.native_sse_block_policy import (
NativeSseBlockPolicyState,
transform_native_sse_block_event,
)
from core.trace import provider_native_messages_body_snapshot, trace_event
from providers.base import BaseProvider, ProviderConfig
from providers.error_mapping import (
map_error,
@@ -338,13 +339,16 @@ class AnthropicMessagesTransport(BaseProvider):
body = self._build_request_body(request, thinking_enabled=thinking_enabled)
thinking_enabled = self._is_thinking_enabled(request, thinking_enabled)
logger.info(
"{}_STREAM:{} natively passing Anthropic request model={} msgs={} tools={}",
tag,
req_tag,
body.get("model"),
len(body.get("messages", [])),
len(body.get("tools", [])),
trace_event(
stage="provider",
event="provider.request.sent",
source="provider",
provider=self._provider_name,
gateway_model=request.model,
downstream_model=body.get("model"),
message_count=len(body.get("messages", [])),
tool_count=len(body.get("tools", [])),
body=provider_native_messages_body_snapshot(body),
)
response: httpx.Response | None = None
@@ -373,28 +377,48 @@ class AnthropicMessagesTransport(BaseProvider):
_validated_stream_send
)
chunk_count = 0
chunk_bytes = 0
async for chunk in self._iter_stream_chunks(
response,
state=state,
thinking_enabled=thinking_enabled,
):
chunk_count += 1
chunk_bytes += len(chunk.encode("utf-8", errors="replace"))
sent_any_event = True
emitted_tracker.feed(chunk)
yield chunk
trace_event(
stage="provider",
event="provider.response.completed",
source="provider",
provider=self._provider_name,
gateway_model=request.model,
sse_chunks_out=chunk_count,
sse_bytes_out=chunk_bytes,
)
except Exception as error:
if not isinstance(error, httpx.HTTPStatusError):
self._log_stream_transport_error(tag, req_tag, error)
self._log_stream_transport_error(
tag, req_tag, error, request_id=request_id
)
error_message = self._get_error_message(error, request_id)
if response is not None and not response.is_closed:
await response.aclose()
logger.info(
"{}_STREAM: Emitting native SSE error event for {}{}",
tag,
type(error).__name__,
req_tag,
trace_event(
stage="provider",
event="provider.response.error",
source="provider",
provider=self._provider_name,
error_message=error_message,
exc_type=type(error).__name__,
mid_stream=sent_any_event,
)
if sent_any_event:
for event in emitted_tracker.iter_close_unclosed_blocks():
+23 -6
View File
@@ -80,26 +80,43 @@ class BaseProvider(ABC):
build(request, thinking_enabled=thinking_enabled)
def _log_stream_transport_error(
self, tag: str, req_tag: str, error: Exception
self,
tag: str,
req_tag: str,
error: Exception,
*,
request_id: str | None = None,
) -> None:
"""Log streaming transport failures (metadata-only unless verbose is enabled)."""
from loguru import logger
from core.trace import trace_event
response = getattr(error, "response", None)
http_status = (
getattr(response, "status_code", None) if response is not None else None
)
trace_event(
stage="provider",
event="provider.response.transport_error",
source="provider",
provider=tag,
request_id=request_id,
exc_type=type(error).__name__,
http_status=http_status,
)
if self._config.log_api_error_tracebacks:
logger.error(
"{}_ERROR:{} {}: {}", tag, req_tag, type(error).__name__, error
)
return
response = getattr(error, "response", None)
status_code = (
getattr(response, "status_code", None) if response is not None else None
)
logger.error(
"{}_ERROR:{} exc_type={} http_status={}",
tag,
req_tag,
type(error).__name__,
status_code,
http_status,
)
@abstractmethod
+28 -13
View File
@@ -23,6 +23,7 @@ from core.anthropic import (
append_request_id,
map_stop_reason,
)
from core.trace import provider_chat_body_snapshot, trace_event
from providers.base import BaseProvider, ProviderConfig
from providers.error_mapping import (
map_error,
@@ -353,13 +354,16 @@ class OpenAIChatTransport(BaseProvider):
body = self._build_request_body(request, thinking_enabled=thinking_enabled)
thinking_enabled = self._is_thinking_enabled(request, thinking_enabled)
req_tag = f" request_id={request_id}" if request_id else ""
logger.info(
"{}_STREAM:{} model={} msgs={} tools={}",
tag,
req_tag,
body.get("model"),
len(body.get("messages", [])),
len(body.get("tools", [])),
trace_event(
stage="provider",
event="provider.request.sent",
source="provider",
provider=self._provider_name,
gateway_model=request.model,
downstream_model=body.get("model"),
message_count=len(body.get("messages", [])),
tool_count=len(body.get("tools", [])),
body=provider_chat_body_snapshot(body),
)
yield sse.message_start()
@@ -455,7 +459,7 @@ class OpenAIChatTransport(BaseProvider):
except asyncio.CancelledError, GeneratorExit:
raise
except Exception as e:
self._log_stream_transport_error(tag, req_tag, e)
self._log_stream_transport_error(tag, req_tag, e, request_id=request_id)
mapped_e = map_error(e, rate_limiter=self._global_rate_limiter)
base_message = user_visible_message_for_mapped_provider_error(
mapped_e,
@@ -463,11 +467,13 @@ class OpenAIChatTransport(BaseProvider):
read_timeout_s=self._config.http_read_timeout,
)
error_message = append_request_id(base_message, request_id)
logger.info(
"{}_STREAM: Emitting SSE error event for {}{}",
tag,
type(e).__name__,
req_tag,
trace_event(
stage="provider",
event="provider.response.error",
source="provider",
provider=tag,
error_message=error_message,
mapped_error_type=type(mapped_e).__name__,
)
for event in sse.close_all_blocks():
yield event
@@ -552,5 +558,14 @@ class OpenAIChatTransport(BaseProvider):
provider_input,
provider_input - input_tokens,
)
trace_event(
stage="provider",
event="provider.response.completed",
source="provider",
provider=self._provider_name,
finish_reason=(None if finish_reason is None else str(finish_reason)),
output_tokens=output_tokens,
prompt_tokens_estimate=input_tokens,
)
yield sse.message_delta(map_stop_reason(finish_reason), output_tokens)
yield sse.message_stop()
+1 -5
View File
@@ -70,11 +70,7 @@ def test_sse_builder_default_debug_has_no_serialized_json_content():
sse = SSEBuilder("msg_x", "m", 1, log_raw_events=False)
sse.message_start()
assert mock_debug.call_count == 1
message = str(mock_debug.call_args)
assert "serialized_bytes=" in message
assert "role" not in message
assert "assistant" not in message
assert mock_debug.call_count == 0
def test_sse_builder_raw_logging_includes_event_body_when_enabled():
+38
View File
@@ -0,0 +1,38 @@
"""Structured TRACE logging assertions."""
from __future__ import annotations
import json
from pathlib import Path
from loguru import logger
from config.logging_config import configure_logging
from core.trace import TRACE_PAYLOAD_BINDING, trace_event
def test_trace_payload_merged_into_json_line(tmp_path) -> None:
log_file = str(tmp_path / "t.log")
configure_logging(log_file, force=True)
trace_event(stage="s", event="e.v1", source="unit", hello="world", n=42)
logger.complete()
text = Path(log_file).read_text(encoding="utf-8").strip().split("\n")[-1]
row = json.loads(text)
assert row["trace"] is True
assert row["stage"] == "s"
assert row["event"] == "e.v1"
assert row["source"] == "unit"
assert row["hello"] == "world"
assert row["n"] == 42
assert TRACE_PAYLOAD_BINDING == "trace_payload"
def test_sanitize_masks_nested_api_key_strings() -> None:
"""Credential-shaped keys redact without touching normal message text."""
from core.trace import _sanitize_trace_value
out = _sanitize_trace_value(
{"outer": {"api_key": "secret", "text": "visible"}},
)
assert out["outer"]["api_key"] == "<redacted>"
assert out["outer"]["text"] == "visible"
+13 -12
View File
@@ -14,10 +14,11 @@ def handler(mock_platform, mock_cli_manager, mock_session_store):
@pytest.mark.asyncio
async def test_handle_message_default_logs_text_len_not_content(
async def test_handle_message_turn_trace_includes_full_message_text(
mock_platform, mock_cli_manager, mock_session_store, incoming_message_factory
):
secret = "user-secret-content-never-log-default"
"""turn.received always records the verbatim user message (local debugging)."""
secret = "user-message-content-visible-in-trace"
handler = ClaudeMessageHandler(
mock_platform,
mock_cli_manager,
@@ -27,33 +28,33 @@ async def test_handle_message_default_logs_text_len_not_content(
incoming = incoming_message_factory(text=secret)
with (
patch.object(handler, "_handle_message_impl", new_callable=AsyncMock),
patch("messaging.handler.logger.info") as log_info,
patch("messaging.handler.trace_event") as trace_mock,
):
await handler.handle_message(incoming)
blob = " ".join(str(c) for c in log_info.call_args_list)
assert secret not in blob
assert "text_len=" in blob
kwargs = trace_mock.call_args.kwargs
assert kwargs["event"] == "turn.received"
assert kwargs["message_text"] == secret
@pytest.mark.asyncio
async def test_handle_message_raw_content_logging_includes_preview(
async def test_handle_message_log_raw_messaging_does_not_change_turn_received_shape(
mock_platform, mock_cli_manager, mock_session_store, incoming_message_factory
):
secret = "visible-preview-xyz"
"""LOG_RAW_MESSAGING_CONTENT is adapter-only; ingress TRACE always includes text."""
text = "visible-either-way"
handler = ClaudeMessageHandler(
mock_platform,
mock_cli_manager,
mock_session_store,
log_raw_messaging_content=True,
)
incoming = incoming_message_factory(text=secret)
incoming = incoming_message_factory(text=text)
with (
patch.object(handler, "_handle_message_impl", new_callable=AsyncMock),
patch("messaging.handler.logger.info") as log_info,
patch("messaging.handler.trace_event") as trace_mock,
):
await handler.handle_message(incoming)
blob = " ".join(str(c) for c in log_info.call_args_list)
assert secret in blob
assert trace_mock.call_args.kwargs["message_text"] == text
def test_get_initial_status_new_conversation(handler):