diff --git a/function_pipe/core/function_pipe.py b/function_pipe/core/function_pipe.py index 27839cf..710f3ea 100644 --- a/function_pipe/core/function_pipe.py +++ b/function_pipe/core/function_pipe.py @@ -9,6 +9,8 @@ Common usage: import function_pipe as fpn """ +from __future__ import annotations + import copy import enum import functools @@ -20,41 +22,50 @@ # ------------------------------------------------------------------------------- # FunctionNode and utilities +@tp.runtime_checkable +class FuncT(tp.Protocol): + def __call__(self, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: + ... + -FN = tp.TypeVar("FN", bound="FunctionNode") -PN = tp.TypeVar("PN", bound="PipeNode") -PNI = tp.TypeVar("PNI", bound="PipeNodeInput") -PipeNodeDescriptorT = tp.TypeVar("PipeNodeDescriptorT", bound="PipeNodeDescriptor") -KeyPosition = tp.Union[tp.Callable, str] -HandlerT = tp.Callable[[tp.Any], tp.Callable] +_DecoratorT = tp.TypeVar("_DecoratorT", bound=FuncT) +Decorator = tp.Callable[[_DecoratorT], _DecoratorT] +UnaryFunc = tp.Callable[["FunctionNode"], FuncT] +BinaryFunc = tp.Callable[["FunctionNode", tp.Any], FuncT] -def compose(*funcs: tp.Callable) -> FN: + +def compose(*funcs: FuncT) -> FunctionNode: """ Given a list of functions, execute them from right to left, passing the returned value of the right f to the left f. Store the reduced function in a FunctionNode """ - # call right first, then left of each pair; each reduction returns a function - reducer = functools.reduce( - lambda f, g: lambda *args, **kwargs: f(g(*args, **kwargs)), funcs - ) + + def reducer_func(f: FuncT, g: FuncT) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return f(g(*args, **kwargs)) + + return inner + + reducer = functools.reduce(reducer_func, funcs) + # args are reversed to show execution from right to left - return FunctionNode( # type: ignore + return FunctionNode( reducer, doc_function=compose, doc_args=tuple(reversed(funcs)), ) -def _wrap_unary(func: tp.Callable[[FN], FN]) -> tp.Callable[[FN], FN]: +def _wrap_unary(func: UnaryFunc) -> UnaryFunc: """ Decorator for operator overloads. Given a higher order function that takes one args, wrap it in a FunctionNode function and provide documentation labels. """ @functools.wraps(func) - def unary(lhs: FN) -> FN: + def unary(lhs: FunctionNode) -> FunctionNode: # wrapped function will prepare correct class, even if a constant - cls = PipeNode if isinstance(lhs, PipeNode) else FunctionNode + cls = lhs.__class__ return cls(func(lhs), doc_function=func, doc_args=(lhs,)) return unary @@ -62,16 +73,17 @@ def unary(lhs: FN) -> FN: def _wrap_binary( operation: str, lhs_name: str, rhs_name: str, clause: str -) -> tp.Callable[[tp.Callable[[FN, tp.Any], FN]], tp.Callable[[FN, tp.Any], FN]]: - def binary_decorator( - func: tp.Callable[[FN, tp.Any], FN] - ) -> tp.Callable[[FN, tp.Any], FN]: - """Decorator for operators. Given a higher order function that takes two args, wrap it in a FunctionNode function and provide documentation labels.""" +) -> Decorator[BinaryFunc]: + def binary_decorator(func: BinaryFunc) -> BinaryFunc: + """ + Decorator for operators. + Given a higher order function that takes two args, wrap it in a FunctionNode function and provide documentation labels. + """ @functools.wraps(func) - def binary(lhs: FN, rhs: tp.Any) -> FN: + def binary(lhs: FunctionNode, rhs: tp.Any) -> FunctionNode: # wrapped function will prepare correct class, even if a constant - cls = PipeNode if isinstance(lhs, PipeNode) else FunctionNode + cls = lhs.__class__ return cls(func(lhs, rhs), doc_function=func, doc_args=(lhs, rhs)) binary.__doc__ = f"Return a new FunctionNode that will {operation} the result of ``{lhs_name}`` {clause} the result of ``{rhs_name}``" @@ -167,8 +179,8 @@ def _format_expression(f: tp.Any) -> str: def pretty_repr(f: tp.Any) -> str: """ - Provide a pretty string representation of a FN, PN, or anything. - If the object is a FN or PN, it will recursively represent any nested FNs/PNs. + Provide a pretty string representation of a FunctionNode, PipeNode, or anything. + If the object is a FunctionNode or PipeNode, it will recursively represent any nested FNs/PNs. """ def get_function_name(f: tp.Any) -> str: @@ -251,14 +263,19 @@ class FunctionNode: "_doc_kwargs", ) + _function: FuncT + _doc_function: FuncT + _doc_args: tuple[tp.Any, ...] + _doc_kwargs: dict[str, tp.Any] | None + # --------------------------------------------------------------------------- def __init__( - self: FN, - function: tp.Any, + self, + function: FuncT | tp.Any, *, - doc_function: tp.Optional[tp.Callable] = None, - doc_args: tp.Tuple[tp.Any, ...] = (), - doc_kwargs: tp.Optional[tp.Dict[str, tp.Any]] = None, + doc_function: tp.Callable[..., tp.Any] | None = None, + doc_args: tuple[tp.Any, ...] = (), + doc_kwargs: dict[str, tp.Any] | None = None, ) -> None: """ Args: @@ -273,30 +290,32 @@ def __init__( if isinstance(function, FunctionNode): for attr in self.__slots__: setattr(self, attr, getattr(function, attr)) + return + + if callable(function): + self._function = function else: - if callable(function): - self._function = function - else: - # if not a callable, we upgrade a constant, non function value to be a function that returns that value - self._function = lambda *args, **kwargs: function - # if not supplied, doc_function is set to function - self._doc_function = doc_function if doc_function else self._function - self._doc_args = doc_args - self._doc_kwargs = doc_kwargs + def constant_wrapper(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return function + + self._function = constant_wrapper + + # if not supplied, doc_function is set to function + self._doc_function = doc_function if doc_function else self._function + self._doc_args = doc_args + self._doc_kwargs = doc_kwargs @property - def unwrap(self: FN) -> tp.Callable: + def unwrap(self) -> FuncT: """ The doc_function should be set to the core function being wrapped, no matter the level of wrapping. """ - # if the stored function is using _pipe_kwarg_bind, need to go lower - doc_func = self - while hasattr(doc_func, "_doc_function"): - doc_func = getattr(doc_func, "_doc_function") - return doc_func + if hasattr(self, "_doc_function"): + return self._doc_function + return self - def __call__(self: FN, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: + def __call__(self, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: """ Call the wrapped function with args and kwargs. """ @@ -305,13 +324,13 @@ def __call__(self: FN, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: except Exception as e: raise _exception_with_cleaned_tb(e) from None - def __str__(self: FN) -> str: + def __str__(self) -> str: return f"" - def __repr__(self: FN) -> str: + def __repr__(self) -> str: return f"" - def partial(self: FN, *args: tp.Any, **kwargs: tp.Any) -> "FunctionNode": + def partial(self, *args: tp.Any, **kwargs: tp.Any) -> FunctionNode: """ Return a new FunctionNode with a partialed function with args and kwargs. """ @@ -327,137 +346,164 @@ def partial(self: FN, *args: tp.Any, **kwargs: tp.Any) -> "FunctionNode": # Unary Operators @_wrap_unary - def __neg__(self: FN) -> FN: + def __neg__(self) -> FuncT: """ Return a new FunctionNode that when evaulated, will negate the result of ``self`` """ - return lambda *args, **kwargs: self(*args, **kwargs) * -1 # type: ignore + + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) * -1 + + return inner @_wrap_unary - def __invert__(self: FN) -> FN: + def __invert__(self) -> FuncT: """ Return a new FunctionNode that when evaulated, will invert the result of ``self`` NOTE: This is generally expected to be a Boolean inversion, such as ~ (not) applied to a Numpy, Pandas, or Static-Frame objects. """ - return lambda *args, **kwargs: self(*args, **kwargs).__invert__() # type: ignore + + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs).__invert__() + + return inner @_wrap_unary - def __abs__(self: FN) -> FN: + def __abs__(self) -> FuncT: """ Return a new FunctionNode that when evaulated, will find the absolute value of the result of ``self`` """ - return lambda *args, **kwargs: abs(self(*args, **kwargs)) # type: ignore + + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return abs(self(*args, **kwargs)) + + return inner # --------------------------------------------------------------------------- # all binary operators return a function; the _wrap_binary decorator then wraps this function in a FunctionNode definition and supplies appropriate doc args. Note both left and righ sides are wrapped in FNs to permit operations on constants @_wrap_binary("add", "self", "rhs", "to") - def __add__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) + self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __add__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) + self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("subtract", "self", "rhs", "to") - def __sub__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) - self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __sub__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) - self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("multiply", "self", "rhs", "to") - def __mul__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) * self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __mul__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) * self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("divide", "self", "rhs", "to") - def __truediv__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) / self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __truediv__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) / self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("raise", "self", "rhs", "to") - def __pow__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) ** self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __pow__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) ** self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("add", "lhs", "self", "to") - def __radd__(self: FN, lhs: tp.Any) -> FN: - return lambda *args, **kwargs: self.__class__(lhs)(*args, **kwargs) + self( # type: ignore - *args, **kwargs - ) + def __radd__(self, lhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self.__class__(lhs)(*args, **kwargs) + self(*args, **kwargs) + + return inner @_wrap_binary("subract", "lhs", "self", "to") - def __rsub__(self: FN, lhs: tp.Any) -> FN: - return lambda *args, **kwargs: self.__class__(lhs)(*args, **kwargs) - self( # type: ignore - *args, **kwargs - ) + def __rsub__(self, lhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self.__class__(lhs)(*args, **kwargs) - self(*args, **kwargs) + + return inner @_wrap_binary("multiply", "lhs", "self", "to") - def __rmul__(self: FN, lhs: tp.Any) -> FN: - return lambda *args, **kwargs: self.__class__(lhs)(*args, **kwargs) * self( # type: ignore - *args, **kwargs - ) + def __rmul__(self, lhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self.__class__(lhs)(*args, **kwargs) * self(*args, **kwargs) + + return inner @_wrap_binary("divide", "lhs", "self", "to") - def __rtruediv__(self: FN, lhs: tp.Any) -> FN: - return lambda *args, **kwargs: self.__class__(lhs)(*args, **kwargs) / self( # type: ignore - *args, **kwargs - ) + def __rtruediv__(self, lhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self.__class__(lhs)(*args, **kwargs) / self(*args, **kwargs) + + return inner @_wrap_binary("test if", "self", "rhs", "equals") - def __eq__(self: FN, rhs: tp.Any) -> FN: # type: ignore - return lambda *args, **kwargs: self(*args, **kwargs) == self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __eq__(self, rhs: tp.Any) -> FuncT: # type: ignore + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) == self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("test if", "self", "rhs", "is less than") - def __lt__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) < self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __lt__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) < self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("test if", "self", "rhs", "is less than or equal to") - def __le__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) <= self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __le__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) <= self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("test if", "self", "rhs", "is greater than") - def __gt__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) > self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __gt__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) > self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("test if", "self", "rhs", "is greater than or equal to") - def __ge__(self: FN, rhs: tp.Any) -> FN: - return lambda *args, **kwargs: self(*args, **kwargs) >= self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __ge__(self, rhs: tp.Any) -> FuncT: + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) >= self.__class__(rhs)(*args, **kwargs) + + return inner @_wrap_binary("test if", "self", "rhs", "does not equal") - def __ne__(self: FN, rhs: tp.Any) -> FN: # type: ignore - return lambda *args, **kwargs: self(*args, **kwargs) != self.__class__(rhs)( # type: ignore - *args, **kwargs - ) + def __ne__(self, rhs: tp.Any) -> FuncT: # type: ignore + def inner(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: + return self(*args, **kwargs) != self.__class__(rhs)(*args, **kwargs) + + return inner # --------------------------------------------------------------------------- # composition operators _composition_op_doc_template = "Composes a new FunctionNode will call ``{lhs}`` first, and then feed its result into ``{rhs}``" - def __rshift__(self: FN, rhs: tp.Callable) -> FN: + def __rshift__(self, rhs: FuncT) -> FunctionNode: return compose(rhs, self) - def __rrshift__(self: FN, lhs: tp.Callable) -> FN: + def __rrshift__(self, lhs: FuncT) -> FunctionNode: return compose(self, lhs) - def __lshift__(self: FN, rhs: tp.Callable) -> FN: + def __lshift__(self, rhs: FuncT) -> FunctionNode: return compose(self, rhs) - def __rlshift__(self: FN, lhs: tp.Callable) -> FN: + def __rlshift__(self, lhs: FuncT) -> FunctionNode: return compose(lhs, self) __rshift__.__doc__ = _composition_op_doc_template.format(lhs="rhs", rhs="self") @@ -465,11 +511,11 @@ def __rlshift__(self: FN, lhs: tp.Callable) -> FN: __lshift__.__doc__ = _composition_op_doc_template.format(lhs="rhs", rhs="self") __rlshift__.__doc__ = _composition_op_doc_template.format(lhs="self", rhs="lhs") - def __or__(self: FN, rhs: FN) -> FN: + def __or__(self, rhs: FunctionNode) -> FunctionNode: """Only implemented for PipeNode.""" raise NotImplementedError() - def __ror__(self: FN, lhs: FN) -> FN: + def __ror__(self, lhs: FunctionNode) -> FunctionNode: """Only implemented for PipeNode.""" raise NotImplementedError() @@ -510,15 +556,20 @@ class State(enum.Enum): ) # --------------------------------------------------------------------------- + _call_state: State | None + _predecessor: PipeNode | None + + # --------------------------------------------------------------------------- + def __init__( - self: PN, - function: tp.Any, + self, + function: FuncT | tp.Any, *, - doc_function: tp.Optional[tp.Callable] = None, - doc_args: tp.Tuple[tp.Any, ...] = (), - doc_kwargs: tp.Optional[tp.Dict[str, tp.Any]] = None, - call_state: tp.Optional[State] = None, - predecessor: tp.Optional[PN] = None, + doc_function: tp.Callable[..., tp.Any] | None = None, + doc_args: tuple[tp.Any, ...] = (), + doc_kwargs: dict[str, tp.Any] | None = None, + call_state: State | None = None, + predecessor: PipeNode | None = None, ): super().__init__( function=function, @@ -529,17 +580,17 @@ def __init__( self._call_state = call_state self._predecessor = predecessor - def __str__(self: PN) -> str: + def __str__(self) -> str: if self._call_state is PipeNode.State.FACTORY: return f"" return f"" - def __repr__(self: PN) -> str: + def __repr__(self) -> str: if self._call_state is PipeNode.State.FACTORY: return f"" return f"" - def partial(self: PN, *args: str, **kwargs: str) -> PN: + def partial(self, *args: str, **kwargs: str) -> PipeNode: """ Partialing PipeNodes is prohibited. Use ``pipe_node_factory`` (and related) decorators to pass in expression-level arguments. """ @@ -549,12 +600,12 @@ def partial(self: PN, *args: str, **kwargs: str) -> PN: # pipe node properties @property - def call_state(self: PN) -> tp.Optional["State"]: + def call_state(self) -> tp.Optional["State"]: """The current call state of the Node""" return self._call_state @property - def predecessor(self: PN) -> tp.Optional[PN]: + def predecessor(self) -> tp.Optional[PipeNode]: """ The PipeNode preceeding this Node in a pipeline. Can be None """ @@ -563,37 +614,41 @@ def predecessor(self: PN) -> tp.Optional[PN]: # --------------------------------------------------------------------------- # composition operators - def __rshift__(self: PN, rhs: tp.Callable) -> PN: + def __rshift__(self, rhs: FuncT) -> PipeNode: """Only implemented for FunctionNode.""" raise NotImplementedError() - def __rrshift__(self: PN, lhs: tp.Callable) -> PN: + def __rrshift__(self, lhs: FuncT) -> PipeNode: """Only implemented for FunctionNode.""" raise NotImplementedError() - def __lshift__(self: PN, rhs: tp.Callable) -> PN: + def __lshift__(self, rhs: FuncT) -> PipeNode: """Only implemented for FunctionNode.""" raise NotImplementedError() - def __rlshift__(self: PN, lhs: tp.Callable) -> PN: + def __rlshift__(self, lhs: FuncT) -> PipeNode: """Only implemented for FunctionNode.""" raise NotImplementedError() - def __or__(self: PN, rhs: PN) -> PN: + def __or__(self, rhs: PipeNode) -> PipeNode: # type: ignore """ Invokes ``rhs``, passing in ``self`` as the kwarg ``PREDECESSOR_PN``. """ - return rhs(**{PREDECESSOR_PN: self}) + pn = rhs(**{PREDECESSOR_PN: self}) + assert isinstance(pn, PipeNode) + return pn - def __ror__(self: PN, lhs: PN) -> PN: + def __ror__(self, lhs: PipeNode) -> PipeNode: # type: ignore """ Invokes ``lhs``, passing in ``lhs`` as the kwarg ``PREDECESSOR_PN``. """ - return self(**{PREDECESSOR_PN: lhs}) + pn = self(**{PREDECESSOR_PN: lhs}) + assert isinstance(pn, PipeNode) + return pn # --------------------------------------------------------------------------- - def __getitem__(self: PN, pn_input: tp.Any) -> tp.Any: + def __getitem__(self, pn_input: tp.Any) -> tp.Any: """ Invokes ``self``, passing in ``pn_input`` as the kwarg ``PN_INPUT``. @@ -604,7 +659,7 @@ def __getitem__(self: PN, pn_input: tp.Any) -> tp.Any: pn_input = pn_input if pn_input is not None else PipeNodeInput() return self(**{PN_INPUT: pn_input}) - def __call__(self: PN, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: + def __call__(self, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: """ Call the wrapped function with args and kwargs. """ @@ -631,11 +686,11 @@ def __call__(self: PN, *args: tp.Any, **kwargs: tp.Any) -> tp.Any: def _broadcast( *, - factory_args: tp.Tuple[tp.Any, ...], - factory_kwargs: tp.Dict[str, tp.Any], - processing_args: tp.Tuple[tp.Any, ...] = (), - processing_kwargs: tp.Dict[str, tp.Any], -) -> tp.Tuple[tp.Tuple[tp.Any, ...], tp.Dict[str, tp.Any]]: + factory_args: tuple[tp.Any, ...], + factory_kwargs: dict[str, tp.Any], + processing_args: tuple[tp.Any, ...] = (), + processing_kwargs: dict[str, tp.Any], +) -> tuple[tuple[tp.Any, ...], dict[str, tp.Any]]: """ Factory args/kwargs are those given to pipe_node_factory at the expression level. Processing args/kwargs are those given as the initial input, and used to call all processing functions. @@ -657,7 +712,7 @@ def _broadcast( return core_callable_args, core_callable_kwargs -def _core_logger(core_callable: tp.Callable) -> tp.Callable: +def _core_logger(core_callable: FuncT) -> FuncT: """ A decorator to provide output on the execution of each core callable call. Alternative decorators can be used to partial pipe_node_factory and pipe_node. """ @@ -670,7 +725,7 @@ def wrapped(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: return wrapped -def _has_key_positions(*key_positions: KeyPosition) -> bool: +def _has_key_positions(key_positions: tuple[FuncT] | tuple[str, ...]) -> bool: """ Returns whether or not key_positions is a list of key positions, or if it is just a single callable """ @@ -678,7 +733,7 @@ def _has_key_positions(*key_positions: KeyPosition) -> bool: def is_unbound_self_method( - core_callable: tp.Union[classmethod, staticmethod, tp.Callable], + core_callable: classmethod | staticmethod | FuncT, *, self_keyword: str, ) -> bool: @@ -698,17 +753,15 @@ def is_unbound_self_method( return bool(argspec.args and argspec.args[0] == self_keyword) -def _pipe_kwarg_bind( - *key_positions: KeyPosition, -) -> tp.Callable[[tp.Callable], tp.Callable]: +def _pipe_kwarg_bind(*key_positions: str) -> Decorator[FuncT]: """ - Binds a specific PN labels wrapped up in **kwargs to the first n positional arguments of the core callable + Binds a specific PipeNode labels wrapped up in **kwargs to the first n positional arguments of the core callable """ - def decorator(core_callable: tp.Callable) -> tp.Callable: + def decorator(core_callable: FuncT) -> FuncT: @functools.wraps(core_callable) def wrapped(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: - target_args = [kwargs.pop(key) for key in key_positions] # type: ignore + target_args = [kwargs.pop(key) for key in key_positions] target_kwargs = { k: v for k, v in kwargs.items() if k not in PIPE_NODE_KWARGS } @@ -731,43 +784,39 @@ class PipeNodeDescriptor: # pylint: disable=too-few-public-methods ) def __init__( - self: PipeNodeDescriptorT, - core_callable: tp.Callable, - core_handler: HandlerT, - key_positions: tp.Optional[tp.Tuple[KeyPosition, ...]] = None, + self, + core_callable: FuncT, + core_handler: Decorator[FuncT], + key_positions: tuple[str, ...] | None = None, ) -> None: self.core_callable = core_callable self.core_handler = core_handler self.key_positions = key_positions - def __get__( - self: PipeNodeDescriptorT, - instance: tp.Any, - owner: tp.Any, - ) -> tp.Callable: + def __get__(self, instance: tp.Any, owner: tp.Any) -> FuncT: """ Returns a callable that will be bound to the instance/owner, and then passed along the pipeline. """ - core_callable: tp.Callable = self.core_callable.__get__(instance, owner) # type: ignore + core_callable: FuncT = self.core_callable.__get__(instance, owner) # type: ignore if self.key_positions is not None: core_callable = _pipe_kwarg_bind(*self.key_positions)(core_callable) return self.core_handler(core_callable) def _handle_descriptors_and_key_positions( - *key_positions: KeyPosition, - core_handler: HandlerT, + *key_positions: FuncT | str, + core_handler: Decorator[FuncT], self_keyword: str, ) -> tp.Union[ PipeNodeDescriptor, - HandlerT, - tp.Callable[[tp.Callable], tp.Union[PipeNodeDescriptor, HandlerT]], + Decorator[FuncT], + tp.Callable[[tp.Callable], tp.Union[PipeNodeDescriptor, Decorator[FuncT]]], ]: """ We can return either a callable or a ``PipeNodeDescriptor``, OR, a decorator that when called, will return either a callable or a ``PipeNodeDescriptor``. """ - has_key_positions = _has_key_positions(*key_positions) + has_key_positions = _has_key_positions(key_positions) # See if decorator was given no arguments, and received the core_callable directly. if not has_key_positions: @@ -781,7 +830,7 @@ def _handle_descriptors_and_key_positions( def decorator_wrapper( core_callable: tp.Callable, - ) -> tp.Union[PipeNodeDescriptor, HandlerT]: + ) -> tp.Union[PipeNodeDescriptor, Decorator[FuncT]]: if is_unbound_self_method(core_callable, self_keyword=self_keyword): return PipeNodeDescriptor(core_callable, core_handler, key_positions) @@ -792,13 +841,13 @@ def decorator_wrapper( def _descriptor_factory( - *key_positions: KeyPosition, + *key_positions: FuncT | str, decorator: tp.Callable, - core_decorator: HandlerT, + core_decorator: Decorator[FuncT], emulator: tp.Any, ) -> tp.Any: - has_key_positions = _has_key_positions(*key_positions) + has_key_positions = _has_key_positions(key_positions) class Descriptor: # pylint: disable=too-few-public-methods def __init__(self, func: tp.Callable) -> None: @@ -822,7 +871,7 @@ def func(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: if not has_key_positions: [core_callable] = key_positions - return Descriptor(core_callable) # type: ignore + return Descriptor(core_callable) return Descriptor @@ -832,8 +881,8 @@ def func(*args: tp.Any, **kwargs: tp.Any) -> tp.Any: def pipe_node_factory( - *key_positions: KeyPosition, - core_decorator: HandlerT = _core_logger, + *key_positions: FuncT | str, + core_decorator: Decorator[FuncT] = _core_logger, self_keyword: str = "self", ) -> tp.Union[tp.Callable, tp.Callable[[tp.Any], PipeNode]]: """ @@ -980,7 +1029,7 @@ def process_f(*p_args: tp.Any, **p_kwargs: tp.Any) -> tp.Any: call_state=PipeNode.State.FACTORY, ) - return _handle_descriptors_and_key_positions( # type: ignore + return _handle_descriptors_and_key_positions( *key_positions, core_handler=build_factory, self_keyword=self_keyword, @@ -988,8 +1037,8 @@ def process_f(*p_args: tp.Any, **p_kwargs: tp.Any) -> tp.Any: def pipe_node( - *key_positions: KeyPosition, - core_decorator: HandlerT = _core_logger, + *key_positions: FuncT | str, + core_decorator: Decorator[FuncT] = _core_logger, self_keyword: str = "self", ) -> tp.Union[tp.Callable, PipeNode]: """ @@ -1037,9 +1086,9 @@ def create_factory_and_call_once(core_callable: tp.Callable) -> PipeNode: if not callable(pnf): raise ValueError(f"{core_callable.__qualname__} requires an instance") - return pnf() # type: ignore + return pnf() - return _handle_descriptors_and_key_positions( # type: ignore + return _handle_descriptors_and_key_positions( *key_positions, core_handler=create_factory_and_call_once, self_keyword=self_keyword, @@ -1047,7 +1096,7 @@ def create_factory_and_call_once(core_callable: tp.Callable) -> PipeNode: def classmethod_pipe_node_factory( - *key_positions: KeyPosition, core_decorator: HandlerT = _core_logger + *key_positions: FuncT | str, core_decorator: Decorator[FuncT] = _core_logger ) -> tp.Callable: """ Decorates a function to become a classmethod pipe node factory, that when given *expression-level* arguments, will return a ``PipeNode`` @@ -1095,8 +1144,8 @@ def classmethod_pipe_node_factory( def classmethod_pipe_node( - *key_positions: KeyPosition, - core_decorator: HandlerT = _core_logger, + *key_positions: FuncT | str, + core_decorator: Decorator[FuncT] = _core_logger, ) -> tp.Union[tp.Callable, PipeNode]: """ Decorates a function to become a classmethod ``PipeNode`` that takes no expression-level args. @@ -1136,7 +1185,7 @@ def classmethod_pipe_node( def staticmethod_pipe_node_factory( - *key_positions: KeyPosition, core_decorator: HandlerT = _core_logger + *key_positions: FuncT | str, core_decorator: Decorator[FuncT] = _core_logger ) -> tp.Callable: """ Decorates a function to become a staticmethod pipe node factory, that when given *expression-level* arguments, will return a ``PipeNode`` @@ -1184,8 +1233,8 @@ def staticmethod_pipe_node_factory( def staticmethod_pipe_node( - *key_positions: KeyPosition, - core_decorator: HandlerT = _core_logger, + *key_positions: FuncT | str, + core_decorator: Decorator[FuncT] = _core_logger, ) -> tp.Union[tp.Callable, PipeNode]: """ Decorates a function to become a staticmethod ``PipeNode`` that takes no expression-level args. @@ -1233,21 +1282,21 @@ class PipeNodeInput: PipeNode input to support store and recall; subclassable to expose other attributes and parameters. """ - def __init__(self: PNI) -> None: - self._store: tp.Dict[str, tp.Any] = {} + def __init__(self: PipeNodeInput) -> None: + self._store: dict[str, tp.Any] = {} - def store(self: PNI, key: str, value: tp.Any) -> None: + def store(self: PipeNodeInput, key: str, value: tp.Any) -> None: """Store ``key`` and ``value`` in the underlying store.""" if key in self._store: raise KeyError("cannot store the same key", key) self._store[key] = value - def recall(self: PNI, key: str) -> tp.Any: + def recall(self: PipeNodeInput, key: str) -> tp.Any: """Recall ``key`` from the underlying store. Can raise an ``KeyError``""" return self._store[key] @property - def store_items(self: PNI) -> tp.ItemsView[str, tp.Any]: + def store_items(self: PipeNodeInput) -> tp.ItemsView[str, tp.Any]: """Return an items view of the underlying store.""" return self._store.items() diff --git a/mypy.ini b/mypy.ini index 34d33dc..f77b212 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,16 +1,4 @@ [mypy] +strict = True +follow_imports = silent files = function_pipe/**/*.py -show_error_codes = True -warn_redundant_casts = True -warn_unused_ignores = True -warn_unreachable = True -warn_return_any = True -warn_unused_configs = True - -disable_error_code = arg-type, misc, type-arg, no-any-return - -[mypy-tasks] -ignore_errors = True - -[mypy-sketch] -ignore_errors = True diff --git a/setup.py b/setup.py index 6dfc195..a11214e 100644 --- a/setup.py +++ b/setup.py @@ -14,31 +14,32 @@ ROOT_DIR_FP = path.abspath(path.dirname(__file__)) + def get_long_description() -> str: with open(path.join(ROOT_DIR_FP, "README.rst"), encoding="utf-8") as f: return f.read() + def get_version() -> str: - with open(path.join(ROOT_DIR_FP, 'function_pipe', '__init__.py'), - encoding='utf-8') as f: + with open( + path.join(ROOT_DIR_FP, "function_pipe", "__init__.py"), encoding="utf-8" + ) as f: for l in f: - if l.startswith('__version__'): - if '#' in l: - l = l.split('#')[0].strip() - return l.split('=')[-1].strip()[1:-1] - raise ValueError('__version__ not found!') + if l.startswith("__version__"): + if "#" in l: + l = l.split("#")[0].strip() + return l.split("=")[-1].strip()[1:-1] + raise ValueError("__version__ not found!") + setup( name="function-pipe", version=get_version(), - description="Tools for extended function composition and pipelines", long_description=get_long_description(), - url="https://github.com/InvestmentSystems/function-pipe", author="Christopher Ariza, Charles Burkland", license="MIT", - # See https://pypi.python.org/pypi?%3Aaction=list_classifiers classifiers=[ "Development Status :: 5 - Production/Stable", @@ -48,12 +49,14 @@ def get_version() -> str: "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", - ], - + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + ], keywords="functionnode pipenode composition pipeline pipe", - #py_modules=["function_pipe"], # no .py! - packages=["function_pipe", + # py_modules=["function_pipe"], # no .py! + packages=[ + "function_pipe", "function_pipe.core", "function_pipe.test", - ], + ], ) diff --git a/tasks.py b/tasks.py index 1b01ed8..78aa966 100644 --- a/tasks.py +++ b/tasks.py @@ -2,12 +2,12 @@ import invoke -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- + @invoke.task def clean(context): - """Clean doc and build artifacts - """ + """Clean doc and build artifacts""" context.run("rm -rf coverage.xml") context.run("rm -rf htmlcov") context.run("rm -rf doc/build") @@ -21,18 +21,15 @@ def clean(context): context.run("rm -rf .ipynb_checkpoints") - @invoke.task() def doc(context): - """Build docs - """ + """Build docs""" context.run(f"{sys.executable} doc/doc_build.py") @invoke.task def test(context, cov=False): - """Run tests. - """ + """Run tests.""" cmd = f"pytest -s function_pipe/test" if cov: @@ -49,26 +46,25 @@ def coverage(context): cmd = "pytest -s --color no --cov=function_pipe/core --cov-report html" context.run(cmd, echo=True) import webbrowser + webbrowser.open("htmlcov/index.html") @invoke.task def mypy(context): - """Run mypy static analysis. - """ + """Run mypy static analysis.""" context.run("mypy function_pipe/core --strict") @invoke.task def lint(context): - """Run pylint static analysis. - """ + """Run pylint static analysis.""" context.run("pylint -f colorized function_pipe") + @invoke.task(pre=(mypy, lint)) def quality(context): - """Perform all quality checks. - """ + """Perform all quality checks.""" @invoke.task @@ -100,14 +96,15 @@ def formatting(context, check=False): isort(context, check=check) -#------------------------------------------------------------------------------- +# ------------------------------------------------------------------------------- + @invoke.task(pre=(clean,)) def build(context): - """Build packages - """ + """Build packages""" context.run(f"{sys.executable} setup.py sdist") + @invoke.task(pre=(build,), post=(clean,)) def release(context): context.run("twine upload dist/* --repository function-pipe")