diff --git a/.flake8 b/.flake8 index d72d7daa..85c3c632 100644 --- a/.flake8 +++ b/.flake8 @@ -44,6 +44,7 @@ per-file-ignores = web_poet/serialization/__init__.py:F401,F403 web_poet/testing/__init__.py:F401,F403 tests/po_lib_to_return/__init__.py:D102 + tests/test_fields.py:D102 # the suggestion makes the code worse tests/test_serialization.py:B028 diff --git a/tests/test_fields.py b/tests/test_fields.py index f84a2851..5928733f 100644 --- a/tests/test_fields.py +++ b/tests/test_fields.py @@ -1,6 +1,8 @@ import asyncio import random -from typing import Optional +import warnings +from collections import defaultdict +from typing import DefaultDict, Optional import attrs import pytest @@ -23,6 +25,7 @@ HttpResponse, Injectable, ItemPage, + SelectFields, field, item_from_fields, item_from_fields_sync, @@ -630,3 +633,172 @@ def x(self) -> int: assert info["x"] == FieldInfo(name="x", meta=None, out=None, disabled=True) assert info["y"] == FieldInfo(name="y", meta=None, out=None, disabled=False) assert info["z"] == FieldInfo(name="z", meta=None, out=None, disabled=True) + + +@attrs.define +class BigItem: + x: int + y: Optional[int] = None + z: Optional[int] = None + + +@attrs.define +class BigPage(ItemPage[BigItem]): + select_fields: Optional[SelectFields] = None + call_counter: DefaultDict = attrs.field(factory=lambda: defaultdict(int)) + + @field + def x(self): + self.call_counter["x"] += 1 + return 1 + + @field(disabled=False) + def y(self): + self.call_counter["y"] += 1 + return 2 + + @field(disabled=True) + def z(self): + self.call_counter["z"] += 1 + return 3 + + +@pytest.mark.asyncio +async def test_select_fields() -> None: + # Required fields from the item cls which are not included raise an TypeError + expected_type_error_msg = ( + r"__init__\(\) missing 1 required positional argument: 'x'" + ) + + # When SelectFields isn't set, it should simply extract the non-disabled + # fields. + page = BigPage() + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # If no field selection directive is given but SelectFields is set, it would + # use the default fields that are not disabled. + page = BigPage(SelectFields(fields=None)) + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # Same case as above but given an empty dict. + page = BigPage(SelectFields(fields={})) + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # Select all fields + page = BigPage(SelectFields(fields={"*": True})) + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=3) + assert page.fields_to_extract == ["x", "y", "z"] + assert page.call_counter == {"x": 1, "y": 1, "z": 1} + + # Don't select all fields + page = BigPage(SelectFields(fields={"*": False})) + with pytest.raises(TypeError, match=expected_type_error_msg): + await page.to_item() + assert page.fields_to_extract == [] + assert page.call_counter == {} + + # Exclude all but one + page = BigPage(SelectFields(fields={"*": False, "x": True})) + item = await page.to_item() + assert item == BigItem(x=1, y=None, z=None) + assert page.fields_to_extract == ["x"] + assert page.call_counter == {"x": 1} + + # Include all fields but one + page = BigPage(SelectFields(fields={"*": True, "z": False})) + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # overlapping directives on the same field should be okay + page = BigPage(SelectFields(fields={"*": True, "x": True, "y": True, "z": True})) + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=3) + assert page.fields_to_extract == ["x", "y", "z"] + assert page.call_counter == {"x": 1, "y": 1, "z": 1} + + # Excluding a required field throws an error + page = BigPage(SelectFields(fields={"x": False})) + with pytest.raises(TypeError, match=expected_type_error_msg): + item = await page.to_item() + assert page.fields_to_extract == ["y"] + assert page.call_counter == {"y": 1} + + # boolean-like values are not supported. They are simply ignored and the + # page would revert back to the default field directives. + page = BigPage(SelectFields(fields={"x": 0, "y": 0, "z": 1})) # type: ignore[dict-item] + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # If a non-SelectFields instance was passed to the `select_fields` init + # parameter, it would simply ignore it and use the default field directives. + page = BigPage(select_fields="not the instance it's expecting") # type: ignore[arg-type] + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # The remaining tests below checks the different behaviors when encountering a + # field which doesn't existing in the PO + fields = {"x": True, "not_existing": True} + expected_attribute_error_msg = ( + "The {'not_existing'} fields isn't available in tests.test_fields.BigPage" + ) + + # Unknown field raises an AttributeError by default + page = BigPage(SelectFields(fields=fields)) + with pytest.raises(AttributeError, match=expected_attribute_error_msg): + await page.to_item() + with pytest.raises(AttributeError, match=expected_attribute_error_msg): + assert page.fields_to_extract + + page = BigPage(SelectFields(fields=fields, on_unknown_field="raise")) + with pytest.raises(AttributeError, match=expected_attribute_error_msg): + await page.to_item() + with pytest.raises(AttributeError, match=expected_attribute_error_msg): + assert page.fields_to_extract + + # It should safely ignore it as well. + page = BigPage(SelectFields(fields=fields, on_unknown_field="ignore")) + with warnings.catch_warnings(record=True) as caught_warnings: + item = await page.to_item() + assert not caught_warnings + assert item == BigItem(x=1, y=2, z=None) + with warnings.catch_warnings(record=True) as caught_warnings: + assert page.fields_to_extract == ["x", "y"] + assert not caught_warnings + assert page.call_counter == {"x": 1, "y": 1} + + # When 'warn' is used, the same msg when 'raise' is used. + page = BigPage(SelectFields(fields=fields, on_unknown_field="warn")) + with pytest.warns(UserWarning, match=expected_attribute_error_msg): + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + with pytest.warns(UserWarning, match=expected_attribute_error_msg): + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} + + # When SelectFields receive an invalid 'on_unknown_field' value, it should + # ignore it. Moreover, mypy should also raise an error (see + # tests_typing/test_fields) + page = BigPage( + # ignore mypy error since it's expecting a valid value inside the Literal. + SelectFields(fields=fields, on_unknown_field="bad val") # type: ignore[arg-type] + ) + item = await page.to_item() + assert item == BigItem(x=1, y=2, z=None) + assert page.fields_to_extract == ["x", "y"] + assert page.call_counter == {"x": 1, "y": 1} diff --git a/tests_typing/test_fields.mypy-testing b/tests_typing/test_fields.mypy-testing index c92630dd..6a1e667b 100644 --- a/tests_typing/test_fields.mypy-testing +++ b/tests_typing/test_fields.mypy-testing @@ -6,6 +6,7 @@ from web_poet import ( field, item_from_fields, item_from_fields_sync, + SelectFields, ) @@ -138,3 +139,12 @@ async def test_item_from_fields_default_item_cls() -> None: page = Page() item1 = await item_from_fields(page) reveal_type(item1) # R: builtins.dict[Any, Any] + + +@pytest.mark.mypy_testing +def test_select_fields_on_unknown_field() -> None: + SelectFields(fields={}, on_unknown_field="raise") + SelectFields(fields={}, on_unknown_field="warn") + SelectFields(fields={}, on_unknown_field="ignore") + + SelectFields(fields={}, on_unknown_field="bad value") # E: Argument "on_unknown_field" to "SelectFields" has incompatible type "Literal['bad value']"; expected "Literal['ignore', 'warn', 'raise']" [arg-type] diff --git a/web_poet/__init__.py b/web_poet/__init__.py index 3c8b0112..acf7b7d7 100644 --- a/web_poet/__init__.py +++ b/web_poet/__init__.py @@ -1,4 +1,4 @@ -from .fields import field, item_from_fields, item_from_fields_sync +from .fields import SelectFields, field, item_from_fields, item_from_fields_sync from .page_inputs import ( BrowserHtml, HttpClient, diff --git a/web_poet/fields.py b/web_poet/fields.py index ca92d4ac..948ac866 100644 --- a/web_poet/fields.py +++ b/web_poet/fields.py @@ -5,7 +5,17 @@ import inspect from contextlib import suppress from functools import update_wrapper, wraps -from typing import Callable, Dict, List, Optional, Type, TypeVar +from typing import ( + Callable, + Dict, + Iterable, + List, + Literal, + MutableMapping, + Optional, + Type, + TypeVar, +) import attrs from itemadapter import ItemAdapter @@ -172,13 +182,18 @@ def get_fields_dict( T = TypeVar("T") +UnknownFieldActions = Literal["ignore", "warn", "raise"] # FIXME: type is ignored as a workaround for https://github.com/python/mypy/issues/3737 # inference works properly if a non-default item_cls is passed; for dict # it's not working (return type is Any) async def item_from_fields( - obj, item_cls: Type[T] = dict, *, skip_nonitem_fields: bool = False # type: ignore[assignment] + obj, + item_cls: Type[T] = dict, # type: ignore[assignment] + *, + skip_nonitem_fields: bool = False, + field_names: Optional[Iterable[str]] = None, ) -> T: """Return an item of ``item_cls`` type, with its attributes populated from the ``obj`` methods decorated with :class:`field` decorator. @@ -190,8 +205,13 @@ async def item_from_fields( to ``item_cls.__init__``, possibly causing exceptions if ``item_cls.__init__`` doesn't support them. """ - item_dict = item_from_fields_sync(obj, item_cls=dict, skip_nonitem_fields=False) - field_names = list(item_dict.keys()) + item_dict = item_from_fields_sync( + obj, + item_cls=dict, + skip_nonitem_fields=False, + field_names=field_names, + ) + field_names = field_names or list(item_dict.keys()) if skip_nonitem_fields: field_names = _without_unsupported_field_names(item_cls, field_names) return item_cls( @@ -200,19 +220,43 @@ async def item_from_fields( def item_from_fields_sync( - obj, item_cls: Type[T] = dict, *, skip_nonitem_fields: bool = False # type: ignore[assignment] + obj, + item_cls: Type[T] = dict, # type: ignore[assignment] + *, + skip_nonitem_fields: bool = False, + field_names: Optional[Iterable[str]] = None, ) -> T: """Synchronous version of :func:`item_from_fields`.""" - field_names = list(get_fields_dict(obj)) + if field_names is None: + field_names = list(get_fields_dict(obj)) if skip_nonitem_fields: field_names = _without_unsupported_field_names(item_cls, field_names) return item_cls(**{name: getattr(obj, name) for name in field_names}) def _without_unsupported_field_names( - item_cls: type, field_names: List[str] + item_cls: type, field_names: Iterable[str] ) -> List[str]: item_field_names = ItemAdapter.get_field_names_from_class(item_cls) if item_field_names is None: # item_cls doesn't define field names upfront - return field_names[:] + return list(field_names) return list(set(field_names) & set(item_field_names)) + + +@attrs.define +class SelectFields: + """This is used as a dependency in a page object to control which fields it + would populate the item class that it returns. + """ + + #: Fields that the page object would use to populate its item class. It's a + #: mapping of field names to boolean values to where ``True`` would indicate + #: it being included in ``.to_item()`` calls. + fields: Optional[MutableMapping[str, bool]] = None + + #: Controls what happens when encountering an unknown field. For example, + #: an unknown field was passed and the page object doesn't recognize it. + #: + #: Setting it to ``"raise"`` would raise an :class:`AttributeError`, + #: ``"warn"`` produces a :class:`UserWarning`, while ``"ignore"`` does nothing. + on_unknown_field: UnknownFieldActions = "raise" diff --git a/web_poet/pages.py b/web_poet/pages.py index 77b39af3..733eb371 100644 --- a/web_poet/pages.py +++ b/web_poet/pages.py @@ -1,13 +1,20 @@ import abc -import typing +from typing import Any, Generic, Iterable, Optional, Set, Type, TypeVar +from warnings import warn import attr from web_poet._typing import get_item_cls -from web_poet.fields import FieldsMixin, item_from_fields +from web_poet.fields import ( + FieldsMixin, + SelectFields, + UnknownFieldActions, + get_fields_dict, + item_from_fields, +) from web_poet.mixins import ResponseShortcutsMixin from web_poet.page_inputs import HttpResponse -from web_poet.utils import _create_deprecated_class +from web_poet.utils import _create_deprecated_class, get_fq_class_name class Injectable(abc.ABC, FieldsMixin): @@ -31,21 +38,21 @@ class Injectable(abc.ABC, FieldsMixin): Injectable.register(type(None)) -def is_injectable(cls: typing.Any) -> bool: +def is_injectable(cls: Any) -> bool: """Return True if ``cls`` is a class which inherits from :class:`~.Injectable`.""" return isinstance(cls, type) and issubclass(cls, Injectable) -ItemT = typing.TypeVar("ItemT") +ItemT = TypeVar("ItemT") -class Returns(typing.Generic[ItemT]): +class Returns(Generic[ItemT]): """Inherit from this generic mixin to change the item class used by :class:`~.ItemPage`""" @property - def item_cls(self) -> typing.Type[ItemT]: + def item_cls(self) -> Type[ItemT]: """Item class""" return get_item_cls(self.__class__, default=dict) @@ -64,9 +71,74 @@ def __init_subclass__(cls, skip_nonitem_fields: bool = False, **kwargs): async def to_item(self) -> ItemT: """Extract an item from a web page""" return await item_from_fields( - self, item_cls=self.item_cls, skip_nonitem_fields=self._skip_nonitem_fields + self, + item_cls=self.item_cls, + skip_nonitem_fields=self._skip_nonitem_fields, + field_names=self.fields_to_extract, ) + def _get_select_fields(self) -> Optional[SelectFields]: + select_fields = getattr(self, "select_fields", None) + if not select_fields: + return None + + if not isinstance(select_fields, SelectFields): + return None + + return select_fields + + @property + def fields_to_extract(self) -> Iterable[str]: + """Returns an Iterable of field names which should populate the designated + ``item_cls``. + + If :class:`web_poet.fields.SelectFields` is set, this takes into account + the fields that are marked as included or excluded alongside any field + that are marked as enabled/disabled by default. + """ + select_fields = self._get_select_fields() + if select_fields is None: + return list(get_fields_dict(self)) + + fields = select_fields.fields + + if fields is None or len(fields) == 0: + return list(get_fields_dict(self)) + + page_obj_fields = get_fields_dict(self, include_disabled=True) + + unknown_fields = set(fields) - set(page_obj_fields.keys()).union({"*"}) + if unknown_fields: + self._handle_unknown_field(unknown_fields, select_fields.on_unknown_field) + + fields_to_extract = [] + for name, field_info in page_obj_fields.items(): + if fields.get("*") is False and fields.get(name) is not True: + continue + if ( + fields.get("*") is True + or fields.get(name) is True + or field_info.disabled is False + ) and fields.get(name) is not False: + fields_to_extract.append(name) + + return fields_to_extract + + def _handle_unknown_field( + self, name: Set[str], action: UnknownFieldActions = "raise" + ) -> None: + if action == "ignore": + return None + + msg = ( + f"The {name} fields isn't available in {get_fq_class_name(self.__class__)}." + ) + + if action == "raise": + raise AttributeError(msg) + elif action == "warn": + warn(msg) + @attr.s(auto_attribs=True) class WebPage(ItemPage[ItemT], ResponseShortcutsMixin):