Learn how to efficiently handle bulk data operations in ThemisDB. Master batching strategies, performance optimization, and error handling for high-throughput scenarios.
- ✅ Batch insert, update, and delete operations
- ✅ Optimal batch sizing strategies
- ✅ Transaction batching patterns
- ✅ Error handling in batch operations
- ✅ Performance optimization techniques
- ✅ Streaming large datasets
Prerequisites: CRUD Tutorial
Time Required: 25 minutes
Difficulty: Intermediate
- Why Batch Operations?
- Batch Insert
- Batch Update
- Batch Delete
- Optimal Batch Sizing
- Error Handling
- Performance Optimization
Individual Operations (Bad):
# 1,000 individual inserts: ~60 seconds
for i in {1..1000}; do
  curl -X PUT http://localhost:8080/entities/item:$i \
    -d "{\"blob\": \"{\\\"value\\\": $i}\"}"
done

Batch Operation (Good):

# 1,000 batched inserts: ~0.6 seconds (100x faster!)
curl -X POST http://localhost:8080/batch/create \
  -d '{
    "entities": [/* 1000 entities */]
  }'

Why So Fast?
- Single network round-trip
- Single transaction
- Optimized internal processing
- Reduced lock contention
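A back-of-the-envelope cost model makes the speedup concrete. This is a sketch with illustrative latency numbers (`rtt_ms` and `server_ms` are assumptions, not measured ThemisDB figures):

```python
import math

def estimated_total_ms(n_ops, batch_size, rtt_ms=5.0, server_ms=0.05):
    """Rough cost model: each request pays one network round-trip,
    plus a small amount of server work per operation. Batching
    amortizes the round-trip cost across many operations."""
    requests_needed = math.ceil(n_ops / batch_size)
    return requests_needed * rtt_ms + n_ops * server_ms

individual = estimated_total_ms(1000, batch_size=1)     # 1000 round-trips
batched = estimated_total_ms(1000, batch_size=1000)     # 1 round-trip
print(f"individual: {individual:.0f}ms, batched: {batched:.0f}ms, "
      f"speedup: {individual / batched:.0f}x")
```

With these assumed numbers the model predicts roughly a 90x speedup from round-trip amortization alone, in line with the timings above.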
curl -X POST http://localhost:8080/batch/create \
  -H "Content-Type: application/json" \
  -d '{
    "entities": [
      {
        "entity_id": "products:item-001",
        "blob": "{\"name\":\"Widget A\",\"price\":10.99,\"stock\":100}"
      },
      {
        "entity_id": "products:item-002",
        "blob": "{\"name\":\"Widget B\",\"price\":15.99,\"stock\":50}"
      },
      {
        "entity_id": "products:item-003",
        "blob": "{\"name\":\"Widget C\",\"price\":12.99,\"stock\":75}"
      }
    ]
  }'

Expected Output:
{
  "status": "success",
  "created": 3,
  "failed": 0,
  "results": [
    {"entity": "products:item-001", "version": 1, "status": "created"},
    {"entity": "products:item-002", "version": 1, "status": "created"},
    {"entity": "products:item-003", "version": 1, "status": "created"}
  ],
  "duration_ms": 45
}

# Create JSON file with entities
cat > products.json << 'EOF'
{
"entities": [
{"entity_id": "products:1001", "blob": "{\"name\":\"Product 1\"}"},
{"entity_id": "products:1002", "blob": "{\"name\":\"Product 2\"}"},
{"entity_id": "products:1003", "blob": "{\"name\":\"Product 3\"}"}
]
}
EOF
# Upload batch
curl -X POST http://localhost:8080/batch/create \
-H "Content-Type: application/json" \
  -d @products.json

Python Example:
import requests
import json

def batch_insert_products(count, batch_size=100):
    base_url = "http://localhost:8080"
    for start in range(0, count, batch_size):
        entities = []
        for i in range(start, min(start + batch_size, count)):
            entity = {
                "entity_id": f"products:item-{i:06d}",
                "blob": json.dumps({
                    "name": f"Product {i}",
                    "price": 10.0 + (i % 100),
                    "stock": 50 + (i % 50),
                    "category": f"category-{i % 10}"
                })
            }
            entities.append(entity)
        response = requests.post(
            f"{base_url}/batch/create",
            json={"entities": entities}
        )
        if response.status_code == 200:
            result = response.json()
            print(f"Batch {start//batch_size + 1}: Created {result['created']} entities")
        else:
            print(f"Error in batch {start//batch_size + 1}: {response.text}")

# Insert 10,000 products in batches of 500
batch_insert_products(10000, batch_size=500)

Expected Output:
Batch 1: Created 500 entities
Batch 2: Created 500 entities
...
Batch 20: Created 500 entities
Total time: ~12 seconds (vs. 10+ minutes individually!)
curl -X POST http://localhost:8080/batch/update \
  -H "Content-Type: application/json" \
  -d '{
    "updates": [
      {
        "entity_id": "products:item-001",
        "attributes": {"price": 11.99, "stock": 95}
      },
      {
        "entity_id": "products:item-002",
        "attributes": {"price": 16.99, "stock": 45}
      },
      {
        "entity_id": "products:item-003",
        "attributes": {"price": 13.99, "stock": 70}
      }
    ]
  }'

Expected Output:
{
  "status": "success",
  "updated": 3,
  "failed": 0,
  "results": [
    {"entity": "products:item-001", "version": 2, "status": "updated"},
    {"entity": "products:item-002", "version": 2, "status": "updated"},
    {"entity": "products:item-003", "version": 2, "status": "updated"}
  ]
}

# Update only if version matches (optimistic locking)
curl -X POST http://localhost:8080/batch/update \
  -H "Content-Type: application/json" \
  -d '{
    "updates": [
      {
        "entity_id": "products:item-001",
        "expected_version": 1,
        "attributes": {"price": 12.99}
      }
    ],
    "on_conflict": "skip"
  }'

on_conflict options:
- skip - Skip conflicting updates
- abort - Abort entire batch on first conflict
- retry - Retry with latest version
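With skip, the conflicting updates still appear in the response's `results` array, so the client can collect them and resubmit once it has fetched the latest versions. A minimal sketch of that collection step — the exact status string the server uses for a skipped conflict is an assumption here (shown as `"conflict"`); check your server's actual response:

```python
def updates_to_retry(updates, results, conflict_status="conflict"):
    """Pair each submitted update with its per-entity result (same order)
    and keep the ones the server skipped due to a version mismatch."""
    return [
        update for update, result in zip(updates, results)
        if result.get("status") == conflict_status
    ]

# Example: one of two updates hit a version conflict
updates = [{"entity_id": "products:item-001", "attributes": {"price": 12.99}},
           {"entity_id": "products:item-002", "attributes": {"price": 9.99}}]
results = [{"entity": "products:item-001", "status": "updated"},
           {"entity": "products:item-002", "status": "conflict"}]
retry = updates_to_retry(updates, results)  # only the item-002 update remains
```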
# Update only specific fields, leave others unchanged
curl -X POST http://localhost:8080/batch/patch \
  -H "Content-Type: application/json" \
  -d '{
    "patches": [
      {
        "entity_id": "products:item-001",
        "attributes": {"stock": 90}
      },
      {
        "entity_id": "products:item-002",
        "attributes": {"stock": 40}
      }
    ]
  }'

Python:
import requests
import json
def bulk_price_update(category, discount_percent):
    """Apply discount to all products in a category"""
    base_url = "http://localhost:8080"
    # First, query products in category
    query_response = requests.post(
        f"{base_url}/query",
        json={
            "table": "products",
            "predicates": [{"column": "category", "value": category}],
            "return": "entities"
        }
    )
    products = query_response.json()["entities"]
    # Prepare batch update
    updates = []
    for product in products:
        data = json.loads(product["blob"])
        old_price = data["price"]
        new_price = old_price * (1 - discount_percent / 100)
        updates.append({
            "entity_id": product["entity_id"],
            "attributes": {
                "price": round(new_price, 2),
                "discount_applied": discount_percent
            }
        })
    # Execute batch update
    update_response = requests.post(
        f"{base_url}/batch/update",
        json={"updates": updates}
    )
    result = update_response.json()
    print(f"Updated {result['updated']} products with {discount_percent}% discount")

# Apply 20% discount to all electronics
bulk_price_update("electronics", 20)

curl -X POST http://localhost:8080/batch/delete \
  -H "Content-Type: application/json" \
  -d '{
    "entity_ids": [
      "products:item-001",
      "products:item-002",
      "products:item-003"
    ]
  }'

Expected Output:
{
  "status": "success",
  "deleted": 3,
  "failed": 0,
  "results": [
    {"entity": "products:item-001", "status": "deleted"},
    {"entity": "products:item-002", "status": "deleted"},
    {"entity": "products:item-003", "status": "deleted"}
  ]
}

# Delete only if version matches
curl -X POST http://localhost:8080/batch/delete \
  -H "Content-Type: application/json" \
  -d '{
    "deletes": [
      {
        "entity_id": "products:item-001",
        "expected_version": 2
      }
    ],
    "on_conflict": "skip"
  }'

Python:
import requests
def delete_out_of_stock_products():
    """Delete all products with zero stock"""
    base_url = "http://localhost:8080"
    # Query products with stock = 0
    query_response = requests.post(
        f"{base_url}/query",
        json={
            "table": "products",
            "predicates": [{"column": "stock", "value": 0}],
            "return": "ids_only"
        }
    )
    entity_ids = query_response.json()["entity_ids"]
    if not entity_ids:
        print("No out-of-stock products found")
        return
    # Batch delete
    delete_response = requests.post(
        f"{base_url}/batch/delete",
        json={"entity_ids": entity_ids}
    )
    result = delete_response.json()
    print(f"Deleted {result['deleted']} out-of-stock products")

delete_out_of_stock_products()

Batch size affects:
- Network latency - Larger batches = fewer round-trips
- Memory usage - Larger batches = more memory
- Transaction size - Very large batches may timeout
| Operation Type | Recommended Size | Max Size |
|---|---|---|
| Create (small entities) | 500-1000 | 5000 |
| Create (large entities) | 100-500 | 1000 |
| Update | 500-1000 | 5000 |
| Delete | 1000-2000 | 10000 |
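The sizing table can be encoded as a small helper that picks a mid-range value per operation. This is a sketch; the ~10 KB threshold for treating a create payload as "large" is an assumption to tune for your entities:

```python
def recommended_batch_size(operation, avg_entity_bytes=1024):
    """Pick a mid-range batch size from the sizing guidelines above."""
    if operation == "create":
        # Large entities get smaller batches to bound request size
        return 250 if avg_entity_bytes > 10_000 else 750
    if operation == "update":
        return 750
    if operation == "delete":
        return 1500
    raise ValueError(f"unknown operation: {operation}")
```

For example, `recommended_batch_size("create", avg_entity_bytes=50_000)` drops to the small-batch range, while deletes (which carry no payload) can batch more aggressively.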
Python:
import requests
import time
def adaptive_batch_insert(entities, target_time_ms=1000):
    """Automatically adjust batch size based on performance"""
    base_url = "http://localhost:8080"
    batch_size = 100  # Start with small batch
    i = 0
    while i < len(entities):
        batch = entities[i:i+batch_size]
        start = time.time()
        response = requests.post(
            f"{base_url}/batch/create",
            json={"entities": batch}
        )
        duration_ms = (time.time() - start) * 1000
        if response.status_code == 200:
            result = response.json()
            print(f"Batch size {batch_size}: {result['created']} entities in {duration_ms:.0f}ms")
            # Adjust batch size based on performance
            if duration_ms < target_time_ms * 0.5:
                batch_size = min(batch_size * 2, 5000)  # Double if too fast
            elif duration_ms > target_time_ms * 1.5:
                batch_size = max(batch_size // 2, 10)  # Halve if too slow
            i += len(batch)
        else:
            print(f"Error: {response.text}")
            if batch_size == 10:
                raise RuntimeError("Batch keeps failing at minimum size")
            batch_size = max(batch_size // 2, 10)  # Retry this batch at half size
    print(f"Completed. Final batch size: {batch_size}")

curl -X POST http://localhost:8080/batch/create \
  -H "Content-Type: application/json" \
  -d '{
    "entities": [
      {"entity_id": "valid:001", "blob": "{\"data\": \"value1\"}"},
      {"entity_id": "valid:002", "blob": "{\"data\": \"value2\"}"},
      {"entity_id": "duplicate:001", "blob": "{\"data\": \"exists\"}"}
    ],
    "continue_on_error": true
  }'

Response with partial failure:
{
  "status": "partial_success",
  "created": 2,
  "failed": 1,
  "results": [
    {"entity": "valid:001", "status": "created"},
    {"entity": "valid:002", "status": "created"},
    {"entity": "duplicate:001", "status": "error", "error": "Entity already exists"}
  ]
}

Python:
import requests
import time
from typing import List, Dict
def batch_create_with_retry(entities: List[Dict], max_retries=3):
    """Batch create with exponential backoff retry"""
    base_url = "http://localhost:8080"
    failed_entities = entities.copy()
    for attempt in range(max_retries):
        if not failed_entities:
            break
        response = requests.post(
            f"{base_url}/batch/create",
            json={"entities": failed_entities, "continue_on_error": True}
        )
        if response.status_code == 200:
            result = response.json()
            print(f"Attempt {attempt + 1}: Created {result['created']}, Failed {result['failed']}")
            # Collect failed entities for retry
            failed_entities = [
                entity for i, entity in enumerate(failed_entities)
                if result['results'][i]['status'] == 'error'
            ]
            if failed_entities and attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Retrying {len(failed_entities)} failed entities in {wait_time}s...")
                time.sleep(wait_time)
        else:
            print(f"HTTP Error: {response.status_code}")
            break
    return len(failed_entities) == 0

# Usage
if batch_create_with_retry(entities):
    print("All entities created successfully!")
else:
    print("Some entities could not be created after retries")

# Batch operation in transaction (all or nothing)
tx_response=$(curl -X POST http://localhost:8080/tx/begin)
tx_id=$(echo "$tx_response" | jq -r '.tx_id')

curl -X POST http://localhost:8080/tx/$tx_id/batch/create \
  -d '{
    "entities": [/* large batch */]
  }'

# Commit or rollback
if [ $? -eq 0 ]; then
  curl -X POST http://localhost:8080/tx/$tx_id/commit
else
  curl -X POST http://localhost:8080/tx/$tx_id/rollback
fi

# Enable gzip compression for large batches
curl -X POST http://localhost:8080/batch/create \
  -H "Content-Type: application/json" \
  -H "Content-Encoding: gzip" \
  -H "Accept-Encoding: gzip" \
  --data-binary @products.json.gz

Savings: 70-90% bandwidth reduction!
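The same compressed upload can be done from Python with the standard library's `gzip` module. A sketch that mirrors the curl example above; whether the server accepts a `Content-Encoding: gzip` request body is up to your ThemisDB deployment:

```python
import gzip
import json

def gzip_batch_payload(entities):
    """Serialize a batch body and gzip it; repetitive JSON typically
    compresses very well. Returns (raw_bytes, gzipped_bytes)."""
    raw = json.dumps({"entities": entities}).encode("utf-8")
    return raw, gzip.compress(raw)

def post_gzipped_batch(entities, url="http://localhost:8080/batch/create"):
    """Send the gzipped body with the same headers as the curl example."""
    import requests  # third-party: pip install requests
    _, body = gzip_batch_payload(entities)
    return requests.post(url, data=body, headers={
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    })
```

Note that `requests` sends the bytes as-is when given `data=`, so the payload must be compressed before the call, exactly as `--data-binary @products.json.gz` does.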
Python with threading:
import requests
import concurrent.futures
from typing import List
def process_batch(batch: List, batch_num: int):
    """Process a single batch"""
    response = requests.post(
        "http://localhost:8080/batch/create",
        json={"entities": batch}
    )
    return batch_num, response.json()

def parallel_batch_insert(all_entities: List, batch_size=500, workers=4):
    """Process multiple batches in parallel"""
    batches = [
        all_entities[i:i+batch_size]
        for i in range(0, len(all_entities), batch_size)
    ]
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(process_batch, batch, i)
            for i, batch in enumerate(batches)
        ]
        for future in concurrent.futures.as_completed(futures):
            batch_num, result = future.result()
            print(f"Batch {batch_num}: {result['created']} entities created")

# Process 10,000 entities with 4 parallel workers
parallel_batch_insert(entities, batch_size=500, workers=4)

Performance: 3-4x faster on multi-core systems!
# Stream large CSV to batch inserts
cat large_dataset.csv | while IFS=, read -r id name price; do
  echo "{\"entity_id\":\"products:$id\",\"blob\":\"{\\\"name\\\":\\\"$name\\\",\\\"price\\\":$price}\"}"
done | jq -s '{entities: .}' | \
  curl -X POST http://localhost:8080/batch/create \
    -H "Content-Type: application/json" \
    -d @-

# Check batch operation metrics
curl http://localhost:4318/metrics | grep batch_operations
# Example output:
# batch_operations_total{operation="create"} 150
# batch_operations_duration_ms{operation="create",p99="850"}
# batch_operations_size_avg{operation="create"} 512
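To act on these numbers programmatically, the Prometheus-style exposition lines can be parsed with a few lines of Python. A sketch — the metric names match the example output above, but verify them against your actual `/metrics` endpoint:

```python
def parse_batch_metrics(text):
    """Extract batch_operations_* samples from Prometheus-style text.
    Returns {metric_name_with_labels: value}; comment lines and lines
    without a numeric value are skipped."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "batch_operations" not in line:
            continue
        name, _, value = line.rpartition(" ")
        try:
            samples[name] = float(value)
        except ValueError:
            pass  # no trailing numeric sample on this line
    return samples

metrics = parse_batch_metrics(
    'batch_operations_total{operation="create"} 150\n'
    '# HELP comment lines are ignored\n'
    'batch_operations_size_avg{operation="create"} 512\n'
)
```

This could feed an alert, e.g. flag when the average batch size drifts well below the recommended range.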
Do:

- Use batch operations for bulk data:
  # Good: Batch 1000 entities
  batch_insert(entities)
- Choose appropriate batch sizes:
  # 500-1000 for most use cases
  batch_size = 500
- Handle partial failures gracefully:
  # Check for failures
  response.json()["failed"] > 0
- Use compression for large batches:
  -H "Content-Encoding: gzip"
- Monitor performance metrics:
  curl /metrics | grep batch_

Don't:

- Don't use individual ops for bulk data:
  # Bad: 1000 individual requests
  for item in items: create_entity(item)
- Don't use excessive batch sizes:
  # Bad: 100,000 entities in one batch
  batch_size = 100000
- Don't ignore errors:
  # Bad: Assuming all succeeded
  batch_create(entities)
- Don't retry infinitely:
  # Bad: Infinite retry loop
  while True: try_again()
| Operation | Individual (ops/sec) | Batch (ops/sec) | Speedup |
|---|---|---|---|
| Insert | 150 | 15,000 | 100x |
| Update | 200 | 18,000 | 90x |
| Delete | 300 | 25,000 | 83x |
Benchmarked on: 8-core CPU, SSD, 1KB entities
- ✅ Batch operations are 50-100x faster
- ✅ Optimal batch sizes: 500-1000 for most cases
- ✅ Always handle partial failures
- ✅ Use parallel processing for massive datasets
- ✅ Monitor performance metrics
- Schema Design Tutorial - Design for batch efficiency
- Best Practices Guide - Production patterns
- Try Examples: IoT Sensor Network
Questions? See FAQ or ask in Discussions