Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions test/asynchronous/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,227 @@ async def test_large_inserts_unordered(self):
self.assertEqual(6, result.inserted_count)
self.assertEqual(6, await self.coll.count_documents({}))

async def test_bulk_write_with_comment(self):
"""Test bulk write operations with comment parameter."""
requests = [
InsertOne({"x": 1}),
UpdateOne({"x": 1}, {"$set": {"y": 1}}),
DeleteOne({"x": 1}),
]
result = await self.coll.bulk_write(requests, comment="bulk_comment")
self.assertEqual(1, result.inserted_count)
self.assertEqual(1, result.modified_count)
self.assertEqual(1, result.deleted_count)

async def test_bulk_write_with_let(self):
"""Test bulk write operations with let parameter."""
if not async_client_context.version.at_least(5, 0):
self.skipTest("let parameter requires MongoDB 5.0+")

await self.coll.insert_one({"x": 1})
requests = [
UpdateOne({"$expr": {"$eq": ["$x", "$$targetVal"]}}, {"$set": {"updated": True}}),
]
result = await self.coll.bulk_write(requests, let={"targetVal": 1})
self.assertEqual(1, result.modified_count)

async def test_bulk_write_all_operation_types(self):
"""Test bulk write with all operation types combined."""
await self.coll.insert_many([{"x": i} for i in range(5)])

requests = [
InsertOne({"x": 100}),
UpdateOne({"x": 0}, {"$set": {"updated": True}}),
UpdateMany({"x": {"$lte": 2}}, {"$set": {"batch_updated": True}}),
ReplaceOne({"x": 3}, {"x": 3, "replaced": True}),
DeleteOne({"x": 4}),
DeleteMany({"x": {"$gt": 50}}),
]
result = await self.coll.bulk_write(requests)

self.assertEqual(1, result.inserted_count)
self.assertGreaterEqual(result.modified_count, 1)
self.assertGreaterEqual(result.deleted_count, 1)

async def test_bulk_write_unordered(self):
"""Test unordered bulk write continues after error."""
await self.coll.create_index([("x", 1)], unique=True)
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

requests = [
InsertOne({"x": 1}),
InsertOne({"x": 1}), # Duplicate - will error
InsertOne({"x": 2}),
InsertOne({"x": 3}),
]

with self.assertRaises(BulkWriteError) as ctx:
await self.coll.bulk_write(requests, ordered=False)

# With unordered, should have inserted 3 documents
self.assertEqual(3, ctx.exception.details["nInserted"])

async def test_bulk_write_ordered(self):
"""Test ordered bulk write stops on first error."""
await self.coll.create_index([("x", 1)], unique=True)
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

requests = [
InsertOne({"x": 1}),
InsertOne({"x": 1}), # Duplicate - will error
InsertOne({"x": 2}),
InsertOne({"x": 3}),
]

with self.assertRaises(BulkWriteError) as ctx:
await self.coll.bulk_write(requests, ordered=True)

# With ordered, should have inserted only 1 document
self.assertEqual(1, ctx.exception.details["nInserted"])

async def test_bulk_write_bypass_document_validation(self):
"""Test bulk write with bypass_document_validation."""
if not async_client_context.version.at_least(3, 2):
self.skipTest("bypass_document_validation requires MongoDB 3.2+")

# Create collection with validator
await self.coll.drop()
await self.db.create_collection(
self.coll.name, validator={"$jsonSchema": {"required": ["name"]}}
)

# Without bypass, should fail
with self.assertRaises(BulkWriteError):
await self.coll.bulk_write([InsertOne({"x": 1})])

# With bypass, should succeed
result = await self.coll.bulk_write([InsertOne({"x": 1})], bypass_document_validation=True)
self.assertEqual(1, result.inserted_count)

async def test_bulk_write_result_properties(self):
"""Test all BulkWriteResult properties."""
await self.coll.insert_one({"x": 1})

requests = [
InsertOne({"x": 2}),
UpdateOne({"x": 1}, {"$set": {"updated": True}}),
ReplaceOne({"x": 2}, {"x": 2, "replaced": True}, upsert=True),
DeleteOne({"x": 1}),
]
result = await self.coll.bulk_write(requests)

# Check all properties
self.assertTrue(result.acknowledged)
self.assertEqual(1, result.inserted_count)
self.assertGreaterEqual(result.matched_count, 0)
self.assertGreaterEqual(result.modified_count, 0)
self.assertEqual(1, result.deleted_count)
self.assertIsInstance(result.upserted_count, int)
self.assertIsInstance(result.upserted_ids, dict)

async def test_bulk_write_with_upsert(self):
"""Test bulk write upsert operations."""
requests = [
UpdateOne({"x": 1}, {"$set": {"y": 1}}, upsert=True),
UpdateOne({"x": 2}, {"$set": {"y": 2}}, upsert=True),
ReplaceOne({"x": 3}, {"x": 3, "y": 3}, upsert=True),
]
result = await self.coll.bulk_write(requests)

self.assertEqual(3, result.upserted_count)
self.assertEqual(3, len(result.upserted_ids))

async def test_update_one_with_hint(self):
"""Test UpdateOne with hint parameter."""
await self.coll.create_index([("x", 1)])
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

await self.coll.insert_one({"x": 1})

requests = [UpdateOne({"x": 1}, {"$set": {"y": 1}}, hint=[("x", 1)])]
result = await self.coll.bulk_write(requests)
self.assertEqual(1, result.modified_count)

async def test_update_many_with_hint(self):
"""Test UpdateMany with hint parameter."""
await self.coll.create_index([("x", 1)])
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

await self.coll.insert_many([{"x": 1}, {"x": 1}])

requests = [UpdateMany({"x": 1}, {"$set": {"y": 1}}, hint=[("x", 1)])]
result = await self.coll.bulk_write(requests)
self.assertEqual(2, result.modified_count)

async def test_delete_one_with_hint(self):
"""Test DeleteOne with hint parameter."""
await self.coll.create_index([("x", 1)])
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

await self.coll.insert_one({"x": 1})

requests = [DeleteOne({"x": 1}, hint=[("x", 1)])]
result = await self.coll.bulk_write(requests)
self.assertEqual(1, result.deleted_count)

async def test_delete_many_with_hint(self):
"""Test DeleteMany with hint parameter."""
await self.coll.create_index([("x", 1)])
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

await self.coll.insert_many([{"x": 1}, {"x": 1}])

requests = [DeleteMany({"x": 1}, hint=[("x", 1)])]
result = await self.coll.bulk_write(requests)
self.assertEqual(2, result.deleted_count)

async def test_update_one_with_array_filters(self):
"""Test UpdateOne with array_filters parameter."""
await self.coll.insert_one({"x": [{"y": 1}, {"y": 2}, {"y": 3}]})

requests = [
UpdateOne({}, {"$set": {"x.$[elem].z": 1}}, array_filters=[{"elem.y": {"$gt": 1}}])
]
result = await self.coll.bulk_write(requests)
self.assertEqual(1, result.modified_count)

doc = await self.coll.find_one()
# Elements with y > 1 should have z = 1
for elem in doc["x"]:
if elem["y"] > 1:
self.assertEqual(1, elem.get("z"))

async def test_replace_one_with_hint(self):
"""Test ReplaceOne with hint parameter."""
await self.coll.create_index([("x", 1)])
self.addAsyncCleanup(self.coll.drop_index, [("x", 1)])

await self.coll.insert_one({"x": 1})

requests = [ReplaceOne({"x": 1}, {"x": 1, "replaced": True}, hint=[("x", 1)])]
result = await self.coll.bulk_write(requests)
self.assertEqual(1, result.modified_count)

async def test_update_with_collation(self):
"""Test update operations with collation."""
await self.coll.insert_many(
[
{"name": "cafe"},
{"name": "Cafe"},
]
)

requests = [
UpdateMany(
{"name": "cafe"},
{"$set": {"updated": True}},
collation={"locale": "en", "strength": 2},
)
]
result = await self.coll.bulk_write(requests)
# With case-insensitive collation, both docs should match
self.assertEqual(2, result.modified_count)


class AsyncBulkAuthorizationTestBase(AsyncBulkTestBase):
@async_client_context.require_auth
Expand Down
117 changes: 117 additions & 0 deletions test/asynchronous/test_change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1152,5 +1152,122 @@ def asyncTearDown(self):
)


class AsyncTestChangeStreamCoverage(TestAsyncCollectionAsyncChangeStream):
"""Additional tests to improve code coverage for AsyncChangeStream."""

async def test_change_stream_alive_property(self):
"""Test alive property state transitions."""
async with await self.change_stream() as cs:
self.assertTrue(cs.alive)
# After context exit, should be closed
self.assertFalse(cs.alive)

async def test_change_stream_idempotent_close(self):
"""Test that close() can be called multiple times safely."""
cs = await self.change_stream()
await cs.close()
# Second close should not raise
await cs.close()
self.assertFalse(cs.alive)

async def test_change_stream_resume_token_deepcopy(self):
"""Test that resume_token returns a deep copy."""
coll = self.watched_collection()
async with await self.change_stream() as cs:
await coll.insert_one({"x": 1})
await anext(cs) # Consume the change event
token1 = cs.resume_token
token2 = cs.resume_token
# Should be equal but different objects
self.assertEqual(token1, token2)
self.assertIsNot(token1, token2)

async def test_change_stream_with_comment(self):
"""Test change stream with comment parameter."""
client, listener = await self.client_with_listener("aggregate")
try:
async with await self.change_stream_with_client(client, comment="test_comment"):
pass
finally:
await client.close()

# Check that comment was in the aggregate command
self.assertGreater(len(listener.started_events), 0)
cmd = listener.started_events[0].command
self.assertEqual("test_comment", cmd.get("comment"))

async def test_change_stream_with_show_expanded_events(self):
"""Test change stream with show_expanded_events parameter."""
if not async_client_context.version.at_least(6, 0):
self.skipTest("show_expanded_events requires MongoDB 6.0+")

async with await self.change_stream(show_expanded_events=True) as cs:
# Just verify it doesn't error
self.assertTrue(cs.alive)

@async_client_context.require_version_min(6, 0)
async def test_change_stream_with_full_document_before_change(self):
"""Test change stream with full_document_before_change parameter."""
coll = self.watched_collection()
# Need to ensure collection exists with changeStreamPreAndPostImages enabled
await coll.drop()
await self.db.create_collection(coll.name, changeStreamPreAndPostImages={"enabled": True})
await coll.insert_one({"x": 1})

async with await self.change_stream(full_document_before_change="whenAvailable") as cs:
await coll.update_one({"x": 1}, {"$set": {"x": 2}})
change = await anext(cs)
self.assertEqual("update", change["operationType"])
# fullDocumentBeforeChange should be present
self.assertIn("fullDocumentBeforeChange", change)

async def test_change_stream_next_after_close(self):
"""Test that next() on closed stream raises StopAsyncIteration."""
cs = await self.change_stream()
await cs.close()
with self.assertRaises(StopAsyncIteration):
await anext(cs)

async def test_change_stream_try_next_after_close(self):
"""Test that try_next() on closed stream raises StopAsyncIteration."""
cs = await self.change_stream()
await cs.close()
with self.assertRaises(StopAsyncIteration):
await cs.try_next()

async def test_change_stream_pipeline_construction(self):
"""Test change stream pipeline is properly constructed."""
pipeline = [{"$match": {"operationType": "insert"}}]
client, listener = await self.client_with_listener("aggregate")
try:
async with await self.change_stream_with_client(client, pipeline=pipeline):
pass
finally:
await client.close()

cmd = listener.started_events[0].command
agg_pipeline = cmd["pipeline"]
# First stage should be $changeStream
self.assertIn("$changeStream", agg_pipeline[0])
# Second stage should be our match
self.assertEqual({"$match": {"operationType": "insert"}}, agg_pipeline[1])

async def test_change_stream_empty_pipeline(self):
"""Test change stream with empty pipeline."""
async with await self.change_stream(pipeline=[]) as cs:
self.assertTrue(cs.alive)

async def test_change_stream_context_manager_exception(self):
"""Test change stream context manager closes on exception."""
cs = None
try:
async with await self.change_stream() as cs:
raise ValueError("test exception")
except ValueError:
pass
# Stream should be closed
self.assertFalse(cs.alive)


if __name__ == "__main__":
unittest.main()
Loading
Loading