Whether data transformation is intuitive is one of the determining factors of project quality.
pydantic-resolve turns pydantic from a static data container into a powerful dynamic computing tool.
It provides major features based on pydantic class:
- pluggable resolve methods and post methods, to define how to fetch and modify nodes.
- transporting field data from ancestor nodes to their descendant nodes.
- collecting data from any descendant nodes into their ancestor nodes.
It supports:
- pydantic v1
- pydantic v2
- dataclass (from pydantic.dataclasses import dataclass)
It integrates seamlessly with modern Python web frameworks, including FastAPI, Litestar, and Django-ninja.
For FastAPI, you can explore schema dependencies with fastapi-voyager.
pip install pydantic-resolve
Starting from pydantic-resolve v1.11.0, both pydantic v1 and v2 are supported.
- Documentation: https://allmonday.github.io/pydantic-resolve/
- Demo: https://github.com/allmonday/pydantic-resolve-demo
- Composition-Oriented Pattern: https://github.com/allmonday/composition-oriented-development-pattern
- Resolver Pattern: A Better Alternative to GraphQL in BFF (api-integration).
Here is the root data, a list of BaseStory:
base_stories = [
BaseStory(id=1, name="story - 1"),
BaseStory(id=2, name="story - 2")
]
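For context, BaseStory can be any plain pydantic model. Here is a minimal stand-in (the real schema in this demo lives in biz_models, so the exact fields are an assumption based on the output shown below):

```python
from pydantic import BaseModel

class BaseStory(BaseModel):
    # minimal stand-in for the real biz_models.BaseStory
    id: int
    name: str

base_stories = [
    BaseStory(id=1, name="story - 1"),
    BaseStory(id=2, name="story - 2"),
]
```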
Let's import Resolver and resolve base_stories. For now, Resolver().resolve
will do nothing because no pydantic-resolve configuration has been applied yet.
from pydantic_resolve import Resolver
data = await Resolver().resolve(base_stories)
Then let's define Story, which inherits from BaseStory and adds a tasks field.
Now let's define a resolve_tasks method and use StoryTaskLoader
to load the tasks (inside the DataLoader, the ids are gathered and the query runs in a single batch).
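To make that batching behavior concrete, here is a stripped-down, stdlib-only sketch of the idea (a toy, not pydantic-resolve's actual implementation): concurrent load(key) calls are queued, and the batch function is invoked once for all keys.

```python
import asyncio

class MiniLoader:
    """Toy DataLoader: gathers keys from concurrent load() calls,
    then invokes the batch function once for all of them."""
    def __init__(self, batch_fn):
        self.batch_fn = batch_fn
        self._futures = {}        # key -> list of pending futures
        self._scheduled = False

    def load(self, key):
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._futures.setdefault(key, []).append(fut)
        if not self._scheduled:   # dispatch once, on the next loop tick
            self._scheduled = True
            loop.call_soon(lambda: asyncio.ensure_future(self._dispatch()))
        return fut

    async def _dispatch(self):
        keys = list(self._futures)            # unique keys, first-seen order
        results = await self.batch_fn(keys)   # ONE batched call
        for key, value in zip(keys, results):
            for fut in self._futures[key]:
                fut.set_result(value)

batch_calls = []

async def batch_get_tasks(story_ids):
    batch_calls.append(list(story_ids))       # record how often the "db" is hit
    fake_db = {1: ["design"], 2: ["add ut"]}
    return [fake_db.get(i, []) for i in story_ids]

async def main():
    loader = MiniLoader(batch_get_tasks)
    return await asyncio.gather(loader.load(1), loader.load(2))

tasks_1, tasks_2 = asyncio.run(main())
print(batch_calls)  # a single batched call: [[1, 2]]
```

Even though each story asks for its own tasks, the "database" is queried exactly once.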
Let's initialize stories from base_stories:
from pydantic_resolve import Resolver, Loader
from biz_models import BaseTask, BaseStory, BaseUser
from biz_services import UserLoader, StoryTaskLoader
class Story(BaseStory):
tasks: list[BaseTask] = []
    def resolve_tasks(self, loader=Loader(StoryTaskLoader)):  # StoryTaskLoader returns each story's list of BaseTask, keyed by story_id
return loader.load(self.id)
stories = [Story.model_validate(s, from_attributes=True) for s in base_stories]
data = await Resolver().resolve(stories)
Here is where the magic happens. Checking the data (in JSON), the tasks field is fetched automatically:
[
{
"id": 1,
"name": "story - 1",
"tasks": [
{
"id": 1,
"name": "design",
"user_id": 2
}
]
},
{
"id": 2,
"name": "story - 2",
"tasks": [
{
"id": 2,
"name": "add ut",
"user_id": 2
}
]
}
]
Let's continue by extending BaseTask and replacing the item type of Story.tasks:
from typing import Optional

class Task(BaseTask):
user: Optional[BaseUser] = None
def resolve_user(self, loader=Loader(UserLoader)):
        return loader.load(self.user_id) if self.user_id else None
class Story(BaseStory):
tasks: list[Task] = [] # BaseTask -> Task
def resolve_tasks(self, loader=Loader(StoryTaskLoader)):
return loader.load(self.id)
Then the user data is immediately available:
[
{
"id": 1,
"name": "story - 1",
"tasks": [
{
"id": 1,
"name": "design",
"user_id": 1,
"user": {
"id": 1,
"name": "tangkikodo"
}
}
]
},
{
"id": 2,
"name": "story - 2",
"tasks": [
{
"id": 2,
"name": "add ut",
"user_id": 2,
"user": {
"id": 2,
"name": "john"
}
}
]
}
]
That's a basic example of using resolve methods to fetch related data.
Let's take an Agile model as an example; it includes Story, Task, and User.
First, establish the entity-relationship model based on business concepts.
This model is stable and serves as the architectural blueprint.

from pydantic import BaseModel, ConfigDict
class Story(BaseModel):
id: int
name: str
owner_id: int
sprint_id: int
model_config = ConfigDict(from_attributes=True)
class Task(BaseModel):
id: int
name: str
owner_id: int
story_id: int
estimate: int
model_config = ConfigDict(from_attributes=True)
class User(BaseModel):
id: int
name: str
level: str
model_config = ConfigDict(from_attributes=True)
The dataloaders are defined for general usage; if another approach such as ORM relationships is available, they can easily be replaced. A DataLoader's implementation supports all kinds of data sources, from database queries to microservice RPC calls.
from .model import Task, User
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
import src.db as db
from pydantic_resolve import build_list
# story_id -> tasks
async def batch_get_tasks_by_ids(session: AsyncSession, story_ids: list[int]):
    tasks = (await session.execute(select(Task).where(Task.story_id.in_(story_ids)))).scalars().all()
    return tasks
# user_id -> user
async def batch_get_users_by_ids(session: AsyncSession, user_ids: list[int]):
users = (await session.execute(select(User).where(User.id.in_(user_ids)))).scalars().all()
return users
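The batch functions return rows in arbitrary order, so the rows must be regrouped to match the order of the requested keys; this is the job of helpers like pydantic-resolve's build_list. A stdlib sketch of the idea (not the library's implementation):

```python
from collections import defaultdict

def group_rows_by_key(rows, keys, key_fn):
    """Return one bucket of rows per requested key, aligned with the keys' order."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[key_fn(row)].append(row)
    return [buckets[k] for k in keys]

rows = [
    {"id": 1, "story_id": 2, "name": "add ut"},
    {"id": 2, "story_id": 1, "name": "design"},
]
grouped = group_rows_by_key(rows, [1, 2], lambda r: r["story_id"])
print(grouped[0])  # tasks of story 1: [{'id': 2, 'story_id': 1, 'name': 'design'}]
```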
Based on our business logic, create domain-specific data structures through schemas and their relationship dataloaders.
We just need to extend tasks and assignee for Story, and user for Task.
Extending new fields is dynamic and depends on business requirements; however, the relationships / loaders are constrained by the definitions in step 1.

(Schema dependency diagram generated by fastapi-voyager)
from typing import Optional
from pydantic_resolve import Loader, Collector
from src.services.story.schema import Story as BaseStory
from src.services.task.schema import Task as BaseTask
from src.services.task.loader import story_to_task_loader
from src.services.user.schema import User as BaseUser
from src.services.user.loader import user_batch_loader
class Task(BaseTask):
user: Optional[BaseUser] = None
def resolve_user(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
class Story(BaseStory):
tasks: list[Task] = []
def resolve_tasks(self, loader=Loader(story_to_task_loader)):
return loader.load(self.id)
assignee: Optional[BaseUser] = None
def resolve_assignee(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
The ensure_subset decorator is a helper that checks that the decorated class's fields (those without default values) are a strict subset of the fields of the class given as its parameter.
@ensure_subset(BaseStory)
class Story1(BaseModel):
id: int
name: str
owner_id: int
# sprint_id: int # ignore some fields
model_config = ConfigDict(from_attributes=True)
tasks: list[Task1] = []
def resolve_tasks(self, loader=Loader(story_to_task_loader)):
return loader.load(self.id)
assignee: Optional[BaseUser] = None
def resolve_assignee(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
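Conceptually, the check amounts to comparing field-name sets. A plain-Python sketch of the idea (not the library's actual implementation):

```python
def check_subset(child_required: set, parent_fields: set):
    """Raise if the child declares a required field the parent lacks."""
    extra = child_required - parent_fields
    if extra:
        raise AttributeError(f"fields not present in parent: {sorted(extra)}")

parent = {"id", "name", "owner_id", "sprint_id"}   # BaseStory's fields
check_subset({"id", "name", "owner_id"}, parent)   # Story1's required fields: ok
```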
Once this combination is stable, you can consider replacing the DataLoader with specialized queries for better performance, such as an ORM join relationship.
A dataset from the persistence layer cannot meet every requirement of a view model; adding extra computed fields or adjusting the fetched data is very common.
A post method is what you need: it is triggered after all descendant nodes are resolved.
It can read fields from ancestors, collect fields from descendants, or modify the data fetched by resolve methods.
Let's show them case by case.

__pydantic_resolve_collect__ can collect fields from the current node and send them to the ancestor node that declared related_users.
from typing import Optional
from pydantic import BaseModel, ConfigDict
from pydantic_resolve import Loader, Collector, ensure_subset
from src.services.story.schema import Story as BaseStory
from src.services.task.schema import Task as BaseTask
from src.services.task.loader import story_to_task_loader
from src.services.user.schema import User as BaseUser
from src.services.user.loader import user_batch_loader
class Task1(BaseTask):
__pydantic_resolve_collect__ = {'user': 'related_users'} # Propagate user to collector: 'related_users'
user: Optional[BaseUser] = None
def resolve_user(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
@ensure_subset(BaseStory)
class Story1(BaseModel):
id: int
name: str
owner_id: int
model_config = ConfigDict(from_attributes=True)
tasks: list[Task1] = []
def resolve_tasks(self, loader=Loader(story_to_task_loader)):
return loader.load(self.id)
assignee: Optional[BaseUser] = None
def resolve_assignee(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
# ----- collect from descendants ---------
related_users: list[BaseUser] = []
def post_related_users(self, collector=Collector(alias='related_users')):
return collector.values()
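After resolution, the collector's result is equivalent to this manual traversal over the finished tree (a sketch only; deduplicating by id here is an illustrative choice, not necessarily the library's behavior):

```python
def collect_related_users(story: dict) -> list:
    """Gather each task's user up into the story, skipping repeats by id."""
    seen, users = set(), []
    for task in story["tasks"]:
        user = task.get("user")
        if user and user["id"] not in seen:
            seen.add(user["id"])
            users.append(user)
    return users

story = {
    "tasks": [
        {"name": "design", "user": {"id": 2, "name": "john"}},
        {"name": "add ut", "user": {"id": 2, "name": "john"}},
    ]
}
print(collect_related_users(story))  # [{'id': 2, 'name': 'john'}]
```

The collector saves you from writing (and maintaining) this kind of traversal for every nesting depth.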

Post methods are executed after all resolve methods have finished, so we can use them to calculate extra fields.
from typing import Optional
from pydantic import BaseModel, ConfigDict
from pydantic_resolve import Loader, ensure_subset
from src.services.story.schema import Story as BaseStory
from src.services.task.schema import Task as BaseTask
from src.services.task.loader import story_to_task_loader
from src.services.user.schema import User as BaseUser
from src.services.user.loader import user_batch_loader
class Task2(BaseTask):
user: Optional[BaseUser] = None
def resolve_user(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
@ensure_subset(BaseStory)
class Story2(BaseModel):
id: int
name: str
owner_id: int
model_config = ConfigDict(from_attributes=True)
tasks: list[Task2] = []
def resolve_tasks(self, loader=Loader(story_to_task_loader)):
return loader.load(self.id)
assignee: Optional[BaseUser] = None
def resolve_assignee(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
# ---- calculate extra fields ----
total_estimate: int = 0
def post_total_estimate(self):
return sum(task.estimate for task in self.tasks)

__pydantic_resolve_expose__ can expose specific fields from the current node to its descendants.
Alias names must be globally unique inside the root node.
Descendant nodes can read the value with ancestor_context[alias_name].
from typing import Optional
from pydantic import BaseModel, ConfigDict
from pydantic_resolve import Loader, ensure_subset
from src.services.story.schema import Story as BaseStory
from src.services.task.schema import Task as BaseTask
from src.services.task.loader import story_to_task_loader
from src.services.user.schema import User as BaseUser
from src.services.user.loader import user_batch_loader
class Task3(BaseTask):
user: Optional[BaseUser] = None
def resolve_user(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
fullname: str = ''
def post_fullname(self, ancestor_context): # Access story.name from parent context
return f'{ancestor_context["story_name"]} - {self.name}'
@ensure_subset(BaseStory)
class Story3(BaseModel):
__pydantic_resolve_expose__ = {'name': 'story_name'} # expose to descendants.
id: int
name: str
owner_id: int
model_config = ConfigDict(from_attributes=True)
tasks: list[Task3] = []
def resolve_tasks(self, loader=Loader(story_to_task_loader)):
return loader.load(self.id)
assignee: Optional[BaseUser] = None
def resolve_assignee(self, loader=Loader(user_batch_loader)):
return loader.load(self.owner_id) if self.owner_id else None
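What the expose mechanism buys you is equivalent to threading a context dict down the tree by hand. A plain-Python sketch of the post_fullname computation above:

```python
def build_fullnames(story: dict) -> list[str]:
    # what __pydantic_resolve_expose__ publishes under the alias 'story_name'
    ancestor_context = {"story_name": story["name"]}
    return [f'{ancestor_context["story_name"]} - {t["name"]}' for t in story["tasks"]]

names = build_fullnames({"name": "story - 1", "tasks": [{"name": "design"}]})
print(names)  # ['story - 1 - design']
```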
from pydantic_resolve import Resolver
stories = [Story.model_validate(s, from_attributes=True) for s in await query_stories()]
data = await Resolver().resolve(stories)
query_stories() returns a list of BaseStory. After we transform them into Story, the resolve and post fields are initialized with default values; once Resolver().resolve()
finishes, all these fields are resolved and post-processed into what we expect.
Run the tests with tox, generate coverage with tox -e coverage, and serve the coverage report with python -m http.server.
Current test coverage: 97%