From 13a4f5c4a2a959bfbae11634bd8f8dbe47e00805 Mon Sep 17 00:00:00 2001 From: Sangjin Moon <1128msj@naver.com> Date: Fri, 13 Feb 2026 11:10:37 +0900 Subject: [PATCH] fix: use strict RFC 2397 regex in _parse_base64_data_uri to avoid misidentifying SSE data _parse_base64_data_uri previously used a loose startswith("data:") check, which caused SSE data (e.g., "data: {...}") to be incorrectly processed as base64 data URIs, resulting in spurious error logs. Replace the manual parsing with a strict regex that requires the full data:[<mediatype>][;params];base64,<data> format. Non-matching inputs now return (None, None) cleanly without error logging. Closes https://github.com/langfuse/langfuse/issues/5659 --- langfuse/media.py | 36 ++++++------ tests/test_issue_5659.py | 121 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 19 deletions(-) create mode 100644 tests/test_issue_5659.py diff --git a/langfuse/media.py b/langfuse/media.py index 6691785af..2df707dca 100644 --- a/langfuse/media.py +++ b/langfuse/media.py @@ -190,33 +190,31 @@ def parse_reference_string(reference_string: str) -> ParsedMediaReference: content_type=cast(MediaContentType, parsed_data["type"]), ) + # Strict regex for RFC 2397 base64 data URIs: data:[<mediatype>][;params];base64,<data> + _BASE64_DATA_URI_RE = re.compile( + r"^data:" + r"(?P<content_type>[a-zA-Z0-9][a-zA-Z0-9!#$&\-^_.+]*/[a-zA-Z0-9][a-zA-Z0-9!#$&\-^_.+]*)?" + r"(?:;[^;,]+)*" # optional parameters (e.g., ;charset=utf-8) + r";base64," + r"(?P<data>[A-Za-z0-9+/\r\n]+=*)\s*$" + ) + def _parse_base64_data_uri( self, data: str ) -> Tuple[Optional[bytes], Optional[MediaContentType]]: # Example data URI: data:image/jpeg;base64,/9j/4AAQ... 
- try: - if not data or not isinstance(data, str): - raise ValueError("Data URI is not a string") - - if not data.startswith("data:"): - raise ValueError("Data URI does not start with 'data:'") - - header, actual_data = data[5:].split(",", 1) - if not header or not actual_data: - raise ValueError("Invalid URI") + if not data or not isinstance(data, str): + return None, None - # Split header into parts and check for base64 - header_parts = header.split(";") - if "base64" not in header_parts: - raise ValueError("Data is not base64 encoded") + match = self._BASE64_DATA_URI_RE.match(data) + if not match: + return None, None - # Content type is the first part - content_type = header_parts[0] - if not content_type: - raise ValueError("Content type is empty") + try: + content_type = match.group("content_type") or "text/plain" + actual_data = match.group("data") return base64.b64decode(actual_data), cast(MediaContentType, content_type) - except Exception as e: self._log.error("Error parsing base64 data URI", exc_info=e) diff --git a/tests/test_issue_5659.py b/tests/test_issue_5659.py new file mode 100644 index 000000000..b2bc723ee --- /dev/null +++ b/tests/test_issue_5659.py @@ -0,0 +1,121 @@ +"""Test for issue #5659: _parse_base64_data_uri misidentifies SSE data as base64 media.""" + +import base64 +import logging + +from langfuse.media import LangfuseMedia + + +def _make_media(): + """Create a LangfuseMedia instance for testing _parse_base64_data_uri.""" + return LangfuseMedia( + content_bytes=b"dummy", content_type="application/octet-stream" + ) + + +def test_sse_data_is_not_parsed_as_base64(caplog): + """Verify SSE data strings return (None, None) without error logging.""" + media = _make_media() + with caplog.at_level(logging.ERROR, logger="langfuse.media"): + result = media._parse_base64_data_uri("data: {'foo': 'bar'}") + + assert result == (None, None) + assert caplog.records == [], ( + f"Expected no error logs, got: {[r.message for r in caplog.records]}" + ) + + 
+def test_sse_data_with_json(caplog): + """Verify SSE data with JSON payload returns (None, None) without error logging.""" + media = _make_media() + with caplog.at_level(logging.ERROR, logger="langfuse.media"): + result = media._parse_base64_data_uri( + 'data: {"event": "message", "data": "hello"}' + ) + + assert result == (None, None) + assert caplog.records == [], ( + f"Expected no error logs, got: {[r.message for r in caplog.records]}" + ) + + +def test_valid_base64_data_uri_still_works(): + """Verify a proper base64 data URI is parsed correctly.""" + original_bytes = b"hello world" + encoded = base64.b64encode(original_bytes).decode("utf-8") + data_uri = f"data:text/plain;base64,{encoded}" + + media = _make_media() + content_bytes, content_type = media._parse_base64_data_uri(data_uri) + + assert content_bytes == original_bytes + assert content_type == "text/plain" + + +def test_data_uri_without_base64_returns_none(caplog): + """Verify a data URI without ;base64 encoding returns (None, None).""" + media = _make_media() + with caplog.at_level(logging.ERROR, logger="langfuse.media"): + result = media._parse_base64_data_uri("data:text/plain,hello") + + assert result == (None, None) + assert caplog.records == [] + + +def test_empty_string_returns_none(caplog): + """Verify an empty string returns (None, None) without error logging.""" + media = _make_media() + with caplog.at_level(logging.ERROR, logger="langfuse.media"): + result = media._parse_base64_data_uri("") + + assert result == (None, None) + assert caplog.records == [] + + +def test_non_data_uri_returns_none(caplog): + """Verify a regular string returns (None, None) without error logging.""" + media = _make_media() + with caplog.at_level(logging.ERROR, logger="langfuse.media"): + result = media._parse_base64_data_uri("just a regular string") + + assert result == (None, None) + assert caplog.records == [] + + +def test_valid_image_data_uri(): + """Verify a valid image data URI parses correctly.""" + 
pixel_bytes = b"\x89PNG\r\n" + encoded = base64.b64encode(pixel_bytes).decode("utf-8") + data_uri = f"data:image/png;base64,{encoded}" + + media = _make_media() + content_bytes, content_type = media._parse_base64_data_uri(data_uri) + + assert content_bytes == pixel_bytes + assert content_type == "image/png" + + +def test_data_uri_with_mime_params(): + """Verify a data URI with extra MIME parameters (e.g. charset) parses correctly.""" + original_bytes = b"hello world" + encoded = base64.b64encode(original_bytes).decode("utf-8") + data_uri = f"data:text/plain;charset=utf-8;base64,{encoded}" + + media = _make_media() + content_bytes, content_type = media._parse_base64_data_uri(data_uri) + + assert content_bytes == original_bytes + assert content_type == "text/plain" + + +def test_data_uri_without_mime_type(): + """Verify a data URI without MIME type defaults to text/plain per RFC 2397.""" + original_bytes = b"hello world" + encoded = base64.b64encode(original_bytes).decode("utf-8") + data_uri = f"data:;base64,{encoded}" + + media = _make_media() + content_bytes, content_type = media._parse_base64_data_uri(data_uri) + + assert content_bytes == original_bytes + assert content_type == "text/plain"