Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions langfuse/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,33 +190,31 @@ def parse_reference_string(reference_string: str) -> ParsedMediaReference:
content_type=cast(MediaContentType, parsed_data["type"]),
)

# Strict regex for RFC 2397 base64 data URIs: data:[<mediatype>][;params];base64,<data>
# The whole string must have this shape; anything else (for example a
# "data: {...}" string with a space right after the colon) yields no match.
_BASE64_DATA_URI_RE = re.compile(
# Literal scheme prefix, anchored at the start of the string.
r"^data:"
# Optional "type/subtype"; each half starts with an alphanumeric character
# and continues with alphanumerics or one of ! # $ & - ^ _ . +
r"(?P<content_type>[a-zA-Z0-9][a-zA-Z0-9!#$&\-^_.+]*/[a-zA-Z0-9][a-zA-Z0-9!#$&\-^_.+]*)?"
r"(?:;[^;,]+)*" # optional parameters (e.g., ;charset=utf-8)
# The ";base64," marker is mandatory, so plain (non-base64) data URIs do not match.
r";base64,"
# Payload: at least one character from the standard base64 alphabet (CR/LF
# allowed inside), then any number of "=" characters, then optional trailing
# whitespace up to the end of the string. The URL-safe characters "-" and "_"
# are not part of the accepted alphabet.
r"(?P<data>[A-Za-z0-9+/\r\n]+=*)\s*$"
)

def _parse_base64_data_uri(
self, data: str
) -> Tuple[Optional[bytes], Optional[MediaContentType]]:
# Example data URI: ...
try:
if not data or not isinstance(data, str):
raise ValueError("Data URI is not a string")

if not data.startswith("data:"):
raise ValueError("Data URI does not start with 'data:'")

header, actual_data = data[5:].split(",", 1)
if not header or not actual_data:
raise ValueError("Invalid URI")
if not data or not isinstance(data, str):
return None, None

# Split header into parts and check for base64
header_parts = header.split(";")
if "base64" not in header_parts:
raise ValueError("Data is not base64 encoded")
match = self._BASE64_DATA_URI_RE.match(data)
if not match:
return None, None

# Content type is the first part
content_type = header_parts[0]
if not content_type:
raise ValueError("Content type is empty")
try:
content_type = match.group("content_type") or "text/plain"
actual_data = match.group("data")

return base64.b64decode(actual_data), cast(MediaContentType, content_type)

except Exception as e:
self._log.error("Error parsing base64 data URI", exc_info=e)

Expand Down
121 changes: 121 additions & 0 deletions tests/test_issue_5659.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Test for issue #5659: _parse_base64_data_uri misidentifies SSE data as base64 media."""

import base64
import logging

from langfuse.media import LangfuseMedia


def _make_media():
    """Build a throwaway LangfuseMedia used only to reach _parse_base64_data_uri."""
    placeholder_bytes = b"dummy"
    placeholder_type = "application/octet-stream"
    return LangfuseMedia(
        content_bytes=placeholder_bytes,
        content_type=placeholder_type,
    )


def test_sse_data_is_not_parsed_as_base64(caplog):
    """An SSE-style ``data: ...`` line yields (None, None) and logs no error."""
    sse_line = "data: {'foo': 'bar'}"
    parser = _make_media()

    with caplog.at_level(logging.ERROR, logger="langfuse.media"):
        outcome = parser._parse_base64_data_uri(sse_line)

    nothing_parsed = (None, None)
    assert outcome == nothing_parsed
    # The message is only rendered when the assertion fails.
    assert caplog.records == [], (
        f"Expected no error logs, got: {[r.message for r in caplog.records]}"
    )


def test_sse_data_with_json(caplog):
    """An SSE line carrying a JSON object yields (None, None) and logs no error."""
    sse_line = 'data: {"event": "message", "data": "hello"}'
    parser = _make_media()

    with caplog.at_level(logging.ERROR, logger="langfuse.media"):
        outcome = parser._parse_base64_data_uri(sse_line)

    nothing_parsed = (None, None)
    assert outcome == nothing_parsed
    # The message is only rendered when the assertion fails.
    assert caplog.records == [], (
        f"Expected no error logs, got: {[r.message for r in caplog.records]}"
    )


def test_valid_base64_data_uri_still_works():
    """A well-formed text/plain base64 data URI round-trips to its original bytes."""
    payload = b"hello world"
    encoded = base64.b64encode(payload).decode("utf-8")
    data_uri = f"data:text/plain;base64,{encoded}"

    decoded, mime = _make_media()._parse_base64_data_uri(data_uri)

    assert decoded == payload
    assert mime == "text/plain"


def test_data_uri_without_base64_returns_none(caplog):
    """A data URI lacking the ;base64 marker yields (None, None) and logs nothing."""
    plain_uri = "data:text/plain,hello"
    parser = _make_media()

    with caplog.at_level(logging.ERROR, logger="langfuse.media"):
        outcome = parser._parse_base64_data_uri(plain_uri)

    nothing_parsed = (None, None)
    assert outcome == nothing_parsed
    assert caplog.records == []


def test_empty_string_returns_none(caplog):
    """The empty string yields (None, None) and produces no error log."""
    empty_input = ""
    parser = _make_media()

    with caplog.at_level(logging.ERROR, logger="langfuse.media"):
        outcome = parser._parse_base64_data_uri(empty_input)

    nothing_parsed = (None, None)
    assert outcome == nothing_parsed
    assert caplog.records == []


def test_non_data_uri_returns_none(caplog):
    """Ordinary text that is not a data URI yields (None, None) and logs nothing."""
    ordinary_text = "just a regular string"
    parser = _make_media()

    with caplog.at_level(logging.ERROR, logger="langfuse.media"):
        outcome = parser._parse_base64_data_uri(ordinary_text)

    nothing_parsed = (None, None)
    assert outcome == nothing_parsed
    assert caplog.records == []


def test_valid_image_data_uri():
    """An image/png base64 data URI decodes to the original bytes and MIME type."""
    png_header = b"\x89PNG\r\n"
    encoded = base64.b64encode(png_header).decode("utf-8")
    data_uri = f"data:image/png;base64,{encoded}"

    decoded, mime = _make_media()._parse_base64_data_uri(data_uri)

    assert decoded == png_header
    assert mime == "image/png"


def test_data_uri_with_mime_params():
    """Extra MIME parameters such as charset do not affect the decoded bytes or type."""
    payload = b"hello world"
    encoded = base64.b64encode(payload).decode("utf-8")
    data_uri = f"data:text/plain;charset=utf-8;base64,{encoded}"

    decoded, mime = _make_media()._parse_base64_data_uri(data_uri)

    assert decoded == payload
    assert mime == "text/plain"


def test_data_uri_without_mime_type():
    """With no MIME type in the URI, the parser reports text/plain (RFC 2397 default)."""
    payload = b"hello world"
    encoded = base64.b64encode(payload).decode("utf-8")
    data_uri = f"data:;base64,{encoded}"

    decoded, mime = _make_media()._parse_base64_data_uri(data_uri)

    assert decoded == payload
    assert mime == "text/plain"