Hotdry.
systems-engineering

声明式渐进数据构造:基于pydantic-resolve的复杂数据结构分步构建模式

探索声明式渐进数据构造模式,通过pydantic-resolve实现复杂数据结构的分步构建与验证,避免一次性完整定义的开销,提升API数据构建的可维护性和性能。

在现代应用开发中,构建复杂的 API 响应数据是一个常见但棘手的挑战。传统的命令式方法往往导致代码臃肿、难以维护,而 GraphQL 等方案虽然提供了声明式查询能力,但在后处理和数据转换方面存在局限性。本文将深入探讨一种基于 Pydantic 的声明式渐进数据构造模式,通过pydantic-resolve库实现复杂数据结构的分步构建与验证。

声明式渐进数据构造的核心思想

声明式渐进数据构造模式的核心在于将数据构建过程分解为多个可组合的步骤,每个步骤都通过声明式的方式定义,而不是通过命令式的胶水代码连接。这种模式具有以下关键优势:

  1. 按需构建:只构建客户端实际需要的数据字段,避免不必要的数据传输
  2. 渐进验证:在每个构建步骤中进行数据验证,及早发现问题
  3. 关注点分离:数据定义、数据获取、数据转换逻辑分离
  4. 可组合性:小的构建块可以组合成复杂的数据结构

pydantic-resolve正是基于这一理念设计的工具,它允许开发者声明式地定义数据结构和数据获取逻辑,然后渐进式地构建完整的数据响应。

pydantic-resolve 的核心设计模式

1. DefineSubset:字段选择与复用

DefineSubsetpydantic-resolve的基础构建块,它允许从现有的 Pydantic 模型中选择需要的字段,生成新的模型类。这种方式避免了重复定义相似的数据结构,同时保持了类型安全。

from pydantic_resolve import DefineSubset
import app.team.schema as team_schema

class Team(DefineSubset):
    __subset__ = (team_schema.Team, ('id', 'name'))

在这个例子中,Team类只选择了原始team_schema.Team模型中的idname字段。这种选择性继承使得 API 可以根据不同端点的需求返回不同的字段组合,而不需要为每个变体创建完全独立的模型。

2. resolve_方法:按需数据获取

resolve_方法是pydantic-resolve的核心机制,它定义了如何获取关联数据。每个以resolve_开头的方法都对应一个需要延迟加载的字段。

from pydantic_resolve import Loader, Resolver
import app.sprint.loader as sprint_loader

class Team(DefineSubset):
    __subset__ = (team_schema.Team, ('id', 'name'))
    
    sprints: list[Sprint] = []
    def resolve_sprints(self, loader=Loader(sprint_loader.team_to_sprint_loader)):
        return loader.load(self.id)

resolve_sprints方法使用DataLoader模式批量加载数据,这显著提高了性能,特别是在处理 N+1 查询问题时。DataLoader会自动收集所有需要加载的 ID,然后执行一次批量查询。

3. DataLoader:批量优化与缓存

pydantic-resolve内置了对DataLoader的支持,这是从 GraphQL 社区借鉴的最佳实践。DataLoader 的主要优势包括:

  • 批量加载:将多个独立的加载请求合并为单个批量请求
  • 请求去重:自动合并相同的请求
  • 缓存机制:在同一请求周期内缓存已加载的数据
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic_resolve import build_list

async def story_to_task_loader(story_ids: list[int]):
    async with db.async_session() as session:
        tasks = await batch_get_tasks_by_ids(session, story_ids)
        return build_list(tasks, story_ids, lambda u: u.story_id)

4. ErDiagram:实体关系声明

从 v2 版本开始,pydantic-resolve引入了ErDiagram支持,允许开发者声明应用级别的实体关系图。这大大提高了代码的可维护性和可读性。

from pydantic_resolve import base_entity, Relationship

BaseEntity = base_entity()

class Sprint(BaseModel, BaseEntity):
    __relationships__ = [
        Relationship(field='id', target_kls=list['Story'], 
                    loader=story_loader.sprint_to_story_loader)
    ]
    
    id: int
    name: str
    status: str
    team_id: int

通过ErDiagram,数据关系在模型层面就得到了清晰的表达,而不是隐藏在业务逻辑中。这使得新开发者能够快速理解数据模型,也便于工具进行静态分析。

5. post_方法:后处理与数据转换

数据获取完成后,通常还需要进行各种转换和计算。pydantic-resolvepost_方法提供了在数据解析完成后执行后处理的钩子。

class Story(DefineSubset):
    __subset__ = (BaseStory, ('id', 'name', 'owner_id'))
    
    tasks: Annotated[list[Task], LoadBy('id')] = []
    
    total_estimate: int = 0
    def post_total_estimate(self):
        return sum(task.estimate for task in self.tasks)

post_方法在所有的resolve_方法执行完成后运行,因此可以安全地访问所有已解析的数据。这使得计算派生字段、数据格式化等操作变得非常简单。

跨层数据传输:ExposeAs 与 SendTo

在复杂的数据结构中,经常需要在不同层级的节点之间传递数据。pydantic-resolve提供了两种机制来实现这一需求:

1. ExposeAs:向下传递数据

ExposeAs允许父节点将数据暴露给所有后代节点使用。这在需要将上下文信息传递给深层嵌套结构时特别有用。

from pydantic_resolve import ExposeAs

class Story(BaseModel):
    id: int
    name: Annotated[str, ExposeAs('story_name')]  # 暴露为story_name
    owner_id: int
    sprint_id: int
    
    tasks: Annotated[list[Task], LoadBy('id')] = []
    
    model_config = ConfigDict(from_attributes=True)

class Task(BaseTask):
    user: Annotated[Optional[BaseUser], LoadBy('owner_id')] = None
    
    fullname: str = ''
    def post_fullname(self, ancestor_context):
        # 从祖先上下文访问story.name
        return f'{ancestor_context["story_name"]} - {self.name}'

2. SendTo:向上收集数据

SendTo允许后代节点将数据发送给祖先节点收集。这在需要从多个子节点聚合信息时非常有用。

from pydantic_resolve import SendTo

class Task1(BaseTask):
    user: Annotated[Optional[BaseUser], LoadBy('owner_id'), SendTo('related_users')] = None

class Story1(DefineSubset):
    __subset__ = (BaseStory, ('id', 'name', 'owner_id'))
    
    tasks: Annotated[list[Task1], LoadBy('id')] = []
    
    related_users: list[BaseUser] = []
    def post_related_users(self, collector=Collector(alias='related_users')):
        return collector.values()

实际应用场景与最佳实践

场景 1:团队项目管理 API

假设我们需要构建一个团队项目管理系统的 API,需要返回团队信息、相关的冲刺计划、每个冲刺中的故事以及故事的任务。

from pydantic_resolve import DefineSubset, Loader, Resolver, LoadBy
from typing import Annotated, Optional

# 定义基础模型
class BaseTeam(BaseModel):
    id: int
    name: str
    description: str

class BaseSprint(BaseModel):
    id: int
    name: str
    status: str
    team_id: int

class BaseStory(BaseModel):
    id: int
    name: str
    owner_id: int
    sprint_id: int

class BaseTask(BaseModel):
    id: int
    name: str
    estimate: int
    story_id: int
    owner_id: int

# 构建API响应模型
class TaskDetail(DefineSubset):
    __subset__ = (BaseTask, ('id', 'name', 'estimate', 'owner_id'))
    
    assignee: Annotated[Optional[BaseUser], LoadBy('owner_id')] = None

class StoryDetail(DefineSubset):
    __subset__ = (BaseStory, ('id', 'name', 'owner_id'))
    
    tasks: Annotated[list[TaskDetail], LoadBy('id')] = []
    assignee: Annotated[Optional[BaseUser], LoadBy('owner_id')] = None
    
    total_estimate: int = 0
    def post_total_estimate(self):
        return sum(task.estimate for task in self.tasks)

class SprintDetail(DefineSubset):
    __subset__ = (BaseSprint, ('id', 'name', 'status'))
    
    stories: Annotated[list[StoryDetail], LoadBy('id')] = []
    
    story_count: int = 0
    def post_story_count(self):
        return len(self.stories)

class TeamDetail(DefineSubset):
    __subset__ = (BaseTeam, ('id', 'name', 'description'))
    
    sprints: Annotated[list[SprintDetail], LoadBy('id')] = []
    
    active_sprints: list[SprintDetail] = []
    def post_active_sprints(self):
        return [s for s in self.sprints if s.status == 'active']

场景 2:性能优化与注意事项

在使用pydantic-resolve时,有几个性能相关的注意事项:

  1. 会话管理:在 FastAPI + SQLAlchemy 场景中,需要注意会话生命周期管理,避免死锁。
@router.get("/team/{team_id}/stories-with-detail", response_model=List[StoryDetail])
async def stories_with_detail_get(
        team_id: int,
        session: AsyncSession = Depends(get_async_session)):
    
    rows = await sq.get_stories(team_id=team_id, session=session)
    
    # 立即释放会话,避免死锁
    await session.close()
    
    items = [StoryDetail.model_validate(r) for r in rows]
    items = await Resolver().resolve(items)  # dataloader会在内部创建新会话
    return items
  1. DataLoader 设计:合理设计 DataLoader 的批量查询逻辑,避免过大的 IN 查询。
async def batch_get_users_by_ids(session: AsyncSession, user_ids: list[int]):
    # 分批处理,避免IN查询参数过多
    batch_size = 100
    all_users = []
    for i in range(0, len(user_ids), batch_size):
        batch = user_ids[i:i+batch_size]
        users = (await session.execute(
            select(User).where(User.id.in_(batch))
        )).scalars().all()
        all_users.extend(users)
    return all_users

与 GraphQL 的对比

pydantic-resolve在设计上借鉴了 GraphQL 的许多优点,但也解决了一些 GraphQL 的痛点:

特性 GraphQL pydantic-resolve
声明式查询
按需字段选择
批量加载优化 ✅ (通过 DataLoader) ✅ (内置 DataLoader)
后处理能力 ❌ (需要在 resolver 中手动处理) ✅ (通过 post_方法)
跨层数据传输 ❌ (复杂) ✅ (通过 ExposeAs/SendTo)
类型安全 ✅ (通过 GraphQL Schema) ✅ (通过 Pydantic)
与现有代码集成 中等 简单 (基于 Pydantic)

总结

声明式渐进数据构造模式通过pydantic-resolve库提供了一个强大而灵活的解决方案,用于构建复杂的 API 数据响应。它的核心优势在于:

  1. 声明式定义:通过简单的声明定义数据结构和获取逻辑
  2. 渐进构建:分步构建复杂数据结构,避免一次性完整定义的开销
  3. 性能优化:内置 DataLoader 支持,自动优化批量查询
  4. 后处理能力:强大的后处理钩子,支持复杂的数据转换
  5. 跨层通信:灵活的跨层数据传输机制

对于需要构建复杂 API 响应的 Python 项目,特别是那些已经使用 Pydantic 和 FastAPI 的项目,pydantic-resolve提供了一个优雅且高效的解决方案。它不仅提高了开发效率,还通过清晰的声明式代码提高了项目的可维护性。

随着现代应用对 API 数据构建的要求越来越复杂,声明式渐进数据构造模式将成为重要的架构模式之一。pydantic-resolve作为这一模式的优秀实现,值得在合适的场景中深入探索和应用。

参考资料

  1. pydantic-resolve GitHub 仓库
  2. pydantic-resolve 官方文档
  3. 组合导向开发模式示例
查看归档