Skip to content

Commit d6c6cb1

Browse files
authored
Only decode JSON input buffer in Anthropic Claude streaming (#497)
_decode_tool_use was only used when _tool_json_input_buf was found, but we were decoding the entire _content_block after adding _tool_json_input_buf to it. The _content_block overall which could contain non-JSON elements (e.g. {}), causing failures. To fix this, we have removed _decode_tool_use helper function and inlined JSON decoding logic directly into content_block_stop handler in _process_anthropic_claude_chunk, where we only use it to decode _tool_json_input_buf before appending to _content_block. Patch based on open-telemetry/opentelemetry-python-contrib#3875 with code copied directly from https://github.com/open-telemetry/opentelemetry-python-contrib/blob/v0.54b1/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/extensions/bedrock_utils.py#L289 Repeated testing in open-telemetry/opentelemetry-python-contrib#3875 to confirm this works By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent bf434f4 commit d6c6cb1

File tree

3 files changed

+187
-5
lines changed

3 files changed

+187
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@ If your change does not need a CHANGELOG entry, add the "skip changelog" label t
1313
## Unreleased
1414
- Add botocore instrumentation extension for Bedrock AgentCore services with span attributes
1515
([#490](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/490))
16+
- [PATCH] Only decode JSON input buffer in Anthropic Claude streaming
17+
([#497](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/497))
1618
- Fix timeout handling for exceeded deadline in retry logic in OTLPAwsLogsExporter
1719
([#501](https://github.com/aws-observability/aws-otel-python-instrumentation/pull/501))

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,89 @@ def patched_extract_tool_calls(
334334
tool_calls.append(tool_call)
335335
return tool_calls
336336

337+
# TODO: The following code is to patch a bedrock bug that was fixed in
338+
# opentelemetry-instrumentation-botocore==0.60b0 in:
339+
# https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3875
340+
# Remove this code once we've bumped opentelemetry-instrumentation-botocore dependency to 0.60b0
341+
def patched_process_anthropic_claude_chunk(self, chunk):
342+
# pylint: disable=too-many-return-statements,too-many-branches
343+
if not (message_type := chunk.get("type")):
344+
return
345+
346+
if message_type == "message_start":
347+
# {'type': 'message_start', 'message': {'id': 'id', 'type': 'message', 'role': 'assistant',
348+
# 'model': 'claude-2.0', 'content': [], 'stop_reason': None, 'stop_sequence': None,
349+
# 'usage': {'input_tokens': 18, 'output_tokens': 1}}}
350+
if chunk.get("message", {}).get("role") == "assistant":
351+
self._record_message = True
352+
message = chunk["message"]
353+
self._message = {
354+
"role": message["role"],
355+
"content": message.get("content", []),
356+
}
357+
return
358+
359+
if message_type == "content_block_start":
360+
# {'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}}
361+
# {'type': 'content_block_start', 'index': 1, 'content_block':
362+
# {'type': 'tool_use', 'id': 'id', 'name': 'func_name', 'input': {}}}
363+
if self._record_message:
364+
block = chunk.get("content_block", {})
365+
if block.get("type") == "text":
366+
self._content_block = block
367+
elif block.get("type") == "tool_use":
368+
self._content_block = block
369+
return
370+
371+
if message_type == "content_block_delta":
372+
# {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Here'}}
373+
# {'type': 'content_block_delta', 'index': 1, 'delta': {'type': 'input_json_delta', 'partial_json': ''}}
374+
if self._record_message:
375+
delta = chunk.get("delta", {})
376+
if delta.get("type") == "text_delta":
377+
self._content_block["text"] += delta.get("text", "")
378+
elif delta.get("type") == "input_json_delta":
379+
self._tool_json_input_buf += delta.get("partial_json", "")
380+
return
381+
382+
if message_type == "content_block_stop":
383+
# {'type': 'content_block_stop', 'index': 0}
384+
if self._tool_json_input_buf:
385+
try:
386+
self._content_block["input"] = json.loads(self._tool_json_input_buf)
387+
except json.JSONDecodeError:
388+
self._content_block["input"] = self._tool_json_input_buf
389+
self._message["content"].append(self._content_block)
390+
self._content_block = {}
391+
self._tool_json_input_buf = ""
392+
return
393+
394+
if message_type == "message_delta":
395+
# {'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None},
396+
# 'usage': {'output_tokens': 123}}
397+
if (stop_reason := chunk.get("delta", {}).get("stop_reason")) is not None:
398+
self._response["stopReason"] = stop_reason
399+
return
400+
401+
if message_type == "message_stop":
402+
# {'type': 'message_stop', 'amazon-bedrock-invocationMetrics':
403+
# {'inputTokenCount': 18, 'outputTokenCount': 123, 'invocationLatency': 5250, 'firstByteLatency': 290}}
404+
if invocation_metrics := chunk.get("amazon-bedrock-invocationMetrics"):
405+
self._process_invocation_metrics(invocation_metrics)
406+
407+
if self._record_message:
408+
self._response["output"] = {"message": self._message}
409+
self._record_message = False
410+
self._message = None
411+
412+
self._stream_done_callback(self._response)
413+
return
414+
337415
bedrock_utils.ConverseStreamWrapper.__init__ = patched_init
338416
bedrock_utils.ConverseStreamWrapper._process_event = patched_process_event
417+
bedrock_utils.InvokeModelWithResponseStreamWrapper._process_anthropic_claude_chunk = (
418+
patched_process_anthropic_claude_chunk
419+
)
339420
bedrock_utils.extract_tool_calls = patched_extract_tool_calls
340421

341422
# END The OpenTelemetry Authors code

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/patches/test_instrumentation_patch.py

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,6 @@ def _run_patch_behaviour_tests(self):
120120
self._test_unpatched_botocore_propagator()
121121
self._test_unpatched_gevent_instrumentation()
122122
self._test_unpatched_starlette_instrumentation()
123-
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
124-
# Bedrock Runtime tests
125-
self._test_unpatched_converse_stream_wrapper()
126-
self._test_unpatched_extract_tool_calls()
127123

128124
# Apply patches
129125
apply_instrumentation_patches()
@@ -222,6 +218,16 @@ def _test_unpatched_botocore_instrumentation(self):
222218
# DynamoDB
223219
self.assertTrue("dynamodb" in _KNOWN_EXTENSIONS, "Upstream has removed a DynamoDB extension")
224220

221+
# Bedrock Runtime tests
222+
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
223+
self._test_unpatched_converse_stream_wrapper()
224+
self._test_unpatched_extract_tool_calls()
225+
226+
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
227+
self._test_unpatched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
228+
self._test_unpatched_process_anthropic_claude_chunk(None, None)
229+
self._test_unpatched_process_anthropic_claude_chunk({}, {})
230+
225231
def _test_unpatched_gevent_instrumentation(self):
226232
self.assertFalse(gevent.monkey.is_module_patched("os"), "gevent os module has been patched")
227233
self.assertFalse(gevent.monkey.is_module_patched("thread"), "gevent thread module has been patched")
@@ -267,10 +273,14 @@ def _test_patched_botocore_instrumentation(self):
267273
# Bedrock Agent Operation
268274
self._test_patched_bedrock_agent_instrumentation()
269275

270-
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
271276
# Bedrock Runtime
277+
# TODO: remove these tests once we bump botocore instrumentation version to 0.56b0
272278
self._test_patched_converse_stream_wrapper()
273279
self._test_patched_extract_tool_calls()
280+
# TODO: remove these tests once we bump botocore instrumentation version to 0.60b0
281+
self._test_patched_process_anthropic_claude_chunk({"location": "Seattle"}, {"location": "Seattle"})
282+
self._test_patched_process_anthropic_claude_chunk(None, None)
283+
self._test_patched_process_anthropic_claude_chunk({}, {})
274284

275285
# Bedrock Agent Runtime
276286
self.assertTrue("bedrock-agent-runtime" in _KNOWN_EXTENSIONS)
@@ -679,6 +689,95 @@ def _test_patched_extract_tool_calls(self):
679689
result = bedrock_utils.extract_tool_calls(message_with_string_content, True)
680690
self.assertIsNone(result)
681691

692+
# Test with toolUse format to exercise the for loop
693+
message_with_tool_use = {"role": "assistant", "content": [{"toolUse": {"toolUseId": "id1", "name": "func1"}}]}
694+
result = bedrock_utils.extract_tool_calls(message_with_tool_use, True)
695+
self.assertEqual(len(result), 1)
696+
697+
# Test with tool_use format to exercise the for loop
698+
message_with_type_tool_use = {
699+
"role": "assistant",
700+
"content": [{"type": "tool_use", "id": "id2", "name": "func2"}],
701+
}
702+
result = bedrock_utils.extract_tool_calls(message_with_type_tool_use, True)
703+
self.assertEqual(len(result), 1)
704+
705+
def _test_patched_process_anthropic_claude_chunk(
706+
self, input_value: Dict[str, str], expected_output: Dict[str, str]
707+
):
708+
self._test_process_anthropic_claude_chunk(input_value, expected_output, False)
709+
710+
def _test_unpatched_process_anthropic_claude_chunk(
711+
self, input_value: Dict[str, str], expected_output: Dict[str, str]
712+
):
713+
self._test_process_anthropic_claude_chunk(input_value, expected_output, True)
714+
715+
def _test_process_anthropic_claude_chunk(
716+
self, input_value: Dict[str, str], expected_output: Dict[str, str], expect_exception: bool
717+
):
718+
"""Test that _process_anthropic_claude_chunk handles various tool_use input formats."""
719+
wrapper = bedrock_utils.InvokeModelWithResponseStreamWrapper(
720+
stream=MagicMock(),
721+
stream_done_callback=MagicMock,
722+
stream_error_callback=MagicMock,
723+
model_id="anthropic.claude-3-5-sonnet-20240620-v1:0",
724+
)
725+
726+
# Simulate message_start
727+
wrapper._process_anthropic_claude_chunk(
728+
{
729+
"type": "message_start",
730+
"message": {
731+
"role": "assistant",
732+
"content": [],
733+
},
734+
}
735+
)
736+
737+
# Simulate content_block_start with specified input
738+
content_block = {
739+
"type": "tool_use",
740+
"id": "test_id",
741+
"name": "test_tool",
742+
}
743+
if input_value is not None:
744+
content_block["input"] = input_value
745+
746+
wrapper._process_anthropic_claude_chunk(
747+
{
748+
"type": "content_block_start",
749+
"index": 0,
750+
"content_block": content_block,
751+
}
752+
)
753+
754+
# Simulate content_block_stop
755+
try:
756+
wrapper._process_anthropic_claude_chunk({"type": "content_block_stop", "index": 0})
757+
except TypeError:
758+
if expect_exception:
759+
return
760+
else:
761+
raise
762+
763+
# Verify the message content
764+
self.assertEqual(len(wrapper._message["content"]), 1)
765+
tool_block = wrapper._message["content"][0]
766+
self.assertEqual(tool_block["type"], "tool_use")
767+
self.assertEqual(tool_block["id"], "test_id")
768+
self.assertEqual(tool_block["name"], "test_tool")
769+
770+
if expected_output is not None:
771+
self.assertEqual(tool_block["input"], expected_output)
772+
self.assertIsInstance(tool_block["input"], dict)
773+
else:
774+
self.assertNotIn("input", tool_block)
775+
776+
# Just adding this to do basic sanity checks and increase code coverage
777+
wrapper._process_anthropic_claude_chunk({"type": "content_block_delta", "index": 0})
778+
wrapper._process_anthropic_claude_chunk({"type": "message_delta"})
779+
wrapper._process_anthropic_claude_chunk({"type": "message_stop"})
780+
682781
def _test_patched_bedrock_agent_instrumentation(self):
683782
"""For bedrock-agent service, both extract_attributes and on_success provides attributes,
684783
the attributes depend on the API being invoked."""

0 commit comments

Comments
 (0)