您好,登錄后才能下訂單哦!
這篇文章主要介紹“pydantic resolve怎么解決嵌套數據結構生成問題”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“pydantic resolve怎么解決嵌套數據結構生成問題”文章能幫助大家解決問題。
以論壇為例,有個接口返回帖子(posts)信息,然后呢,來了新需求,說需要顯示帖子的 author 信息。
此時會有兩種選擇:
在 posts 的 query 中 join 查詢 author 信息,在返回 post 中添加諸如 author_id, author_name 之類的字段。
{'post': 'v2ex', 'author_name': 'tangkikodo'}
根據 posts 的 ids , 單獨查詢 author 列表,然后把 author 對象循環添加到 post 對象中。
{'post':'v2ex', 'author': {'name': 'tangkikod'}}
方法 1 中,需要去修改 query, 還需要修改post的schema. 如果未來要加新字段,例如用戶頭像的話,會需要修改兩處。
方法 2 需要手動做一次拼接。之后增減字段都是在 author 對象的范圍內修改。
所以相對來說, 方法 2 在未來的可維護性會比較好。用嵌套對象的方式可以更好的擴展和維護。
方法2 的返回結構
[ { "id": 1, "post": "v2ex", "author": { "name": "tangkikodo", "id": 1 } }, { "id": 2, "post": "v3ex", "author": { "name": "tangkikodo2", "id": 1 } } ]
然而需求總是會變化,突然來了一個新的且奇怪的需求,要在 author 信息中添加數據,顯示他最近瀏覽過的帖子。返回體變成了:
[ { "id": 1, "post": "v2ex", "author": { "name": "tangkikodo", "recent_views": [ { "id": 2, "post": "v3ex" }, { "id": 3, "post": "v4ex" } ] } } ]
那這個時候該怎么弄呢?血壓是不是有點上來了。
根據之前的方法 2, 通常的操作是在獲取到authors信息后, 關聯查找author的recent_posts, 拼接回authors, 再將 authors 拼接回posts。 流程類似層層查找再層層回拼。 偽代碼類似:
# posts query posts = query_all_posts() # authors query authors_ids = fetch_unique_author_id(posts) authors = query_author(author_ids) recent_view_posts = fetch_recent_review_posts(author_ids) # 新需求 recent_view_maps = calc_view_mapping(recent_view_posts) # 新需求 # authors attach authors = [attach_posts(a, recent_view_maps) for a in authors] author_map = calc_author_mapping(authors) # posts attach posts = [attach_author(p, author_map) for p in posts]
莫名的會聯想到callback hell, 添加新的層級都會在代碼中間部分切入。
反正想想就挺麻煩的對吧。要是哪天再嵌套一層呢? 代碼改起來有點費勁, 如果你此時血壓有點高,那請繼續往下看。
那,有別的辦法么? 這里有個小輪子也許能幫忙。
祭出一個小輪子: allmonday/pydantic-resolve
以剛才的例子,要做的事情抽象成兩部分:
定義 dataloader ,負責查詢和group數據。前半部分是從數據庫查詢,后半部分是將數據轉成 pydantic 對象后返回。 偽代碼,看個大概意思就好。
class AuthorLoader(DataLoader): async def batch_load_fn(self, author_ids): async with async_session() as session: # query authors res = await session.execute(select(Author).where(Author.id.in_(author_ids))) rows = res.scalars().all() # transform into pydantic object dct = defaultdict(dict) for row in rows: dct[row.author_id] = AuthorSchema.from_orm(row) # order by author_id return [dct.get(k, None) for k in author_ids] class RecentViewPostLoader(DataLoader): async def batch_load_fn(self, view_ids): async with async_session() as session: res = await session.execute(select(Post, PostVisit.visitor_id) # join 瀏覽中間表 .join(PostVist, PostVisit.post_id == Post.id) .where(PostVisit.user_id.in_(view_ids) .where(PostVisit.created_at < some_timestamp))) rows = res.scalars().all() dct = defaultdict(list) for row in rows: dct[row.visitor_id].append(PostSchema.from_orm(row)) # group 到 visitor return [dct.get(k, []) for k in view_ids]
定義 schema, 并且注入依賴的 DataLoaders, LoaderDepend 會管理好loader 的異步上下文緩存。
class RecentPostSchema(BaseModel): id: int name: str class Config: orm_mode = True class AuthorSchema(BaseModel): id: int name: str img_url: str recent_views: Tuple[RecentPostSchema, ...] = tuple() def resolve_recent_views(self, loader=LoaderDepend(RecentViewPostLoader)): return loader.load(self.id) class Config: orm_mode = True class PostSchema(BaseModel): id: int author_id: int name: str author: Optional[AuthorSchema] = None def resolve_author(self, loader=LoaderDepend(AuthorLoader)): return loader.load(self.author_id) class Config: orm_mode = True
然后呢?
然后就沒有了,接下來只要做個 post 的查詢, 再簡單地...resolve 一下,任務就完成了。
posts = (await session.execute(select(Post))).scalars().all() posts = [PostSchema.from_orm(p) for p in posts] results = await Resolver().resolve(posts)
在拆分了 loader 和 schema 之后,對數據地任意操作都很簡單,添加任意新的schema 都不會破壞原有的代碼。
完整的案例可以查看 6_sqlalchemy_loaderdepend_global_filter.py
如果之前使用過aiodataloader 的話會知道,開發需要手動維護loader在每個request 中的初始化過程 , 但在 pydantic-resolve
中你完全不用操心異步上下文的創建,不用維護DataLoader的實例化, 一切都在pydantic-resolve
的管理之中。
就完事了。如果必須說有啥缺點的話。。必須用 async await 可能算一個。
關于“pydantic resolve怎么解決嵌套數據結構生成問題”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。