-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcollection_binding.py
More file actions
44 lines (33 loc) · 1.24 KB
/
collection_binding.py
File metadata and controls
44 lines (33 loc) · 1.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
from datetime import datetime
from typing import Optional
import mongojet
import msgspec
import mongospec
from mongospec import IndexModel, MongoDocument
class User(MongoDocument):
    """Document model for a user, bound to the ``users`` collection.

    Declares a unique index on ``email`` (key direction ``1``).
    ``created_at`` is filled in by calling ``datetime.now()`` whenever an
    instance is built without an explicit value.
    """

    __collection_name__ = "users"
    __indexes__ = [
        IndexModel(
            keys=[("email", 1)],
            options={"unique": True},
        ),
    ]

    name: str
    email: str
    # datetime.now() is called without a tz argument, so the value is naive.
    created_at: datetime = msgspec.field(default_factory=datetime.now)
class Product(MongoDocument):
    """Document model for a product, bound to the ``products`` collection.

    Declares a unique index on ``sku`` (key direction ``1``).
    ``name``, ``price`` and ``sku`` are required; the remaining fields
    have defaults.
    """

    __collection_name__ = "products"
    __indexes__ = [
        IndexModel(
            keys=[("sku", 1)],
            options={"unique": True},
        ),
    ]

    # Required fields.
    name: str
    price: float
    sku: str

    # Fields with defaults.
    description: Optional[str] = None
    in_stock: bool = True
    # datetime.now() is called without a tz argument, so the value is naive.
    created_at: datetime = msgspec.field(default_factory=datetime.now)
    # Stays None unless the caller sets it explicitly.
    updated_at: Optional[datetime] = None
async def main() -> None:
    """Connect to a local MongoDB, bind the document types, and insert a user.

    Creates a client for ``mongodb://localhost:27017``, selects the
    ``example_db`` database, and passes it to ``mongospec.init`` together
    with the ``User`` and ``Product`` document types. A sample ``User`` is
    then inserted and its ``_id`` printed.

    ``mongospec.close()`` is awaited in a ``finally`` clause so that it still
    runs when the insert fails (for example, a second run of this script
    would attempt to insert the same e-mail again despite the unique index).
    Any exception from the insert is propagated to the caller after cleanup.
    """
    mongo_client = await mongojet.create_client("mongodb://localhost:27017")
    db = mongo_client.get_database("example_db")
    await mongospec.init(db, document_types=[User, Product])
    try:
        # After initialization, you can use all CRUD methods without extra setup
        user = User(name="John", email="john@example.com")
        await user.insert()  # Collection is already bound
        print(f"Inserted user with ID: {user._id}")
    finally:
        # Always release what mongospec.init() set up, even on failure.
        await mongospec.close()
if __name__ == "__main__":
    # Run the example only when executed as a script, so that importing this
    # module does not start an event loop or open a database connection.
    asyncio.run(main())