A guide to using MongoDB and asyncio with Motor
mysql对比 传统的关系数据库一般由数据库(database)、表(table)、记录(record)三个层次概念组成
MongoDB是由数据库(database)、集合(collection)、文档对象(document)三个层次组成
4个对象层级
AsyncIOMotorClient
represents a mongod process, or a cluster of them. You explicitly create one of these client objects, connect it to a running mongod or mongods, and use it for the lifetime of your application.AsyncIOMotorDatabase
: Each mongod has a set of databases (distinct sets of data files on disk). You can get a reference to a database from a client.AsyncIOMotorCollection
: A database has a set of collections, which contain documents; you get a reference to a collection from a database.AsyncIOMotorCursor
: Executingfind()
on anAsyncIOMotorCollection
gets anAsyncIOMotorCursor
, which represents the set of documents matching a query.
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
# db
db = client.test_database
db = client["test_database"]
# collection
collection = db.test_collection
collection = db["test_collection"]
# document
async def do_insert():
document = {"key": "value"}
result = await db.test_collection.insert_one(document)
print("result %s" % repr(result.inserted_id))
import asyncio
# loop = client.get_io_loop(): 获取 MongoDB 客户端的事件循环
loop = client.get_io_loop()
# loop.run_until_complete(do_insert()): 在事件循环中运行异步函数 do_insert,直到完成
loop.run_until_complete(do_insert())
result ObjectId('...')
# 大批量文本插入
async def do_insert():
result = await db.test_collection.insert_many([{"i": i} for i in range(2000)])
print("inserted %d docs" % (len(result.inserted_ids),))
loop = client.get_io_loop()
loop.run_until_complete(do_insert())
inserted 2000 docs
Beanie:MongoDB的Python对象文档映射器
BeanieDocument
- 是 PydanticBaseModel
的抽象,允许在应用程序级别使用 Python 对象和在数据库级别使用 JSON 对象。
在一般情况下,一个 MongoDB 集合与一个 BeanieDocument
相关联
# example
import asyncio
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from beanie import Document, Indexed, init_beanie
class Category(BaseModel):
name: str
description: str
class Product(Document):
name: str # You can use normal types just like in pydantic
description: Optional[str] = None
price: Indexed(float) # You can also specify that a field should correspond to an index
category: Category # You can include pydantic models as well
# This is an asynchronous example, so we will access it from an async function
async def example():
# Beanie uses Motor async client under the hood
client = AsyncIOMotorClient("mongodb://user:pass@host:27017")
# Initialize beanie with the Product document class
await init_beanie(database=client.db_name, document_models=[Product])
chocolate = Category(name="Chocolate", description="A preparation of roasted and ground cacao seeds.")
# Beanie documents work just like pydantic models
tonybar = Product(name="Tony's", price=5.95, category=chocolate)
# And can be inserted into the database
await tonybar.insert()
# You can find documents with pythonic syntax
product = await Product.find_one(Product.price < 10)
# And update them
await product.set({Product.name:"Gold bar"})
if __name__ == "__main__":
asyncio.run(example())
Beanie和Motor异同
Beanie
和 Motor
都是用于与 MongoDB 进行交互的 Python 库,但它们在设计理念、使用方式和功能上有一些区别。以下是它们的主要区别:
主要特点:
1. ORM 风格:
Beanie 允许你定义模型类,这些类映射到 MongoDB 的集合。你可以通过这些类来创建、读取、更新和删除文档。
例如:
from beanie import Document
class User(Document):
name: str
age: int
# 创建文档
user = User(name="John", age=30)
await user.insert()
2. 自动生成 ID:
Beanie 会自动为每个文档生成 _id,除非你手动指定。
3. 类型安全:
Beanie 使用 Pydantic 进行数据验证和序列化,提供了类型安全的操作。
4. 异步支持:
Beanie 完全支持异步操作,适合在异步框架(如 FastAPI)中使用。
Motor
Motor 是一个异步 MongoDB 驱动程序,提供了对 MongoDB 的异步访问。它是一个低级别的库,提供了直接与 MongoDB 交互的 API。
主要特点:
1. 低级别 API:
Motor 提供了直接操作 MongoDB 的 API,没有 ORM 的抽象层。你需要手动处理集合、文档和查询。
例如:
from motor.motor_asyncio import AsyncIOMotorClient
client = AsyncIOMotorClient('mongodb://localhost:27017')
db = client.test_database
collection = db.test_collection
document = {"name": "John", "age": 30}
result = await collection.insert_one(document)
print(result.inserted_id)
2. 异步支持:
Motor 完全支持异步操作,适合在异步环境中使用。
3. 灵活性:
由于没有 ORM 的抽象层,Motor 提供了更高的灵活性和控制力,适合需要精细控制数据库操作的场景。
总结
Beanie:
适合需要 ORM 风格操作的场景,提供了类型安全和自动生成 ID 的功能。
使用类和对象来操作 MongoDB,适合在异步框架(如 FastAPI)中使用。
Motor:
适合需要低级别 API 和更高灵活性的场景。
提供了直接操作 MongoDB 的 API,适合需要精细控制数据库操作的场景。
FastAPI使用beanie
init db
在session.py文件里定义init_db
from beanie.odm.fields import Link
from typing import List, Optional
from app.model.auto_increment import AutoIncrementDocument
class ObjectAttr(AutoIncrementDocument):
# 属性名,唯一标识
unique_name: str
class Settings:
# 集合名为object_attr
name = "object_attr"
validate_on_save = True
async def init_db():
logger.debug("Will get Motor async client")
client = AsyncIOMotorClient(settings.MONGODB_DATABASE_URI)
database = client[settings.DB_NAME]
logger.debug("Initialize beanie with the document class")
await init_beanie(database=database,
document_models=[
CounterDocument, AutoIncrementDocument,ObjectAttr
])
crud base
定义CRUDBase
from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from beanie.odm.fields import DeleteRules
from beanie import Document
from psycopg2.extensions import register_adapter, AsIs
from pydantic.networks import IPv4Address, IPv6Address
def adapt_pydantic_ip_address(ip):
return AsIs(repr(ip.exploded))
register_adapter(IPv4Address, adapt_pydantic_ip_address)
register_adapter(IPv6Address, adapt_pydantic_ip_address)
ModelType = TypeVar("ModelType", bound=Document)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
# 定义了三个泛型类型变量,用于CRUD操作的类型提示
# ModelType:限定为继承自Document的类(通常用于MongoDB文档模型)
# CreateSchemaType和UpdateSchemaType:限定为继承自BaseModel的类(Pydantic模型)
class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType]):
"""
CRUD object with default methods to Create, Read, Update, Delete (CRUD).
**Parameters**
* `model`: A SQLAlchemy model class
* `schema`: A Pydantic model (schema) class
"""
self.model = model
async def get(self, id: int) -> Optional[ModelType]:
result = await self.model.get(document_id=id)
return result
async def get_all(self) -> Optional[ModelType]:
result = await self.model.find().to_list()
return result
async def get_by_uq_name(
self,
*,
unique_name: str,
) -> Optional[ModelType]:
result = await self.model.find(self.model.unique_name == unique_name).first_or_none()
return result
async def update(self, *, db_obj: ModelType, obj_in: Union[UpdateSchemaType,
Dict[str, Any]]) -> ModelType:
obj_data = jsonable_encoder(db_obj)
if isinstance(obj_in, dict):
update_data = obj_in
else:
update_data = obj_in.dict(exclude_unset=True)
for field in obj_data:
if field in update_data:
setattr(db_obj, field, update_data[field])
db_obj = await db_obj.save()
return db_obj
async def remove(self, *, db_obj: ModelType) -> ModelType:
db_obj = await db_obj.delete(link_rule=DeleteRules.DELETE_LINKS)
return db_obj
crud object attr
定义具体object attr的方法
class CRUDObjectAttr(CRUDBase[ObjectAttr, ObjectAttrCreate, ObjectAttrUpdate]):
async def create(self, *, attr_group: AttrGroup,
obj_in: ObjectAttrCreate) -> Optional[ObjectAttr]:
object_attr = ObjectAttr(**jsonable_encoder(obj_in))
# 创建object_attr
object_attr = await object_attr.insert(link_rule=WriteRules.WRITE)
return object_attr
object_attr = CRUDObjectAttr(ObjectAttr)
insert data
创建数据到mongodb
@router.post("/", response_model=ObjectAttr)
async def create_object_attr(*, attr_group_id: int, obj_in: ObjectAttrCreate):
await crud.object_attr.create(attr_group=attr_group, obj_in=obj_in)
评论区