Skip to content

Commit 29044df

Browse files
committed
fix(memory): paginate list_messages by converted message count, not raw event count
1 parent f8710fa commit 29044df

4 files changed

Lines changed: 219 additions & 91 deletions

File tree

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Reusable pagination utility for fetching and converting paginated results."""
2+
3+
from typing import Any, Callable, TypeVar
4+
5+
T = TypeVar("T")
6+
R = TypeVar("R")
7+
8+
DEFAULT_PAGE_SIZE = 100
9+
10+
11+
def paginate_for_n_results(
12+
fetch_page: Callable[[dict[str, Any]], tuple[list[R], str | None]],
13+
initial_params: dict[str, Any],
14+
converter: Callable[[list[R]], list[T]],
15+
target_count: int,
16+
) -> list[T]:
17+
"""Paginate an arbitrary API, converting accumulated results after each page.
18+
19+
The full accumulated list is re-converted after each page rather than converting
20+
per-page, because some converters (e.g. events_to_messages) iterate the input in
21+
reverse — converting per-page would produce incorrect ordering.
22+
23+
Args:
24+
fetch_page: Takes params dict, returns (items, next_token). next_token is None when exhausted.
25+
initial_params: Base params for the first call. "nextToken" is added for subsequent pages.
26+
converter: Converts accumulated raw items to desired output type.
27+
target_count: Stop after collecting this many converted items.
28+
"""
29+
all_items: list[R] = []
30+
next_token: str | None = None
31+
32+
while True:
33+
params = {**initial_params}
34+
if next_token is not None:
35+
params["nextToken"] = next_token
36+
37+
items, next_token = fetch_page(params)
38+
all_items.extend(items)
39+
40+
converted = converter(all_items)
41+
if len(converted) >= target_count or next_token is None:
42+
return converted[:target_count]

src/bedrock_agentcore/memory/integrations/strands/session_manager.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from strands.types.session import Session, SessionAgent, SessionMessage
2020
from typing_extensions import override
2121

22+
from bedrock_agentcore._utils.pagination import DEFAULT_PAGE_SIZE, paginate_for_n_results
2223
from bedrock_agentcore.memory.client import MemoryClient
2324
from bedrock_agentcore.memory.models.filters import (
2425
EventMetadataFilter,
@@ -596,15 +597,39 @@ def list_messages(
596597
raise SessionException(f"Session ID mismatch: expected {self.config.session_id}, got {session_id}")
597598

598599
try:
599-
max_results = (limit + offset) if limit else MAX_FETCH_ALL_RESULTS
600600

601-
events = self.memory_client.list_events(
602-
memory_id=self.config.memory_id,
603-
actor_id=self.config.actor_id,
604-
session_id=session_id,
605-
max_results=max_results,
606-
)
607-
messages = self.converter.events_to_messages(events)
601+
def fetch_page(params: dict) -> tuple[list, str | None]:
602+
response = self.memory_client.gmdp_client.list_events(**params)
603+
return response.get("events", []), response.get("nextToken")
604+
605+
initial_params = {
606+
"memoryId": self.config.memory_id,
607+
"actorId": self.config.actor_id,
608+
"sessionId": session_id,
609+
"maxResults": DEFAULT_PAGE_SIZE,
610+
"includePayloads": True,
611+
}
612+
613+
if limit is not None:
614+
# Paginate until we have enough converted messages, so state
615+
# events don't consume slots meant for conversational messages.
616+
target = limit + offset
617+
messages = paginate_for_n_results(
618+
fetch_page=fetch_page,
619+
initial_params=initial_params,
620+
converter=self.converter.events_to_messages,
621+
target_count=target,
622+
)
623+
else:
624+
# No limit — fetch all events in one pass, convert once.
625+
events = self.memory_client.list_events(
626+
memory_id=self.config.memory_id,
627+
actor_id=self.config.actor_id,
628+
session_id=session_id,
629+
max_results=MAX_FETCH_ALL_RESULTS,
630+
)
631+
messages = self.converter.events_to_messages(events)
632+
608633
if self.config.filter_restored_tool_context:
609634
messages = self._filter_restored_tool_context(messages)
610635
if limit is not None:

tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py

Lines changed: 118 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -302,34 +302,18 @@ def test_create_message(self, session_manager, mock_memory_client):
302302

303303
def test_list_messages(self, session_manager, mock_memory_client):
304304
"""Test listing messages."""
305+
user_text = '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
306+
asst_text = '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}'
305307
mock_memory_client.list_events.return_value = [
306308
{
307309
"eventId": "event-1",
308310
"eventTimestamp": "2024-01-01T12:00:00Z",
309-
"payload": [
310-
{
311-
"conversational": {
312-
"content": {
313-
"text": '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
314-
},
315-
"role": "USER",
316-
}
317-
}
318-
],
311+
"payload": [{"conversational": {"content": {"text": user_text}, "role": "USER"}}],
319312
},
320313
{
321314
"eventId": "event-2",
322315
"eventTimestamp": "2024-01-01T12:00:00Z",
323-
"payload": [
324-
{
325-
"conversational": {
326-
"content": {
327-
"text": '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' # noqa E501
328-
},
329-
"role": "ASSISTANT",
330-
}
331-
}
332-
],
316+
"payload": [{"conversational": {"content": {"text": asst_text}, "role": "ASSISTANT"}}],
333317
},
334318
]
335319

@@ -341,34 +325,18 @@ def test_list_messages(self, session_manager, mock_memory_client):
341325

342326
def test_list_messages_returns_values_in_correct_reverse_order(self, session_manager, mock_memory_client):
343327
"""Test listing messages."""
328+
user_text = '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
329+
asst_text = '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}'
344330
mock_memory_client.list_events.return_value = [
345331
{
346332
"eventId": "event-1",
347333
"eventTimestamp": "2024-01-01T12:00:00Z",
348-
"payload": [
349-
{
350-
"conversational": {
351-
"content": {
352-
"text": '{"message": {"role": "user", "content": [{"text": "Hello"}]}, "message_id": 1}'
353-
},
354-
"role": "USER",
355-
}
356-
}
357-
],
334+
"payload": [{"conversational": {"content": {"text": user_text}, "role": "USER"}}],
358335
},
359336
{
360337
"eventId": "event-2",
361338
"eventTimestamp": "2024-01-01T12:00:00Z",
362-
"payload": [
363-
{
364-
"conversational": {
365-
"content": {
366-
"text": '{"message": {"role": "assistant", "content": [{"text": "Hi there"}]}, "message_id": 2}' # noqa E501
367-
},
368-
"role": "ASSISTANT",
369-
}
370-
}
371-
],
339+
"payload": [{"conversational": {"content": {"text": asst_text}, "role": "ASSISTANT"}}],
372340
},
373341
]
374342

@@ -508,37 +476,39 @@ def test_update_message_wrong_session(self, session_manager):
508476

509477
def test_list_messages_with_limit(self, session_manager, mock_memory_client):
510478
"""Test listing messages with limit."""
511-
mock_memory_client.list_events.return_value = [
512-
{
513-
"eventId": "event-1",
514-
"eventTimestamp": "2024-01-01T12:00:00Z",
515-
"payload": [
516-
{
517-
"conversational": {
518-
"content": {
519-
"text": '{"message": {"role": "user", '
520-
'"content": [{"text": "Message 1"}]}, "message_id": 1}'
521-
},
522-
"role": "USER",
479+
session_manager.memory_client.gmdp_client.list_events.return_value = {
480+
"events": [
481+
{
482+
"eventId": "event-1",
483+
"eventTimestamp": "2024-01-01T12:00:00Z",
484+
"payload": [
485+
{
486+
"conversational": {
487+
"content": {
488+
"text": '{"message": {"role": "user", '
489+
'"content": [{"text": "Message 1"}]}, "message_id": 1}'
490+
},
491+
"role": "USER",
492+
}
523493
}
524-
}
525-
],
526-
},
527-
{
528-
"eventId": "event-2",
529-
"eventTimestamp": "2024-01-01T12:00:00Z",
530-
"payload": [
531-
{
532-
"conversational": {
533-
"content": {
534-
"text": '{"message": {"role": "assistant", "content": [{"text": "Message 2"}]}, "message_id": 2}' # noqa E501
535-
},
536-
"role": "ASSISTANT",
494+
],
495+
},
496+
{
497+
"eventId": "event-2",
498+
"eventTimestamp": "2024-01-01T12:00:00Z",
499+
"payload": [
500+
{
501+
"conversational": {
502+
"content": {
503+
"text": '{"message": {"role": "assistant", "content": [{"text": "Message 2"}]}, "message_id": 2}' # noqa E501
504+
},
505+
"role": "ASSISTANT",
506+
}
537507
}
538-
}
539-
],
540-
},
541-
]
508+
],
509+
},
510+
]
511+
}
542512

543513
messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=1, offset=1)
544514

@@ -1189,25 +1159,90 @@ def test_retrieve_customer_context_filters_by_relevance_score(self, mock_memory_
11891159
assert "Low relevance 1" not in injected_context
11901160
assert "Low relevance 2" not in injected_context
11911161

1192-
def test_list_messages_default_max_results(self, session_manager, mock_memory_client):
1193-
"""Test listing messages without limit uses default max_results=10000."""
1194-
mock_memory_client.list_events.return_value = []
1162+
def test_list_messages_with_limit_skips_state_events(self, session_manager, mock_memory_client):
1163+
"""list_messages with limit returns exactly limit messages even when state events are mixed in.
1164+
1165+
State events (session/agent blobs) share the same actorId as conversational events
1166+
after the metadata-based identification change. If list_messages counts raw events
1167+
toward the limit, state events consume slots and the caller gets fewer messages
1168+
than requested.
1169+
"""
1170+
1171+
def _conv_event(eid, text, role):
1172+
return {
1173+
"eventId": eid,
1174+
"payload": [
1175+
{
1176+
"conversational": {
1177+
"content": {
1178+
"text": f'{{"message": {{"role": "{role}", '
1179+
f'"content": [{{"text": "{text}"}}]}}, "message_id": {eid}}}'
1180+
},
1181+
"role": role.upper(),
1182+
}
1183+
}
1184+
],
1185+
}
11951186

1196-
session_manager.list_messages("test-session-456", "test-agent-123")
1187+
def _state_event(eid):
1188+
return {
1189+
"eventId": eid,
1190+
"payload": [{"blob": '{"session_id": "s", "session_type": "AGENT"}'}],
1191+
"metadata": {"stateType": {"stringValue": "SESSION"}},
1192+
}
11971193

1198-
mock_memory_client.list_events.assert_called_once()
1199-
call_kwargs = mock_memory_client.list_events.call_args[1]
1200-
assert call_kwargs["max_results"] == 10000
1194+
# Page 1: 2 state + 3 conversational (5 raw events, only 3 convert to messages)
1195+
# Page 2: 3 more conversational
1196+
page1 = [
1197+
_state_event("s1"),
1198+
_conv_event(1, "Hello", "user"),
1199+
_conv_event(2, "Hi", "assistant"),
1200+
_state_event("s2"),
1201+
_conv_event(3, "How are you?", "user"),
1202+
]
1203+
page2 = [
1204+
_conv_event(4, "Good", "assistant"),
1205+
_conv_event(5, "Great", "user"),
1206+
_conv_event(6, "Thanks", "assistant"),
1207+
]
12011208

1202-
def test_list_messages_with_limit_calculates_max_results(self, session_manager, mock_memory_client):
1203-
"""Test listing messages with limit calculates max_results correctly."""
1204-
mock_memory_client.list_events.return_value = []
1209+
mock_gmdp = session_manager.memory_client.gmdp_client
1210+
mock_gmdp.list_events.side_effect = [
1211+
{"events": page1, "nextToken": "tok"},
1212+
{"events": page2},
1213+
]
1214+
1215+
messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=5)
12051216

1206-
session_manager.list_messages("test-session-456", "test-agent-123", limit=500, offset=50)
1217+
assert len(messages) == 5
12071218

1208-
mock_memory_client.list_events.assert_called_once()
1209-
call_kwargs = mock_memory_client.list_events.call_args[1]
1210-
assert call_kwargs["max_results"] == 550 # limit + offset
1219+
def test_list_messages_with_limit_returns_fewer_when_not_enough(self, session_manager, mock_memory_client):
1220+
"""list_messages returns all available messages when fewer than limit exist."""
1221+
1222+
def _conv_event(eid, text, role):
1223+
return {
1224+
"eventId": eid,
1225+
"payload": [
1226+
{
1227+
"conversational": {
1228+
"content": {
1229+
"text": f'{{"message": {{"role": "{role}", '
1230+
f'"content": [{{"text": "{text}"}}]}}, "message_id": {eid}}}'
1231+
},
1232+
"role": role.upper(),
1233+
}
1234+
}
1235+
],
1236+
}
1237+
1238+
mock_gmdp = session_manager.memory_client.gmdp_client
1239+
mock_gmdp.list_events.return_value = {
1240+
"events": [_conv_event(1, "Hello", "user"), _conv_event(2, "Hi", "assistant")]
1241+
}
1242+
1243+
messages = session_manager.list_messages("test-session-456", "test-agent-123", limit=10)
1244+
1245+
assert len(messages) == 2
12111246

12121247
def test_append_message_handles_none_from_create_message(self, session_manager, test_agent):
12131248
"""Test that append_message gracefully handles None return from create_message."""

tests_integ/memory/integrations/test_session_manager.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,3 +375,29 @@ def test_agent_multi_turn_with_batching(self, test_memory_stm):
375375
assert len(messages) >= 6
376376

377377
# endregion End-to-end agent with batching tests
378+
379+
def test_list_messages_limit_excludes_state_events(self, test_memory_stm, memory_client):
380+
"""list_messages with limit returns the requested count even when state events are present.
381+
382+
https://github.com/aws/bedrock-agentcore-sdk-python/pull/244 changed state events
383+
(session/agent blobs) to share the same actorId as conversational events, distinguished
384+
only by metadata. If list_messages counts raw events toward the limit, state events
385+
consume slots and the caller gets fewer messages than requested.
386+
"""
387+
session_id = f"test-pagination-{uuid.uuid4().hex[:8]}"
388+
actor_id = f"test-actor-{uuid.uuid4().hex[:8]}"
389+
390+
config = AgentCoreMemoryConfig(
391+
memory_id=test_memory_stm["id"],
392+
session_id=session_id,
393+
actor_id=actor_id,
394+
)
395+
sm = AgentCoreMemorySessionManager(agentcore_memory_config=config, region_name=REGION)
396+
397+
agent = Agent(system_prompt="You are a helpful assistant.", session_manager=sm)
398+
agent("Remember the number 42")
399+
agent("Remember the color blue")
400+
agent("Remember the city Paris")
401+
402+
messages = sm.list_messages(session_id, agent.agent_id, limit=4)
403+
assert len(messages) == 4

0 commit comments

Comments
 (0)