-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
import asyncio
from odmantic import AIOEngine, Model
from motor.motor_asyncio import AsyncIOMotorClient
class Item(Model):
    """ODMantic model with a single required string field, ``name``."""

    name: str
async def main():
    """Watch the ``Item`` collection's change stream and print every event.

    Connects to the local MongoDB replica set ``rs0``, opens a change stream
    on the Motor collection backing ``Item``, and for each insert, update,
    replace or delete event prints the operation type and document key.
    When the event carries a full document, it is also parsed into an
    ``Item`` instance and printed.

    The loop runs until the stream is closed or the task is cancelled; the
    Motor client is closed on the way out.
    """
    # The URI targets replica set "rs0"; MongoDB only offers change streams
    # on replica sets and sharded clusters, not on standalone servers.
    client = AsyncIOMotorClient("mongodb://localhost:27017/?replicaSet=rs0")
    try:
        # ODMantic 1.x (required for the pydantic-v2 style ``model_validate*``
        # API used below) names this keyword ``client``; ``motor_client`` is
        # the pre-1.0 spelling.
        engine = AIOEngine(client=client, database="mydb")

        # Get the underlying Motor collection for the model.
        coll = engine.get_collection(Item)

        # Only surface document-level write operations.
        pipeline = [
            {
                "$match": {
                    "operationType": {
                        "$in": ["insert", "update", "replace", "delete"],
                    },
                },
            },
        ]

        # full_document="updateLookup" fetches the post-update document for
        # update events (insert/replace events always include it).
        async with coll.watch(pipeline, full_document="updateLookup") as stream:
            async for change in stream:
                print("op:", change["operationType"], "key:", change["documentKey"])

                # Delete events carry no "fullDocument", and an updateLookup
                # can come back empty if the document was removed meanwhile.
                doc = change.get("fullDocument")
                if doc is not None:
                    # Raw MongoDB documents store the primary key under
                    # "_id"; model_validate_doc understands that storage
                    # layout, whereas plain model_validate expects the
                    # Python-side field names.
                    item = Item.model_validate_doc(doc)
                    print("as model:", item)
    finally:
        # Release the connection pool even when the watch loop is cancelled
        # or raises.
        client.close()
# Only start the watcher when executed as a script, so importing this module
# (e.g. to reuse Item) does not block on the change stream.
if __name__ == "__main__":
    asyncio.run(main())
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels