diff --git a/.env.example b/.env.example index 10b98f5..7f71072 100644 --- a/.env.example +++ b/.env.example @@ -136,6 +136,11 @@ WEB_FETCH_ALLOWED_SCHEMES=http,https WEB_FETCH_ALLOW_PRIVATE_NETWORKS=false +# Structured TRACE logs: lines with `"trace": true` merge ingress/routing/cli/provider/egress +# stages. Conversation text is logged in those payloads (verbatim). Values under keys named +# like ``api_key`` / ``authorization`` are redacted. Raw transport payloads still require +# the LOG_RAW_* toggles below. +# # Verbose diagnostics (avoid logging raw prompts / SSE bodies in production) DEBUG_PLATFORM_EDITS=false DEBUG_SUBAGENT_STACK=false @@ -144,7 +149,7 @@ LOG_RAW_API_PAYLOADS=false LOG_RAW_SSE_EVENTS=false # When true, log full exception text and tracebacks for unhandled errors (may leak request-derived data). LOG_API_ERROR_TRACEBACKS=false -# When true, log message/transcription text previews in messaging adapters (may leak user content). +# When true, log message/transcription text previews in messaging adapters only (handler ingress always TRACEs verbatim text separately). LOG_RAW_MESSAGING_CONTENT=false # When true, log full Claude CLI stderr, non-JSON stdout lines, and parser error text. LOG_RAW_CLI_DIAGNOSTICS=false diff --git a/README.md b/README.md index fa54046..7586e90 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,8 @@ LOG_MESSAGING_ERROR_DETAILS=false Raw logging flags can expose prompts, tool arguments, paths, and model output. Keep them off unless you are debugging locally. +Structured TRACE rows append fields such as `"trace": true`, `stage`, `event`, and `source` and include conversation context needed to follow Claude Code flows end-to-end. Values stored under credential-like dictionary keys (for example `api_key` or `authorization`, including when nested inside structured payloads) are redacted; arbitrary prose you type into prompts may still appear verbatim. + ### 6. 
Local Web Tools ```dotenv diff --git a/api/app.py b/api/app.py index d0aeda2..51f0406 100644 --- a/api/app.py +++ b/api/app.py @@ -13,6 +13,7 @@ from starlette.types import Receive, Scope, Send from config.logging_config import configure_logging from config.settings import get_settings +from core.trace import extract_claude_session_id_from_headers, trace_event from providers.exceptions import ProviderError from .admin_routes import router as admin_router @@ -95,6 +96,18 @@ def create_app(*, lifespan_enabled: bool = True) -> FastAPI: app_kwargs["lifespan"] = lifespan app = FastAPI(**app_kwargs) + @app.middleware("http") + async def trace_http_correlation(request: Request, call_next): + """Attach HTTP identifiers and optional Claude session id to logs.""" + claude_sid = extract_claude_session_id_from_headers(request.headers) + with logger.contextualize( + http_method=request.method, + http_path=request.url.path, + claude_session_id=claude_sid, + ): + response = await call_next(request) + return response + # Register routes app.include_router(admin_router) app.include_router(router) @@ -111,14 +124,16 @@ def create_app(*, lifespan_enabled: bool = True) -> FastAPI: message_summary, tool_names = summarize_request_validation_body(body) - logger.debug( - "Request validation failed: path={} query={} error_locs={} error_types={} message_summary={} tool_names={}", - request.url.path, - str(request.url.query), - [list(error.get("loc", ())) for error in exc.errors()], - [str(error.get("type", "")) for error in exc.errors()], - message_summary, - tool_names, + trace_event( + stage="ingress", + event="server.request.validation_failed", + source="api", + path=request.url.path, + query=dict(request.query_params), + error_locs=[list(error.get("loc", ())) for error in exc.errors()], + error_types=[str(error.get("type", "")) for error in exc.errors()], + message_summary=message_summary, + tool_names=tool_names, ) return await request_validation_exception_handler(request, exc) diff 
--git a/api/routes.py b/api/routes.py index 63455de..6049494 100644 --- a/api/routes.py +++ b/api/routes.py @@ -5,6 +5,7 @@ from loguru import logger from config.settings import Settings from core.anthropic import get_token_count +from core.trace import trace_event from providers.registry import ProviderRegistry from . import dependencies @@ -231,6 +232,7 @@ async def list_models( _auth=Depends(require_api_key), ): """List the model ids this proxy advertises to Claude-compatible clients.""" + trace_event(stage="ingress", event="api.models.list", source="api") registry = getattr(request.app.state, "provider_registry", None) provider_registry = registry if isinstance(registry, ProviderRegistry) else None return _build_models_list_response(settings, provider_registry) @@ -250,5 +252,11 @@ async def stop_cli(request: Request, _auth=Depends(require_api_key)): raise HTTPException(status_code=503, detail="Messaging system not initialized") count = await handler.stop_all_tasks() + trace_event( + stage="ingress", + event="api.cli.stop_via_handler", + source="api", + cancelled_nodes=count, + ) logger.info("STOP_CLI: source=handler cancelled_count={}", count) return {"status": "stopped", "cancelled_count": count} diff --git a/api/services.py b/api/services.py index b122627..9611e51 100644 --- a/api/services.py +++ b/api/services.py @@ -14,6 +14,7 @@ from loguru import logger from config.settings import Settings from core.anthropic import get_token_count, get_user_facing_error_message from core.anthropic.sse import ANTHROPIC_SSE_RESPONSE_HEADERS +from core.trace import api_messages_request_snapshot, trace_event, traced_async_stream from providers.base import BaseProvider from providers.exceptions import InvalidRequestError, ProviderError @@ -118,7 +119,12 @@ class ClaudeProxyService: input_tokens = self._token_counter( routed.request.messages, routed.request.system, routed.request.tools ) - logger.info("Optimization: Handling Anthropic web server tool") + trace_event( + 
stage="routing", + event="api.optimization.web_server_tool", + source="api", + model=routed.request.model, + ) egress = WebFetchEgressPolicy( allow_private_network_targets=self._settings.web_fetch_allow_private_networks, allowed_schemes=self._settings.web_fetch_allowed_scheme_set(), @@ -134,6 +140,12 @@ class ClaudeProxyService: optimized = try_optimizations(routed.request, self._settings) if optimized is not None: + trace_event( + stage="routing", + event="api.optimization.short_circuit", + source="api", + model=routed.request.model, + ) return optimized logger.debug("No optimization matched, routing to provider") @@ -143,29 +155,57 @@ class ClaudeProxyService: thinking_enabled=routed.resolved.thinking_enabled, ) - request_id = f"req_{uuid.uuid4().hex[:12]}" - logger.info( - "API_REQUEST: request_id={} model={} messages={}", - request_id, - routed.request.model, - len(routed.request.messages), + trace_event( + stage="routing", + event="api.route.resolved", + source="api", + provider_id=routed.resolved.provider_id, + provider_model=routed.resolved.provider_model, + provider_model_ref=routed.resolved.provider_model_ref, + gateway_model=routed.request.model, + thinking_enabled=routed.resolved.thinking_enabled, ) - if self._settings.log_raw_api_payloads: - logger.debug( - "FULL_PAYLOAD [{}]: {}", request_id, routed.request.model_dump() + + request_id = f"req_{uuid.uuid4().hex[:12]}" + with logger.contextualize(request_id=request_id): + trace_event( + stage="ingress", + event="api.request.received", + source="api", + message_count=len(routed.request.messages), + snapshot=api_messages_request_snapshot(routed.request), ) - input_tokens = self._token_counter( - routed.request.messages, routed.request.system, routed.request.tools - ) - return anthropic_sse_streaming_response( - provider.stream_response( - routed.request, - input_tokens=input_tokens, - request_id=request_id, - thinking_enabled=routed.resolved.thinking_enabled, - ), - ) + if 
self._settings.log_raw_api_payloads: + logger.debug( + "FULL_PAYLOAD [{}]: {}", request_id, routed.request.model_dump() + ) + + input_tokens = self._token_counter( + routed.request.messages, + routed.request.system, + routed.request.tools, + ) + + streamed = traced_async_stream( + provider.stream_response( + routed.request, + input_tokens=input_tokens, + request_id=request_id, + thinking_enabled=routed.resolved.thinking_enabled, + ), + stage="egress", + source="api", + complete_event="api.response.stream_completed", + interrupted_event="api.response.stream_interrupted", + chunk_event=None, + extra={ + "request_id": request_id, + "provider_id": routed.resolved.provider_id, + "gateway_model": routed.request.model, + }, + ) + return anthropic_sse_streaming_response(streamed) except ProviderError: raise @@ -188,12 +228,23 @@ class ClaudeProxyService: tokens = self._token_counter( routed.request.messages, routed.request.system, routed.request.tools ) - logger.info( - "COUNT_TOKENS: request_id={} model={} messages={} input_tokens={}", - request_id, - routed.request.model, - len(routed.request.messages), - tokens, + trace_event( + stage="routing", + event="api.route.resolved", + source="api", + kind="count_tokens", + provider_id=routed.resolved.provider_id, + provider_model=routed.resolved.provider_model, + provider_model_ref=routed.resolved.provider_model_ref, + gateway_model=routed.request.model, + ) + trace_event( + stage="ingress", + event="api.count_tokens.completed", + source="api", + message_count=len(routed.request.messages), + input_tokens=tokens, + snapshot=api_messages_request_snapshot(routed.request), ) return TokenCountResponse(input_tokens=tokens) except ProviderError: diff --git a/cli/manager.py b/cli/manager.py index cb27419..1c08817 100644 --- a/cli/manager.py +++ b/cli/manager.py @@ -56,8 +56,6 @@ class CLISessionManager: self._real_to_temp: dict[str, str] = {} self._lock = asyncio.Lock() - logger.info("CLISessionManager initialized") - async def 
get_or_create_session( self, session_id: str | None = None ) -> tuple[CLISession, str, bool]: @@ -87,7 +85,6 @@ class CLISessionManager: log_raw_cli_diagnostics=self._log_raw_cli_diagnostics, ) self._pending_sessions[temp_id] = new_session - logger.info(f"Created new session: {temp_id}") return new_session, temp_id, True diff --git a/cli/session.py b/cli/session.py index d007975..bba070d 100644 --- a/cli/session.py +++ b/cli/session.py @@ -9,6 +9,8 @@ from typing import Any from loguru import logger +from core.trace import trace_event + from .process_registry import register_pid, unregister_pid # Cap stderr capture so a runaway child cannot exhaust memory; pipe is still drained. @@ -136,7 +138,6 @@ class CLISession: "--dangerously-skip-permissions", "--verbose", ] - logger.info(f"Resuming Claude session {session_id}") else: cmd = [ self.claude_bin, @@ -147,7 +148,6 @@ class CLISession: "--dangerously-skip-permissions", "--verbose", ] - logger.info("Starting new Claude session") if self.allowed_dirs: for d in self.allowed_dirs: @@ -157,6 +157,22 @@ class CLISession: settings_json = json.dumps({"plansDirectory": self.plans_directory}) cmd.extend(["--settings", settings_json]) + trace_event( + stage="claude_cli", + event="claude_cli.process.launch", + source="claude_cli", + resume_session_id=( + session_id + if session_id and not session_id.startswith("pending_") + else None + ), + fork_session=fork_session, + prompt=prompt, + cwd=self.workspace, + claude_binary=self.claude_bin, + cli_argv=cmd, + ) + try: self.process = await asyncio.create_subprocess_exec( *cmd, diff --git a/config/logging_config.py b/config/logging_config.py index 7c5d8fd..560b198 100644 --- a/config/logging_config.py +++ b/config/logging_config.py @@ -9,14 +9,26 @@ included at top level for easy grep/filter. 
import json import logging import re +import threading from pathlib import Path from loguru import logger _configured = False -# Context keys we promote to top-level JSON for traceability -_CONTEXT_KEYS = ("request_id", "node_id", "chat_id") +# Loguru ``logger.bind()`` key used by structured TRACE payloads; ``core/trace.py`` +# uses the identical string constant ``TRACE_PAYLOAD_BINDING``. +_TRACE_PAYLOAD_BINDING = "trace_payload" + +# Context keys we promote to top-level JSON for traceability / grep +_CONTEXT_KEYS = ( + "request_id", + "node_id", + "chat_id", + "claude_session_id", + "http_method", + "http_path", +) _TELEGRAM_BOT_RE = re.compile( r"(https?://api\.telegram\.org/)bot([0-9]+:[A-Za-z0-9_-]+)(/?)", @@ -48,9 +60,16 @@ def _serialize_with_context(record) -> str: "function": record["function"], "line": record["line"], } + trace_payload = extra.get(_TRACE_PAYLOAD_BINDING) for key in _CONTEXT_KEYS: if key in extra and extra[key] is not None: out[key] = extra[key] + if isinstance(trace_payload, dict): + for tk, tv in trace_payload.items(): + if tk in out: + continue + out[tk] = tv + out["trace"] = True record["_json"] = json.dumps(out, default=str) return "{_json}\n" @@ -58,20 +77,31 @@ def _serialize_with_context(record) -> str: class InterceptHandler(logging.Handler): """Redirect stdlib logging to loguru.""" + def __init__(self) -> None: + super().__init__() + self._local = threading.local() + def emit(self, record: logging.LogRecord) -> None: + if getattr(self._local, "active", False): + # Avoid deadlock when nested stdlib records fire during a loguru emit. 
+ return + self._local.active = True try: - level = logger.level(record.levelname).name - except ValueError: - level = record.levelno + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno - frame, depth = logging.currentframe(), 2 - while frame is not None and frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 + frame, depth = logging.currentframe(), 2 + while frame is not None and frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 - logger.opt(depth=depth, exception=record.exc_info).log( - level, record.getMessage() - ) + logger.opt(depth=depth, exception=record.exc_info).log( + level, record.getMessage() + ) + finally: + self._local.active = False def configure_logging( @@ -104,6 +134,7 @@ def configure_logging( encoding="utf-8", mode="a", rotation="50 MB", + enqueue=True, ) # Intercept stdlib logging: route all root logger output to loguru diff --git a/core/anthropic/sse.py b/core/anthropic/sse.py index 1a99f6c..00d7bbf 100644 --- a/core/anthropic/sse.py +++ b/core/anthropic/sse.py @@ -187,12 +187,6 @@ class SSEBuilder: event_str = format_sse_event(event_type, data) if self._log_raw_events: logger.debug("SSE_EVENT: {} - {}", event_type, event_str.strip()) - else: - logger.debug( - "SSE_EVENT: event_type={} serialized_bytes={}", - event_type, - len(event_str.encode("utf-8")), - ) return event_str def message_start(self) -> str: diff --git a/core/trace.py b/core/trace.py new file mode 100644 index 0000000..a077d88 --- /dev/null +++ b/core/trace.py @@ -0,0 +1,214 @@ +"""Structured TRACE events for end-to-end request / CLI / provider logging. + +Emitted lines are merged into JSON log rows by ``config.logging_config``. +Conversation and Claude Code prompts are logged verbatim unless values live under +sanitized credential keys (e.g. ``api_key``, ``authorization``). 
+""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator, Mapping +from typing import Any + +from loguru import logger + +TRACE_PAYLOAD_BINDING = "trace_payload" + +_SECRET_VALUE_KEYS = frozenset( + k.lower() + for k in ( + "authorization", + "x-api-key", + "anthropic-auth-token", + "api_key", + "password", + "secret", + "token", + "bearer_token", + "openapi_token", + "nvidia-api-key", + ) +) + + +def _sanitize_trace_value(obj: Any) -> Any: + """Recursively copy JSON-like structures redacting credential-shaped keys.""" + if isinstance(obj, Mapping): + out: dict[str, Any] = {} + for k, v in obj.items(): + if str(k).lower() in _SECRET_VALUE_KEYS: + out[str(k)] = "" + else: + out[str(k)] = _sanitize_trace_value(v) + return out + if isinstance(obj, tuple | list): + return [_sanitize_trace_value(x) for x in obj] + return obj + + +def trace_event(*, stage: str, event: str, source: str, **fields: Any) -> None: + """Emit one structured TRACE row (merged into JSON by the log sink).""" + payload = _sanitize_trace_value( + { + "stage": stage, + "event": event, + "source": source, + **fields, + }, + ) + logger.bind(trace_payload=payload).info("TRACE {}", event) + + +def api_messages_request_snapshot(req: Any) -> dict[str, Any]: + """Return a sanitized snapshot of an Anthropic ``MessagesRequest``-like body.""" + if hasattr(req, "model_dump"): + data = req.model_dump(mode="python") + elif isinstance(req, Mapping): + data = dict(req) + else: + data = {} + + snapshot: dict[str, Any] = {} + for key in ( + "model", + "messages", + "system", + "tools", + "tool_choice", + "max_tokens", + "thinking", + "temperature", + "top_p", + "top_k", + "stop_sequences", + "metadata", + "stream", + "thinking_enabled", + ): + if key in data and data[key] is not None: + snapshot[key] = data[key] + return _sanitize_trace_value(snapshot) + + +def extract_claude_session_id_from_headers(headers: Mapping[str, str]) -> str | None: + """Best-effort session id 
forwarded by Claude Code / SDK via HTTP.""" + lowered = {str(k).lower(): v for k, v in headers.items() if isinstance(v, str)} + for key in ( + "anthropic-session-id", + "x-anthropic-session-id", + "claude-session-id", + "x-claude-session-id", + ): + candidate = lowered.get(key) + if candidate: + return candidate + return None + + +async def traced_async_stream( + agen: AsyncIterator[str], + *, + stage: str, + source: str, + complete_event: str, + interrupted_event: str, + chunk_event: str | None = None, + chunk_interval: int = 250, + extra: Mapping[str, Any] | None = None, +) -> AsyncIterator[str]: + """Emit TRACE rows when a text stream completes, fails, cancels, or periodically.""" + common = dict(extra or {}) + count = 0 + nbytes = 0 + interrupted = False + try: + async for chunk in agen: + count += 1 + nbytes += len(chunk.encode("utf-8", errors="replace")) + if chunk_event and chunk_interval > 0 and count % chunk_interval == 0: + trace_event( + stage=stage, + event=chunk_event, + source=source, + stream_chunks_so_far=count, + stream_bytes_so_far=nbytes, + **common, + ) + yield chunk + except asyncio.CancelledError: + interrupted = True + trace_event( + stage=stage, + event=interrupted_event, + source=source, + stream_chunks=count, + stream_bytes=nbytes, + outcome="cancelled", + **common, + ) + raise + except BaseExceptionGroup as grp: + interrupted = True + trace_event( + stage=stage, + event=interrupted_event, + source=source, + stream_chunks=count, + stream_bytes=nbytes, + outcome="exception_group", + note=str(grp), + **common, + ) + raise + except BaseException as exc: + interrupted = True + trace_event( + stage=stage, + event=interrupted_event, + source=source, + stream_chunks=count, + stream_bytes=nbytes, + outcome="error", + exc_type=type(exc).__name__, + **common, + ) + raise + + if not interrupted: + trace_event( + stage=stage, + event=complete_event, + source=source, + stream_chunks=count, + stream_bytes=nbytes, + outcome="ok", + **common, + ) + + +def 
provider_chat_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]: + """Sanitized OpenAI-compat chat body subset for traces (conversation text verbatim).""" + keys = ("model", "messages", "tools", "tool_choice", "temperature", "max_tokens") + snap = {k: body[k] for k in keys if k in body and body[k] is not None} + return _sanitize_trace_value(snap) + + +def provider_native_messages_body_snapshot(body: Mapping[str, Any]) -> dict[str, Any]: + """Sanitized Anthropic Messages API body subset for traces.""" + keys = ( + "model", + "messages", + "system", + "tools", + "tool_choice", + "max_tokens", + "thinking", + "metadata", + "temperature", + "top_p", + "top_k", + "stop_sequences", + ) + snap = {k: body[k] for k in keys if k in body and body[k] is not None} + return _sanitize_trace_value(snap) diff --git a/messaging/handler.py b/messaging/handler.py index 0a5e4e0..21e8997 100644 --- a/messaging/handler.py +++ b/messaging/handler.py @@ -11,6 +11,7 @@ import asyncio from loguru import logger from core.anthropic import format_user_error_preview, get_user_facing_error_message +from core.trace import trace_event from .cli_event_constants import STATUS_MESSAGE_PREFIXES from .command_dispatcher import ( @@ -102,26 +103,17 @@ class ClaudeMessageHandler: Determines if this is a new conversation or reply, creates/extends the message tree, and queues for processing. """ - raw = incoming.text or "" - if self._log_raw_messaging_content: - text_preview = raw[:80] - if len(raw) > 80: - text_preview += "..." 
- logger.info( - "HANDLER_ENTRY: chat_id={} message_id={} reply_to={} text_preview={!r}", - incoming.chat_id, - incoming.message_id, - incoming.reply_to_message_id, - text_preview, - ) - else: - logger.info( - "HANDLER_ENTRY: chat_id={} message_id={} reply_to={} text_len={}", - incoming.chat_id, - incoming.message_id, - incoming.reply_to_message_id, - len(raw), - ) + platform_name = getattr(self.platform, "name", "messaging") + trace_event( + stage="ingress", + event="turn.received", + source=platform_name, + chat_id=incoming.chat_id, + platform_message_id=incoming.message_id, + reply_to_message_id=incoming.reply_to_message_id, + thread_id=getattr(incoming, "message_thread_id", None), + message_text=incoming.text or "", + ) with logger.contextualize( chat_id=incoming.chat_id, node_id=incoming.message_id @@ -240,8 +232,16 @@ class ClaudeMessageHandler: ) if was_queued and status_msg_id: - # Update status to show queue position queue_size = self.tree_queue.get_queue_size(node_id) + trace_event( + stage="routing", + event="turn.queued", + source=getattr(self.platform, "name", "messaging"), + chat_id=incoming.chat_id, + platform_message_id=node_id, + status_message_id=status_msg_id, + queue_size=queue_size, + ) await self.platform.queue_edit_message( incoming.chat_id, status_msg_id, @@ -343,10 +343,18 @@ class ClaudeMessageHandler: last_status: str | None = None parent_session_id = None + platform_nm = getattr(self.platform, "name", "messaging") if tree and node.parent_id: parent_session_id = tree.get_parent_session_id(node_id) if parent_session_id: - logger.info(f"Will fork from parent session: {parent_session_id}") + trace_event( + stage="claude_cli", + event="claude_cli.fork.from_parent_session", + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + parent_session_id=parent_session_id, + ) editor = ThrottledTranscriptEditor( platform=self.platform, @@ -377,6 +385,33 @@ class ClaudeMessageHandler: temp_session_id = session_or_temp_id else: captured_session_id 
= session_or_temp_id + + sess_evt = ( + "claude_cli.session.pending_created" + if is_new + else "claude_cli.session.reused" + ) + trace_event( + stage="claude_cli", + event=sess_evt, + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + status_message_id=status_msg_id, + session_handle=str(session_or_temp_id), + parent_resume_session_id=parent_session_id, + fork_requested=bool(parent_session_id), + ) + trace_event( + stage="claude_cli", + event="claude_cli.request.sent", + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + prompt=incoming.text, + fork_session_arg=bool(parent_session_id), + resume_session_arg=parent_session_id, + ) except RuntimeError as e: error_message = get_user_facing_error_message(e) transcript.apply({"type": "error", "message": error_message}) @@ -390,10 +425,15 @@ class ClaudeMessageHandler: MessageState.ERROR, error_message=error_message, ) + trace_event( + stage="claude_cli", + event="claude_cli.session.limit_reached", + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + ) return - logger.info(f"HANDLER: Starting CLI task processing for node {node_id}") - event_count = 0 async for event_data in cli_session.start_task( incoming.text, session_id=parent_session_id, @@ -404,9 +444,6 @@ class ClaudeMessageHandler: f"HANDLER: Non-dict event received: {type(event_data)}" ) continue - event_count += 1 - if event_count % 10 == 0: - logger.debug(f"HANDLER: Processed {event_count} events so far") ( captured_session_id, @@ -426,7 +463,6 @@ class ClaudeMessageHandler: parsed_list = parse_cli_event( event_data, log_raw_cli=self._log_raw_cli_diagnostics ) - logger.debug(f"HANDLER: Parsed {len(parsed_list)} events from CLI") for parsed in parsed_list: ( @@ -448,6 +484,13 @@ class ClaudeMessageHandler: ) except asyncio.CancelledError: + trace_event( + stage="claude_cli", + event="turn.processor.cancelled", + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + ) logger.warning(f"HANDLER: Task cancelled for node {node_id}") 
cancel_reason = None if isinstance(node.context, dict): @@ -466,6 +509,14 @@ class ClaudeMessageHandler: node_id, MessageState.ERROR, error_message="Cancelled by user" ) except Exception as e: + trace_event( + stage="claude_cli", + event="turn.processor.exception", + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + exc_type=type(e).__name__, + ) logger.error( "HANDLER: Task failed with exception: {}", format_exception_for_log( @@ -480,7 +531,14 @@ class ClaudeMessageHandler: node_id, error_msg, "Parent task failed" ) finally: - logger.info(f"HANDLER: _process_node completed for node {node_id}") + trace_event( + stage="routing", + event="turn.processor.finished", + source=platform_nm, + chat_id=chat_id, + node_id=node_id, + claude_session_id=captured_session_id or temp_session_id, + ) # Free the session-manager slot. Session IDs are persisted in the tree and # can be resumed later by ID; we don't need to keep a CLISession instance # around after this node completes. diff --git a/messaging/node_event_pipeline.py b/messaging/node_event_pipeline.py index 6f233e2..8481643 100644 --- a/messaging/node_event_pipeline.py +++ b/messaging/node_event_pipeline.py @@ -7,6 +7,8 @@ from typing import Any from loguru import logger +from core.trace import trace_event + from .cli_event_constants import TRANSCRIPT_EVENT_TYPES, get_status_for_event from .platforms.base import SessionManagerInterface from .safe_diagnostics import text_len_hint @@ -34,6 +36,15 @@ async def handle_session_info_event( return captured_session_id, temp_session_id await cli_manager.register_real_session_id(temp_session_id, real_session_id) + trace_event( + stage="claude_cli", + event="claude_cli.session.registered", + source="claude_cli", + node_id=node_id, + temp_session_id=temp_session_id, + real_session_id=real_session_id, + tree_root_id=tree.root_id if tree else None, + ) if tree and real_session_id: await tree.update_state( node_id, @@ -76,7 +87,13 @@ async def process_parsed_cli_event( elif 
ptype == "complete": if not had_transcript_events: transcript.apply({"type": "text_chunk", "text": "Done."}) - logger.info("HANDLER: Task complete, updating UI") + trace_event( + stage="claude_cli", + event="turn.completed", + source="cli_event", + node_id=node_id, + claude_session_id=captured_session_id, + ) await update_ui(format_status("✅", "Complete"), force=True) if tree and captured_session_id: await tree.update_state( @@ -87,15 +104,22 @@ async def process_parsed_cli_event( session_store.save_tree(tree.root_id, tree.to_dict()) elif ptype == "error": error_msg = parsed.get("message", "Unknown error") + em = error_msg if isinstance(error_msg, str) else str(error_msg) + trace_event( + stage="claude_cli", + event="turn.failed", + source="cli_event", + node_id=node_id, + claude_session_id=captured_session_id, + cli_error_message=em, + ) if log_messaging_error_details: logger.error("HANDLER: Error event received: {}", error_msg) else: - em = error_msg if isinstance(error_msg, str) else str(error_msg) logger.error( "HANDLER: Error event received: message_chars={}", text_len_hint(em), ) - logger.info("HANDLER: Updating UI with error status") await update_ui(format_status("❌", "Error"), force=True) if tree: await propagate_error_to_children(node_id, error_msg, "Parent task failed") diff --git a/providers/anthropic_messages.py b/providers/anthropic_messages.py index e63f659..ee7ca34 100644 --- a/providers/anthropic_messages.py +++ b/providers/anthropic_messages.py @@ -21,6 +21,7 @@ from core.anthropic.native_sse_block_policy import ( NativeSseBlockPolicyState, transform_native_sse_block_event, ) +from core.trace import provider_native_messages_body_snapshot, trace_event from providers.base import BaseProvider, ProviderConfig from providers.error_mapping import ( map_error, @@ -338,13 +339,16 @@ class AnthropicMessagesTransport(BaseProvider): body = self._build_request_body(request, thinking_enabled=thinking_enabled) thinking_enabled = 
self._is_thinking_enabled(request, thinking_enabled) - logger.info( - "{}_STREAM:{} natively passing Anthropic request model={} msgs={} tools={}", - tag, - req_tag, - body.get("model"), - len(body.get("messages", [])), - len(body.get("tools", [])), + trace_event( + stage="provider", + event="provider.request.sent", + source="provider", + provider=self._provider_name, + gateway_model=request.model, + downstream_model=body.get("model"), + message_count=len(body.get("messages", [])), + tool_count=len(body.get("tools", [])), + body=provider_native_messages_body_snapshot(body), ) response: httpx.Response | None = None @@ -373,28 +377,48 @@ class AnthropicMessagesTransport(BaseProvider): _validated_stream_send ) + chunk_count = 0 + chunk_bytes = 0 + async for chunk in self._iter_stream_chunks( response, state=state, thinking_enabled=thinking_enabled, ): + chunk_count += 1 + chunk_bytes += len(chunk.encode("utf-8", errors="replace")) sent_any_event = True emitted_tracker.feed(chunk) yield chunk + trace_event( + stage="provider", + event="provider.response.completed", + source="provider", + provider=self._provider_name, + gateway_model=request.model, + sse_chunks_out=chunk_count, + sse_bytes_out=chunk_bytes, + ) + except Exception as error: if not isinstance(error, httpx.HTTPStatusError): - self._log_stream_transport_error(tag, req_tag, error) + self._log_stream_transport_error( + tag, req_tag, error, request_id=request_id + ) error_message = self._get_error_message(error, request_id) if response is not None and not response.is_closed: await response.aclose() - logger.info( - "{}_STREAM: Emitting native SSE error event for {}{}", - tag, - type(error).__name__, - req_tag, + trace_event( + stage="provider", + event="provider.response.error", + source="provider", + provider=self._provider_name, + error_message=error_message, + exc_type=type(error).__name__, + mid_stream=sent_any_event, ) if sent_any_event: for event in emitted_tracker.iter_close_unclosed_blocks(): diff --git 
a/providers/base.py b/providers/base.py index a009ff1..8f918ad 100644 --- a/providers/base.py +++ b/providers/base.py @@ -80,26 +80,43 @@ class BaseProvider(ABC): build(request, thinking_enabled=thinking_enabled) def _log_stream_transport_error( - self, tag: str, req_tag: str, error: Exception + self, + tag: str, + req_tag: str, + error: Exception, + *, + request_id: str | None = None, ) -> None: """Log streaming transport failures (metadata-only unless verbose is enabled).""" from loguru import logger + from core.trace import trace_event + + response = getattr(error, "response", None) + http_status = ( + getattr(response, "status_code", None) if response is not None else None + ) + trace_event( + stage="provider", + event="provider.response.transport_error", + source="provider", + provider=tag, + request_id=request_id, + exc_type=type(error).__name__, + http_status=http_status, + ) + if self._config.log_api_error_tracebacks: logger.error( "{}_ERROR:{} {}: {}", tag, req_tag, type(error).__name__, error ) return - response = getattr(error, "response", None) - status_code = ( - getattr(response, "status_code", None) if response is not None else None - ) logger.error( "{}_ERROR:{} exc_type={} http_status={}", tag, req_tag, type(error).__name__, - status_code, + http_status, ) @abstractmethod diff --git a/providers/openai_compat.py b/providers/openai_compat.py index 071b827..384930e 100644 --- a/providers/openai_compat.py +++ b/providers/openai_compat.py @@ -23,6 +23,7 @@ from core.anthropic import ( append_request_id, map_stop_reason, ) +from core.trace import provider_chat_body_snapshot, trace_event from providers.base import BaseProvider, ProviderConfig from providers.error_mapping import ( map_error, @@ -353,13 +354,16 @@ class OpenAIChatTransport(BaseProvider): body = self._build_request_body(request, thinking_enabled=thinking_enabled) thinking_enabled = self._is_thinking_enabled(request, thinking_enabled) req_tag = f" request_id={request_id}" if request_id else 
"" - logger.info( - "{}_STREAM:{} model={} msgs={} tools={}", - tag, - req_tag, - body.get("model"), - len(body.get("messages", [])), - len(body.get("tools", [])), + trace_event( + stage="provider", + event="provider.request.sent", + source="provider", + provider=self._provider_name, + gateway_model=request.model, + downstream_model=body.get("model"), + message_count=len(body.get("messages", [])), + tool_count=len(body.get("tools", [])), + body=provider_chat_body_snapshot(body), ) yield sse.message_start() @@ -455,7 +459,7 @@ class OpenAIChatTransport(BaseProvider): except asyncio.CancelledError, GeneratorExit: raise except Exception as e: - self._log_stream_transport_error(tag, req_tag, e) + self._log_stream_transport_error(tag, req_tag, e, request_id=request_id) mapped_e = map_error(e, rate_limiter=self._global_rate_limiter) base_message = user_visible_message_for_mapped_provider_error( mapped_e, @@ -463,11 +467,13 @@ class OpenAIChatTransport(BaseProvider): read_timeout_s=self._config.http_read_timeout, ) error_message = append_request_id(base_message, request_id) - logger.info( - "{}_STREAM: Emitting SSE error event for {}{}", - tag, - type(e).__name__, - req_tag, + trace_event( + stage="provider", + event="provider.response.error", + source="provider", + provider=tag, + error_message=error_message, + mapped_error_type=type(mapped_e).__name__, ) for event in sse.close_all_blocks(): yield event @@ -552,5 +558,14 @@ class OpenAIChatTransport(BaseProvider): provider_input, provider_input - input_tokens, ) + trace_event( + stage="provider", + event="provider.response.completed", + source="provider", + provider=self._provider_name, + finish_reason=(None if finish_reason is None else str(finish_reason)), + output_tokens=output_tokens, + prompt_tokens_estimate=input_tokens, + ) yield sse.message_delta(map_stop_reason(finish_reason), output_tokens) yield sse.message_stop() diff --git a/tests/api/test_safe_logging.py b/tests/api/test_safe_logging.py index 
1096516..7c3f9bc 100644 --- a/tests/api/test_safe_logging.py +++ b/tests/api/test_safe_logging.py @@ -70,11 +70,7 @@ def test_sse_builder_default_debug_has_no_serialized_json_content(): sse = SSEBuilder("msg_x", "m", 1, log_raw_events=False) sse.message_start() - assert mock_debug.call_count == 1 - message = str(mock_debug.call_args) - assert "serialized_bytes=" in message - assert "role" not in message - assert "assistant" not in message + assert mock_debug.call_count == 0 def test_sse_builder_raw_logging_includes_event_body_when_enabled(): diff --git a/tests/core/test_trace.py b/tests/core/test_trace.py new file mode 100644 index 0000000..eb4c147 --- /dev/null +++ b/tests/core/test_trace.py @@ -0,0 +1,38 @@ +"""Structured TRACE logging assertions.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from loguru import logger + +from config.logging_config import configure_logging +from core.trace import TRACE_PAYLOAD_BINDING, trace_event + + +def test_trace_payload_merged_into_json_line(tmp_path) -> None: + log_file = str(tmp_path / "t.log") + configure_logging(log_file, force=True) + trace_event(stage="s", event="e.v1", source="unit", hello="world", n=42) + logger.complete() + text = Path(log_file).read_text(encoding="utf-8").strip().split("\n")[-1] + row = json.loads(text) + assert row["trace"] is True + assert row["stage"] == "s" + assert row["event"] == "e.v1" + assert row["source"] == "unit" + assert row["hello"] == "world" + assert row["n"] == 42 + assert TRACE_PAYLOAD_BINDING == "trace_payload" + + +def test_sanitize_masks_nested_api_key_strings() -> None: + """Credential-shaped keys redact without touching normal message text.""" + from core.trace import _sanitize_trace_value + + out = _sanitize_trace_value( + {"outer": {"api_key": "secret", "text": "visible"}}, + ) + assert out["outer"]["api_key"] == "" + assert out["outer"]["text"] == "visible" diff --git a/tests/messaging/test_handler.py b/tests/messaging/test_handler.py 
index 2c79bbb..6ea7380 100644 --- a/tests/messaging/test_handler.py +++ b/tests/messaging/test_handler.py @@ -14,10 +14,11 @@ def handler(mock_platform, mock_cli_manager, mock_session_store): @pytest.mark.asyncio -async def test_handle_message_default_logs_text_len_not_content( +async def test_handle_message_turn_trace_includes_full_message_text( mock_platform, mock_cli_manager, mock_session_store, incoming_message_factory ): - secret = "user-secret-content-never-log-default" + """turn.received always records the verbatim user message (local debugging).""" + secret = "user-message-content-visible-in-trace" handler = ClaudeMessageHandler( mock_platform, mock_cli_manager, @@ -27,33 +28,33 @@ async def test_handle_message_default_logs_text_len_not_content( incoming = incoming_message_factory(text=secret) with ( patch.object(handler, "_handle_message_impl", new_callable=AsyncMock), - patch("messaging.handler.logger.info") as log_info, + patch("messaging.handler.trace_event") as trace_mock, ): await handler.handle_message(incoming) - blob = " ".join(str(c) for c in log_info.call_args_list) - assert secret not in blob - assert "text_len=" in blob + kwargs = trace_mock.call_args.kwargs + assert kwargs["event"] == "turn.received" + assert kwargs["message_text"] == secret @pytest.mark.asyncio -async def test_handle_message_raw_content_logging_includes_preview( +async def test_handle_message_log_raw_messaging_does_not_change_turn_received_shape( mock_platform, mock_cli_manager, mock_session_store, incoming_message_factory ): - secret = "visible-preview-xyz" + """LOG_RAW_MESSAGING_CONTENT is adapter-only; ingress TRACE always includes text.""" + text = "visible-either-way" handler = ClaudeMessageHandler( mock_platform, mock_cli_manager, mock_session_store, log_raw_messaging_content=True, ) - incoming = incoming_message_factory(text=secret) + incoming = incoming_message_factory(text=text) with ( patch.object(handler, "_handle_message_impl", new_callable=AsyncMock), - 
patch("messaging.handler.logger.info") as log_info, + patch("messaging.handler.trace_event") as trace_mock, ): await handler.handle_message(incoming) - blob = " ".join(str(c) for c in log_info.call_args_list) - assert secret in blob + assert trace_mock.call_args.kwargs["message_text"] == text def test_get_initial_status_new_conversation(handler):