目 录CONTENT

文章目录

学习:Mongodb 异步 by Motor and Beanie

Administrator
2024-12-25 / 0 评论 / 0 点赞 / 286 阅读 / 13442 字

A guide to using MongoDB and asyncio with Motor

  • mysql对比 传统的关系数据库一般由数据库(database)、表(table)、记录(record)三个层次概念组成

  • MongoDB是由数据库(database)、集合(collection)、文档对象(document)三个层次组成

4个对象层级

  • AsyncIOMotorClient represents a mongod process, or a cluster of them. You explicitly create one of these client objects, connect it to a running mongod or mongods, and use it for the lifetime of your application.

  • AsyncIOMotorDatabase: Each mongod has a set of databases (distinct sets of data files on disk). You can get a reference to a database from a client.

  • AsyncIOMotorCollection: A database has a set of collections, which contain documents; you get a reference to a collection from a database.

  • AsyncIOMotorCursor: Executing find() on an AsyncIOMotorCollection gets an AsyncIOMotorCursor, which represents the set of documents matching a query.

client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")

# db
db = client.test_database
db = client["test_database"]

# collection
collection = db.test_collection
collection = db["test_collection"]

# document
async def do_insert():
    document = {"key": "value"}
    result = await db.test_collection.insert_one(document)
    print("result %s" % repr(result.inserted_id))


import asyncio
# loop = client.get_io_loop(): 获取 MongoDB 客户端的事件循环
loop = client.get_io_loop()
# loop.run_until_complete(do_insert()): 在事件循环中运行异步函数 do_insert,直到完成
loop.run_until_complete(do_insert())
result ObjectId('...')

# 大批量文本插入
async def do_insert():
    result = await db.test_collection.insert_many([{"i": i} for i in range(2000)])
    print("inserted %d docs" % (len(result.inserted_ids),))

loop = client.get_io_loop()
loop.run_until_complete(do_insert())
inserted 2000 docs

Beanie:MongoDB的Python对象文档映射器

BeanieDocument- 是 PydanticBaseModel的抽象,允许在应用程序级别使用 Python 对象和在数据库级别使用 JSON 对象。

在一般情况下,一个 MongoDB 集合与一个 BeanieDocument相关联

# example
import asyncio
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel

from beanie import Document, Indexed, init_beanie


class Category(BaseModel):
    name: str
    description: str


class Product(Document):
    name: str                          # You can use normal types just like in pydantic
    description: Optional[str] = None
    price: Indexed(float)              # You can also specify that a field should correspond to an index
    category: Category                 # You can include pydantic models as well


# This is an asynchronous example, so we will access it from an async function
async def example():
    # Beanie uses Motor async client under the hood 
    client = AsyncIOMotorClient("mongodb://user:pass@host:27017")

    # Initialize beanie with the Product document class
    await init_beanie(database=client.db_name, document_models=[Product])

    chocolate = Category(name="Chocolate", description="A preparation of roasted and ground cacao seeds.")
    # Beanie documents work just like pydantic models
    tonybar = Product(name="Tony's", price=5.95, category=chocolate)
    # And can be inserted into the database
    await tonybar.insert() 
    
    # You can find documents with pythonic syntax
    product = await Product.find_one(Product.price < 10)
    
    # And update them
    await product.set({Product.name:"Gold bar"})


if __name__ == "__main__":
    asyncio.run(example())

Beanie和Motor异同

BeanieMotor 都是用于与 MongoDB 进行交互的 Python 库,但它们在设计理念、使用方式和功能上有一些区别。以下是它们的主要区别:

主要特点:

1. ORM 风格:

  • Beanie 允许你定义模型类,这些类映射到 MongoDB 的集合。你可以通过这些类来创建、读取、更新和删除文档。

例如:

     from beanie import Document

     class User(Document):

         name: str

         age: int

     # 创建文档

     user = User(name="John", age=30)

     await user.insert()

2. 自动生成 ID:

  • Beanie 会自动为每个文档生成 _id,除非你手动指定。

3. 类型安全:

  • Beanie 使用 Pydantic 进行数据验证和序列化,提供了类型安全的操作。

4. 异步支持:

  • Beanie 完全支持异步操作,适合在异步框架(如 FastAPI)中使用。

Motor

Motor 是一个异步 MongoDB 驱动程序,提供了对 MongoDB 的异步访问。它是一个低级别的库,提供了直接与 MongoDB 交互的 API

主要特点:

1. 低级别 API:

  • Motor 提供了直接操作 MongoDB 的 API,没有 ORM 的抽象层。你需要手动处理集合、文档和查询。

例如:

     from motor.motor_asyncio import AsyncIOMotorClient

     client = AsyncIOMotorClient('mongodb://localhost:27017')

     db = client.test_database

     collection = db.test_collection

     document = {"name": "John", "age": 30}

     result = await collection.insert_one(document)

     print(result.inserted_id)

2. 异步支持:

  • Motor 完全支持异步操作,适合在异步环境中使用。

3. 灵活性:

  • 由于没有 ORM 的抽象层,Motor 提供了更高的灵活性和控制力,适合需要精细控制数据库操作的场景。

总结

  • Beanie:

    • 适合需要 ORM 风格操作的场景,提供了类型安全和自动生成 ID 的功能。

    • 使用类和对象来操作 MongoDB,适合在异步框架(如 FastAPI)中使用。

  • Motor:

    • 适合需要低级别 API 和更高灵活性的场景。

    • 提供了直接操作 MongoDB 的 API,适合需要精细控制数据库操作的场景。

FastAPI使用beanie

init db

在session.py文件里定义init_db

from beanie.odm.fields import Link
from typing import List, Optional
from app.model.auto_increment import AutoIncrementDocument

class ObjectAttr(AutoIncrementDocument):
    # 属性名,唯一标识
    unique_name: str

    class Settings:
        # 集合名为object_attr
        name = "object_attr"
        validate_on_save = True


async def init_db():
    logger.debug("Will get Motor async client")
    client = AsyncIOMotorClient(settings.MONGODB_DATABASE_URI)
    database = client[settings.DB_NAME]
    logger.debug("Initialize beanie with the document class")
    await init_beanie(database=database,
                      document_models=[
                          CounterDocument, AutoIncrementDocument,ObjectAttr
                      ])

crud base

定义CRUDBase

from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from beanie.odm.fields import DeleteRules
from beanie import Document
from psycopg2.extensions import register_adapter, AsIs
from pydantic.networks import IPv4Address, IPv6Address


def adapt_pydantic_ip_address(ip):
    return AsIs(repr(ip.exploded))


register_adapter(IPv4Address, adapt_pydantic_ip_address)
register_adapter(IPv6Address, adapt_pydantic_ip_address)

ModelType = TypeVar("ModelType", bound=Document)
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
# 定义了三个泛型类型变量,用于CRUD操作的类型提示
# ModelType:限定为继承自Document的类(通常用于MongoDB文档模型)
# CreateSchemaType和UpdateSchemaType:限定为继承自BaseModel的类(Pydantic模型)


class CRUDBase(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):

    def __init__(self, model: Type[ModelType]):
        """
        CRUD object with default methods to Create, Read, Update, Delete (CRUD).

        **Parameters**

        * `model`: A SQLAlchemy model class
        * `schema`: A Pydantic model (schema) class
        """
        self.model = model

    async def get(self, id: int) -> Optional[ModelType]:
        result = await self.model.get(document_id=id)
        return result

    async def get_all(self) -> Optional[ModelType]:
        result = await self.model.find().to_list()
        return result

    async def get_by_uq_name(
        self,
        *,
        unique_name: str,
    ) -> Optional[ModelType]:
        result = await self.model.find(self.model.unique_name == unique_name).first_or_none()
        return result

    async def update(self, *, db_obj: ModelType, obj_in: Union[UpdateSchemaType,
                                                               Dict[str, Any]]) -> ModelType:
        obj_data = jsonable_encoder(db_obj)
        if isinstance(obj_in, dict):
            update_data = obj_in
        else:
            update_data = obj_in.dict(exclude_unset=True)
        for field in obj_data:
            if field in update_data:
                setattr(db_obj, field, update_data[field])
        db_obj = await db_obj.save()
        return db_obj

    async def remove(self, *, db_obj: ModelType) -> ModelType:
        db_obj = await db_obj.delete(link_rule=DeleteRules.DELETE_LINKS)
        return db_obj

crud object attr

定义具体object attr的方法

class CRUDObjectAttr(CRUDBase[ObjectAttr, ObjectAttrCreate, ObjectAttrUpdate]):

    async def create(self, *, attr_group: AttrGroup,
                     obj_in: ObjectAttrCreate) -> Optional[ObjectAttr]:
        object_attr = ObjectAttr(**jsonable_encoder(obj_in))
        # 创建object_attr
        object_attr = await object_attr.insert(link_rule=WriteRules.WRITE)
        return object_attr

object_attr = CRUDObjectAttr(ObjectAttr)

insert data

创建数据到mongodb

@router.post("/", response_model=ObjectAttr)
async def create_object_attr(*, attr_group_id: int, obj_in: ObjectAttrCreate):
  await crud.object_attr.create(attr_group=attr_group, obj_in=obj_in)

0
  1. 支付宝打赏

    qrcode alipay
  2. 微信打赏

    qrcode weixin

评论区