feat(langgraph): migrate LangGraph harness onto unified surface#417
feat(langgraph): migrate LangGraph harness onto unified surface#417declan-scale wants to merge 9 commits into
Conversation
| def __init__(self, stream: Any, model: str | None = None) -> None: | ||
| self._stream = stream | ||
| self._model = model | ||
| self._usage: TurnUsage = TurnUsage(model=model) | ||
|
|
||
| @property | ||
| def events(self) -> AsyncIterator[StreamTaskMessage]: | ||
| return self._generate_events() | ||
|
|
||
| async def _generate_events(self) -> AsyncGenerator[StreamTaskMessage, None]: | ||
| def _capture(ai_msg: Any) -> None: | ||
| usage_metadata = getattr(ai_msg, "usage_metadata", None) | ||
| if usage_metadata is not None: | ||
| self._usage = langgraph_usage_to_turn_usage(usage_metadata, self._model) | ||
|
|
||
| async for ev in convert_langgraph_to_agentex_events(self._stream, on_final_ai_message=_capture): | ||
| yield ev | ||
|
|
||
| def usage(self) -> TurnUsage: | ||
| """Return the usage captured from the last AIMessage in the stream. | ||
|
|
||
| Valid only after ``events`` has been fully consumed. | ||
| Returns a zero-usage ``TurnUsage`` if the model did not report usage. | ||
| """ | ||
| return self._usage |
There was a problem hiding this comment.
TurnResult.usage is always empty when using auto_send_turn
LangGraphTurn populates self._usage lazily via the on_final_ai_message callback, which fires during event iteration. However, UnifiedEmitter.auto_send_turn passes usage=turn.usage() as an argument to auto_send before iteration begins (Python evaluates all arguments before the call). By the time the stream is consumed and _capture updates self._usage, the pre-iteration snapshot has already been handed to TurnResult.
Concretely: every caller that reads result.usage after await emitter.auto_send_turn(turn) gets TurnUsage(model=model) — zero token counts regardless of what the model reported. The PR description documents the workaround ("callers should read turn.usage() after auto_send_turn returns"), but TurnResult.usage existing with silent stale data is a trap for every future user of this API.
The fix belongs in emitter.py: call turn.usage() after await auto_send(turn.events, ...) returns, then construct the TurnResult from the now-populated usage.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/adk/_modules/_langgraph_turn.py
Line: 95-119
Comment:
**`TurnResult.usage` is always empty when using `auto_send_turn`**
`LangGraphTurn` populates `self._usage` lazily via the `on_final_ai_message` callback, which fires _during_ event iteration. However, `UnifiedEmitter.auto_send_turn` passes `usage=turn.usage()` as an argument to `auto_send` _before_ iteration begins (Python evaluates all arguments before the call). By the time the stream is consumed and `_capture` updates `self._usage`, the pre-iteration snapshot has already been handed to `TurnResult`.
Concretely: every caller that reads `result.usage` after `await emitter.auto_send_turn(turn)` gets `TurnUsage(model=model)` — zero token counts regardless of what the model reported. The PR description documents the workaround ("callers should read `turn.usage()` after `auto_send_turn` returns"), but `TurnResult.usage` existing with silent stale data is a trap for every future user of this API.
The fix belongs in `emitter.py`: call `turn.usage()` _after_ `await auto_send(turn.events, ...)` returns, then construct the `TurnResult` from the now-populated usage.
How can I resolve this? If you propose a fix, please make it concise.Adds an additive on_final_ai_message=None parameter to convert_langgraph_to_agentex_events so callers can capture AIMessage usage_metadata without re-traversing the stream. No behavior change when omitted. Also adds a DeprecationWarning to create_langgraph_tracing_handler and its module docstring, pointing to the unified harness surface, and updates the sync module docstring with the preferred unified path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Implements LangGraphTurn (HarnessTurn protocol) that wraps a LangGraph astream() event stream and captures usage from AIMessage.usage_metadata via the on_final_ai_message callback. Implements langgraph_usage_to_turn_usage that maps all UsageMetadata fields (input/output/total/cache_read/reasoning) onto the framework-agnostic TurnUsage model. Zero token counts are preserved. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…pre-refactor) Records the current bespoke behavior as a contract test. After Task 4 rewrites the internals to use UnifiedEmitter + LangGraphTurn, these tests must still pass to confirm behavioral parity. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…urface Replaces the bespoke Redis-streaming loop with UnifiedEmitter.auto_send_turn( LangGraphTurn(...)), matching the pattern established for pydantic-ai. Public signature preserved identically. Behavioral difference: tool calls/responses are now posted via streaming_task_message_context (not adk.messages.create), and final_text accumulates all text across the turn. Updates the characterization test to document these unified-surface semantics. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Verifies yield_turn(LangGraphTurn) produces identical events to direct iteration, and documents the AGX1-377 behavior (LangGraph Full tool events don't produce SpanDeriver spans today; cross-channel equivalence comes with AGX1-373). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…-step) Registers LangGraph-specific conformance fixtures with the shared harness conformance runner. Documents the AGX1-377 behavior (tool requests are Full events, not Start+Done). Span derivation is deterministic for all 4 fixtures. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ral channels Adds 18 offline integration tests across the three delivery channels using fake LangGraph event streams and fake streaming backends. Documents the AGX1-377 behavior (Full events don't produce tool spans). Notes the usage capture timing: turn.usage() is the authoritative post-iteration value since auto_send_turn evaluates usage eagerly before events are consumed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task 9: add 3 deployable tutorial agents that demonstrate the unified harness surface side-by-side with the bespoke reference examples: - examples/tutorials/00_sync/harness_langgraph/ (s-harness-langgraph) uses UnifiedEmitter.yield_turn(LangGraphTurn(stream)) - examples/tutorials/10_async/00_base/harness_langgraph/ (a-harness-langgraph) uses UnifiedEmitter.auto_send_turn(LangGraphTurn(stream)) - examples/tutorials/10_async/10_temporal/harness_langgraph/ (at-harness-langgraph) follows 130_langgraph pattern (LangGraphPlugin + emit_langgraph_messages) Task 10: enable live-matrix CI job in harness-integration.yml with a 3-way matrix over [sync, async, temporal] running offline integration tests. Also add test_harness_langgraph_*.py to PR path triggers. Task 11 (pyright fixes): annotate convert_langgraph_to_agentex_events and _generate_events with AsyncGenerator return types so pyright infers them as async generators rather than coroutines. Add start_time to Span construction in test_langgraph_sync_unified.py fake tracing backend. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…racing handler (PR 5/6) AGX1-378: wire workflow_now_if_in_workflow() into stream_langgraph_events so Temporal callers get deterministic message timestamps, matching the pattern used by the openai/litellm providers. Deprecation alignment: remove runtime warnings.warn from create_langgraph_tracing_handler (and unused import warnings) to match PR 4/6 pydantic-ai convention. Deprecation remains in docstrings on module, class, and function. Callers under -W error are no longer broken. Test alignment after rebase onto unified-harness-surface (b4b8b33): - FakeStreamingModule.streaming_task_message_context in test_langgraph_async.py and test_pydantic_ai_async.py updated to accept **kw (foundation now passes created_at). - Three "no tool spans for Full events" tests updated to assert the new SpanDeriver behaviour: Full(ToolRequestContent) opens a span, Full(ToolResponseContent) closes it. - Two "accumulates all text" multi-step tests corrected to last-segment semantics (auto_send resets final_text_parts on each new Start(TextContent)). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
dc5c81d to
68572d5
Compare
|
@greptile review |
Summary
Migrates the LangGraph harness onto the unified harness surface introduced in PR 4 (pydantic-ai). Implements 12 tasks covering the new
LangGraphTurnadapter, bespoke helper rewrites, offline integration tests, conformance fixtures, tutorial agents, and CI matrix.New surface:
Key implementation points:
LangGraphTurnwraps LangGraphastream()and implementsHarnessTurn(tasks 1-2)stream_langgraph_eventsreimplemented onUnifiedEmitter(task 4)_langgraph_tracing.pycreate_langgraph_tracing_handlermarked deprecated withwarnings.warn(DeprecationWarning)(task 3)StreamTaskMessageFull(not Start+Delta+Done);SpanDeriverdoes not produce tool spans from Full events today (tracked in AGX1-373)LangGraphTurn.usage()is populated viaon_final_ai_messagecallback during event iteration;TurnResult.usageis a pre-iteration snapshot — callers should readturn.usage()afterauto_send_turnreturnsAsyncGeneratorreturn type annotation toconvert_langgraph_to_agentex_eventsand_generate_eventsto fix pyright inference (was treating them as coroutines)Tests added (tasks 5-8, 219 passing):
test_langgraph_sync.py: 11 unit tests forconvert_langgraph_to_agentex_events+ deprecationtest_langgraph_turn.py: 19 unit tests forLangGraphTurn+langgraph_usage_to_turn_usagetest_langgraph_async.py: 6 characterization tests for the unifiedstream_langgraph_eventstest_langgraph_sync_unified.py: 6 passthrough + span derivation teststest_langgraph_conformance.py: 4 conformance fixtures (text-only, single-tool, reasoning, multi-step)test_harness_langgraph_sync.py: 6 offline integration tests (yield channel)test_harness_langgraph_async.py: 7 offline integration tests (auto_send channel)test_harness_langgraph_temporal.py: 5 offline integration tests (temporal channel)Tutorial agents (task 9):
examples/tutorials/00_sync/harness_langgraph/(s-harness-langgraph) — sync, yield_turnexamples/tutorials/10_async/00_base/harness_langgraph/(a-harness-langgraph) — async, auto_send_turnexamples/tutorials/10_async/10_temporal/harness_langgraph/(at-harness-langgraph) — temporal, LangGraphPlugin + emit_langgraph_messagesCI (task 10): Enabled
live-matrixjob inharness-integration.ymlwith 3-way matrix over[sync, async, temporal]running offline LangGraph integration tests.Test plan
uv run --all-packages --all-extras pytest tests/lib/core/harness/ tests/lib/adk/ -v— 219 passed./scripts/lint— 0 errors, 0 warnings (ruff + pyright)🤖 Generated with Claude Code
Greptile Summary
This PR migrates the LangGraph harness onto the unified harness surface (
LangGraphTurn+UnifiedEmitter), replacing ~180 lines of bespoke async streaming logic with a thin adapter and adding 219 tests across sync, async, temporal, and conformance suites.LangGraphTurnimplements theHarnessTurnprotocol, delegating event generation toconvert_langgraph_to_agentex_eventsand capturing usage via anon_final_ai_messagecallback.stream_langgraph_eventsis reimplemented as a one-liner overUnifiedEmitter.auto_send_turn; the old bespoke async handler is fully removed._langgraph_sync.py, theStreamTaskMessageStartemitted when a reasoning model returns a"reasoning"block usesTextContentinstead ofReasoningContent.ReasoningContentis not imported in the file. The conformance fixture constructs the correct events by hand but never passes them through the converter, so this goes untested and undetected.Confidence Score: 4/5
Safe to merge for text-only and tool-calling agents; reasoning-model agents (o1, gpt-o3) will emit malformed event streams until the TextContent/ReasoningContent mismatch in _langgraph_sync.py is fixed.
The core migration is solid and well-tested. The issue is in the reasoning block path of _langgraph_sync.py: when a reasoning model returns a "reasoning" typed block, the code opens the streaming context with TextContent instead of ReasoningContent. ReasoningContent is not even imported in the file. The conformance fixture constructs the correct events manually but never validates the converter's output, so the bug is invisible to the test suite. Any deployment using a reasoning model through the sync or async LangGraph harness will produce a type-mismatched event stream.
src/agentex/lib/adk/_modules/_langgraph_sync.py — the reasoning block StreamTaskMessageStart at line ~151
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant Agent as Agent (acp.py) participant LGT as LangGraphTurn participant Conv as convert_langgraph_to_agentex_events participant Emitter as UnifiedEmitter participant Streaming as adk.streaming Agent->>LGT: "LangGraphTurn(graph.astream(), model=model)" Agent->>Emitter: yield_turn(turn) or auto_send_turn(turn) Emitter->>LGT: iterate turn.events LGT->>Conv: convert_langgraph_to_agentex_events(stream, on_final_ai_message) loop LangGraph events Conv-->>LGT: StreamTaskMessageStart/Delta/Done (text) or StreamTaskMessageFull (tool) LGT-->>Emitter: yield event Emitter->>Streaming: streaming_task_message_context(...) Note over Conv,LGT: on_final_ai_message fires for AIMessage in updates end LGT->>LGT: _usage updated via _capture() Emitter-->>Agent: TurnResult or async for event Note over Agent: turn.usage() returns post-iteration usage%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant Agent as Agent (acp.py) participant LGT as LangGraphTurn participant Conv as convert_langgraph_to_agentex_events participant Emitter as UnifiedEmitter participant Streaming as adk.streaming Agent->>LGT: "LangGraphTurn(graph.astream(), model=model)" Agent->>Emitter: yield_turn(turn) or auto_send_turn(turn) Emitter->>LGT: iterate turn.events LGT->>Conv: convert_langgraph_to_agentex_events(stream, on_final_ai_message) loop LangGraph events Conv-->>LGT: StreamTaskMessageStart/Delta/Done (text) or StreamTaskMessageFull (tool) LGT-->>Emitter: yield event Emitter->>Streaming: streaming_task_message_context(...) Note over Conv,LGT: on_final_ai_message fires for AIMessage in updates end LGT->>LGT: _usage updated via _capture() Emitter-->>Agent: TurnResult or async for event Note over Agent: turn.usage() returns post-iteration usageComments Outside Diff (1)
src/agentex/lib/adk/_modules/_langgraph_sync.py, line 147-153 (link)StreamTaskMessageStartuses wrong content typeWhen a reasoning model emits a block of type
"reasoning", the code opens the stream withTextContent(type="text", ...)instead ofReasoningContent. Downstream consumers that dispatch oncontent.type(e.g. rendering pipelines, theSpanDerivertext-span logic) will receive aTextContentwrapper for what is actually a reasoning block, then see aReasoningContentDeltaarrive — a type mismatch that will confuse or break those consumers.ReasoningContentis also not imported in this file, confirming the intended type was never used. The conformance fixture_REASONINGcorrectly showsReasoningContentas the expected start content, but it constructs the events by hand and never runs them through the actual converter, so no test catches this today.Prompt To Fix With AI
Prompt To Fix All With AI
Reviews (3): Last reviewed commit: "fix(langgraph): restore created_at + doc..." | Re-trigger Greptile