Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 28 additions & 25 deletions docs/core/map.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,20 @@ def square(context: DurableContext, item: int, index: int, items: list[int]) ->
return item * item

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[int]:
def handler(event: dict, context: DurableContext) -> dict:
"""Process a list of items using map operations."""
items = [1, 2, 3, 4, 5]

result = context.map(items, square)
return result
# Convert to dict for JSON serialization (BatchResult is not JSON serializable)
return result.to_dict()
```

When this function runs:
1. Each item is processed in parallel
2. The `square` function is called for each item
3. Each result is checkpointed independently
4. The function returns a `BatchResult` with results `[1, 4, 9, 16, 25]`
4. The function returns a dict with results `[1, 4, 9, 16, 25]`

If the function is interrupted after processing items 0-2, it resumes at item 3 without reprocessing the first three items.

Expand Down Expand Up @@ -165,10 +166,10 @@ def validate_email(
}

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
def handler(event: dict, context: DurableContext) -> dict:
emails = ["jane_doe@example.com", "john_doe@example.org", "invalid"]
result = context.map(emails, validate_email)
return result
return result.to_dict()
```

[↑ Back to top](#table-of-contents)
Expand All @@ -194,18 +195,18 @@ def process_item(context: DurableContext, item: int, index: int, items: list[int
return {"item": item, "squared": item * item}

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
def handler(event: dict, context: DurableContext) -> dict:
items = list(range(100))

# Configure map operation
config = MapConfig(
max_concurrency=10, # Process 10 items at a time
item_batcher=ItemBatcher(max_items_per_batch=5), # Batch 5 items together
completion_config=CompletionConfig.all_successful(), # Require all to succeed
)

result = context.map(items, process_item, name="process_numbers", config=config)
return result
return result.to_dict()
```

### MapConfig parameters
Expand Down Expand Up @@ -244,14 +245,14 @@ def fetch_data(context: DurableContext, url: str, index: int, urls: list[str]) -
return {"url": url, "data": "..."}

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
def handler(event: dict, context: DurableContext) -> dict:
urls = [f"https://example.com/api/{i}" for i in range(100)]

# Process only 5 URLs at a time
config = MapConfig(max_concurrency=5)

result = context.map(urls, fetch_data, config=config)
return result
return result.to_dict()
```

### Batching items
Expand All @@ -272,16 +273,16 @@ def process_batch(
return [{"item": item, "squared": item * item} for item in batch.items]

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[list[dict]]:
def handler(event: dict, context: DurableContext) -> dict:
items = list(range(100))

# Process items in batches of 10
config = MapConfig(
item_batcher=ItemBatcher(max_items_per_batch=10)
)

result = context.map(items, process_batch, config=config)
return result
return result.to_dict()
```

### Custom completion criteria
Expand All @@ -299,19 +300,19 @@ def process_item(context: DurableContext, item: int, index: int, items: list[int
return {"item": item, "processed": True}

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
def handler(event: dict, context: DurableContext) -> dict:
items = list(range(20))

# Succeed if at least 15 items succeed, fail after 5 failures
config = MapConfig(
completion_config=CompletionConfig(
min_successful=15,
tolerated_failure_count=5,
)
)

result = context.map(items, process_item, config=config)
return result
return result.to_dict()
```

### Using context operations in map functions
Expand Down Expand Up @@ -344,11 +345,13 @@ def process_user(
return {"user_id": user_id, "notification_sent": notification["sent"]}

@durable_execution
def handler(event: dict, context: DurableContext) -> BatchResult[dict]:
def handler(event: dict, context: DurableContext) -> dict:
"""Process multiple users using context operations within map functions."""
user_ids = ["user_1", "user_2", "user_3"]

result = context.map(user_ids, process_user)
return result
# Convert to dict for JSON serialization (BatchResult is not JSON serializable)
return result.to_dict()
```

### Filtering and transforming results
Expand Down