From fc5689b18efc8300b019312ff2f8898788f57db6 Mon Sep 17 00:00:00 2001 From: Brian Summa Date: Tue, 20 May 2025 08:42:19 -0500 Subject: [PATCH 1/4] Initial commit --- pythonik/models/acls.py | 384 +++++++++++++++ pythonik/specs/acls.py | 1001 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 1385 insertions(+) create mode 100644 pythonik/models/acls.py create mode 100644 pythonik/specs/acls.py diff --git a/pythonik/models/acls.py b/pythonik/models/acls.py new file mode 100644 index 0000000..45c5f92 --- /dev/null +++ b/pythonik/models/acls.py @@ -0,0 +1,384 @@ +""" +Iconik Acls Models +This module contains Pydantic models for the Iconik Acls API. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import List, Literal, Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class UsersSchema(BaseModel): + """Represents a UsersSchema in the Iconik system.""" + + users: Optional[List["UsersCheckAclSchema"]] = Field(default_factory=list) + + +class UsersCheckAclSchema(BaseModel): + """Represents a UsersCheckAclSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + user_id: Optional[UUID] = None + + +class UserIDsSchema(BaseModel): + """Represents a UserIDsSchema in the Iconik system.""" + + user_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class UserACLSchema(BaseModel): + """Represents a UserACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + user_id: Optional[UUID] = None + + +class UserACLBaseSchema(BaseModel): + """Represents a UserACLBaseSchema in the Iconik system.""" + + permissions: List[str] + user_id: Optional[UUID] = None + + +class SharesACLSchema(BaseModel): + """Represents a SharesACLSchema in the Iconik system.""" + + first_url: Optional[str] = None + last_url: Optional[str] = None + next_url: Optional[str] = None + objects: Optional[List["ShareACLSchema"]] = Field(default_factory=list) + page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + pages: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + per_page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + prev_url: Optional[str] = None + total: Optional[int] = Field( + None, ge=-9223372036854775808, le=9223372036854775807 + ) + + +class ShareACLSchema(BaseModel): + """Represents a ShareACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + share_id: Optional[UUID] = None + + +class ReindexPropagatingACLSchema(BaseModel): + """Represents a ReindexPropagatingACLSchema in the Iconik system.""" + + sync_to_another_dc: Optional[bool] = None + + +class PropagatingGroupACLSchema(BaseModel): + """Represents a PropagatingGroupACLSchema in the Iconik system.""" + + group_id: Optional[UUID] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + + +class PropagatingACLSchema(BaseModel): + """Represents a PropagatingACLSchema in the Iconik system.""" + + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + user_id: Optional[UUID] = None + + +class ListObjectsSchema(BaseModel): + """Represents a ListObjectsSchema in the Iconik system.""" + + first_url: Optional[str] = None + last_url: Optional[str] = None + next_url: Optional[str] = None + page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + pages: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + per_page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + prev_url: Optional[str] = None + total: Optional[int] = Field( + None, ge=-9223372036854775808, le=9223372036854775807 + ) + + +class InheritedACLSchema(BaseModel): + """Represents a InheritedACLSchema in the Iconik system.""" + + collection_ids: List[str] + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + + +class GroupIDsSchema(BaseModel): + """Represents a GroupIDsSchema in the Iconik system.""" + + group_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class GroupACLSchema(BaseModel): + """Represents a GroupACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + group_id: Optional[UUID] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + + +class GroupACLBaseSchema(BaseModel): + """Represents a GroupACLBaseSchema in the Iconik system.""" + + group_id: Optional[UUID] = None + permissions: List[str] + + +class DeleteBulkACLsSchema(BaseModel): + """Represents a DeleteBulkACLsSchema in the Iconik system.""" + + group_ids: Optional[List[UUID]] = Field(default_factory=list) + include_assets: bool + include_collections: bool + object_ids: Optional[List[UUID]] = Field(default_factory=list) + object_type: Optional[str] = None + user_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class DeleteACLsSchema(BaseModel): + """Represents a DeleteACLsSchema in the Iconik system.""" + + group_ids: Optional[List[UUID]] = Field(default_factory=list) + object_keys: Optional[List[str]] = Field(default_factory=list) + object_type: Optional[str] = None + user_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class CreateShareACLsSchema(BaseModel): + """Represents a CreateShareACLsSchema in the Iconik system.""" + + object_keys: Optional[List[str]] = Field(default_factory=list) + object_type: Optional[str] = None + permissions: List[str] + share_id: Optional[UUID] = None + + +class CreateMultipleACLsSchema(BaseModel): + """Represents a CreateMultipleACLsSchema in the Iconik system.""" + + objects: List["CreateACLsSchemaMultiple"] + + +class CreateBulkACLsSchema(BaseModel): + """Represents a CreateBulkACLsSchema in the Iconik system.""" + + group_ids: Optional[List[UUID]] = Field(default_factory=list) + include_assets: bool + include_collections: bool + mode: Optional[Literal["APPEND", "OVERWRITE"]] = None + object_ids: Optional[List[UUID]] = Field(default_factory=list) + object_type: Optional[str] = None + permissions: List[str] + user_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class CreateACLsSchemaMultiple(BaseModel): + """Represents a CreateACLsSchemaMultiple in the Iconik system.""" + + group_ids: Optional[List[UUID]] = Field(default_factory=list) + mode: Optional[Literal["APPEND", "OVERWRITE"]] = None + object_keys: List[str] + object_type: Optional[str] = None + permissions: List[str] + user_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class CreateACLsSchema(BaseModel): + """Represents a CreateACLsSchema in the Iconik system.""" + + group_ids: Optional[List[UUID]] = Field(default_factory=list) + mode: Optional[Literal["APPEND", "OVERWRITE"]] = None + object_keys: List[str] + object_type: Optional[str] = None + permissions: List[str] + user_ids: Optional[List[UUID]] = Field(default_factory=list) + + +class CreateACLsResultSchema(BaseModel): + """Represents a CreateACLsResultSchema in the Iconik system.""" + + updated_object_keys: Optional[List[str]] = Field(default_factory=list) + + +class CopyInheritedACLSchema(BaseModel): + """Represents a CopyInheritedACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + from_collection_ids: List[str] + object_key: str + object_type: str + + +class CombinedPermissionsSchema(BaseModel): + """Represents a CombinedPermissionsSchema in the Iconik system.""" + + permissions: Optional[List[str]] = Field(default_factory=list) + + +class CheckBulkACLsSchema(BaseModel): + """Represents a CheckBulkACLsSchema in the Iconik system.""" + + objects: List["BulkACLsObjectSchema"] + + +class BulkDeleteShareACLs(BaseModel): + """Represents a BulkDeleteShareACLs in the Iconik system.""" + + share_ids: List[UUID] + + +class BulkCreateShareACLs(BaseModel): + """Represents a BulkCreateShareACLs in the Iconik system.""" + + permissions: List[str] + share_ids: List[UUID] + + +class BulkACLsObjectSchema(BaseModel): + """Represents a BulkACLsObjectSchema in the Iconik system.""" + + object_keys: List[str] + object_type: str + permissions: List[Literal["read", "write", "delete", "change-acl"]] + + +class BulkACLSchema(BaseModel): + """Represents a BulkACLSchema in the Iconik system.""" + + access_denied: Optional[List[str]] = Field(default_factory=list) + access_granted: Optional[List[str]] = Field(default_factory=list) + + +class ACLsSchema(BaseModel): + """Represents a ACLsSchema in the Iconik system.""" + + object_keys: Optional[List[str]] = Field(default_factory=list) + + +class ACLTemplatesSchema(BaseModel): + """Represents a ACLTemplatesSchema in the Iconik system.""" + + objects: Optional[List["ACLTemplateSchema"]] = Field(default_factory=list) + + +class ACLTemplateSchema(BaseModel): + """Represents a ACLTemplateSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + id: Optional[UUID] = None + name: str + + +class ACLSchema(BaseModel): + """Represents a ACLSchema in the Iconik system.""" + + groups_acl: Optional[List["GroupACLBase"]] = Field(default_factory=list) + inherited_groups_acl: Optional[List["PropagatingGroupACL"] + ] = Field(default_factory=list) + inherited_users_acl: Optional[List["PropagatingACL"] + ] = Field(default_factory=list) + propagating_groups_acl: Optional[List["PropagatingGroupACL"] + ] = Field(default_factory=list) + propagating_users_acl: Optional[List["PropagatingACL"] + ] = Field(default_factory=list) + users_acl: Optional[List["UserACLBase"]] = Field(default_factory=list) + + +class PropagatingGroupACL(BaseModel): + """Represents a PropagatingGroupACL in the Iconik system.""" + + group_id: Optional[UUID] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + + +class PropagatingACL(BaseModel): + """Represents a PropagatingACL in the Iconik system.""" + + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + user_id: Optional[UUID] = None + + +class UserACLBase(BaseModel): + """Represents a UserACLBase in the Iconik system.""" + + permissions: List[str] + user_id: Optional[UUID] = None + + +class GroupACLBase(BaseModel): + """Represents a GroupACLBase in the Iconik system.""" + + group_id: Optional[UUID] = None + permissions: List[str] + + +# Update forward references +UsersSchema.model_rebuild() +UsersCheckAclSchema.model_rebuild() +UserIDsSchema.model_rebuild() +UserACLSchema.model_rebuild() +UserACLBaseSchema.model_rebuild() +SharesACLSchema.model_rebuild() +ShareACLSchema.model_rebuild() +ReindexPropagatingACLSchema.model_rebuild() +PropagatingGroupACLSchema.model_rebuild() +PropagatingACLSchema.model_rebuild() +ListObjectsSchema.model_rebuild() +InheritedACLSchema.model_rebuild() +GroupIDsSchema.model_rebuild() +GroupACLSchema.model_rebuild() +GroupACLBaseSchema.model_rebuild() +DeleteBulkACLsSchema.model_rebuild() +DeleteACLsSchema.model_rebuild() +CreateShareACLsSchema.model_rebuild() +CreateMultipleACLsSchema.model_rebuild() +CreateBulkACLsSchema.model_rebuild() +CreateACLsSchemaMultiple.model_rebuild() +CreateACLsSchema.model_rebuild() +CreateACLsResultSchema.model_rebuild() +CopyInheritedACLSchema.model_rebuild() +CombinedPermissionsSchema.model_rebuild() +CheckBulkACLsSchema.model_rebuild() +BulkDeleteShareACLs.model_rebuild() +BulkCreateShareACLs.model_rebuild() +BulkACLsObjectSchema.model_rebuild() +BulkACLSchema.model_rebuild() +ACLsSchema.model_rebuild() +ACLTemplatesSchema.model_rebuild() +ACLTemplateSchema.model_rebuild() +ACLSchema.model_rebuild() +PropagatingGroupACL.model_rebuild() +PropagatingACL.model_rebuild() +UserACLBase.model_rebuild() +GroupACLBase.model_rebuild() diff --git a/pythonik/specs/acls.py b/pythonik/specs/acls.py new file mode 100644 index 0000000..d043629 --- /dev/null +++ b/pythonik/specs/acls.py @@ -0,0 +1,1001 @@ +from typing import Any, Dict, List, Optional, Union + +from pythonik.models.base import Response + +from aiopythonik import is_pydantic_model +from aiopythonik._pythonik_patches import Spec + +from .._typing import ObjectID, ObjectKey +from ..models.acls import ( + ACLSchema, + ACLsSchema, + ACLTemplateSchema, + ACLTemplatesSchema, + BulkACLSchema, + CheckBulkACLsSchema, + CombinedPermissionsSchema, + CreateACLsResultSchema, + CreateACLsSchema, + CreateBulkACLsSchema, + CreateMultipleACLsSchema, + CreateShareACLsSchema, + DeleteACLsSchema, + DeleteBulkACLsSchema, + GroupACLSchema, + ShareACLSchema, + SharesACLSchema, + UserACLSchema, +) + + +class AclsSpec(Spec): + server = "API/acls/" + VALID_PERMISSIONS = ["change-acl", "delete", "read", "write"] + + # pylint: disable=too-many-positional-arguments + def apply_template_permissions( + self, + template_id: ObjectID, + object_type: str, + object_key: ObjectKey, + ignore_reindexing: Optional[bool] = False, + restrict_acls_collection_id: Optional[str] = None, + **kwargs, + ) -> Response: + """ + Apply template permissions to an object + + Args: + template_id: ID of the template to apply permissions for + object_type: Type of the object (e.g. "user") + object_key: ID of the object + ignore_reindexing: Ignore reindexing + restrict_acls_collection_id: Do not apply any ACLs that are not in + the collection_id provided (Parent collection normally) + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User does not have permission + - 404 ACL template does not exist + """ + params = { + "ignore_reindexing": ignore_reindexing, + "restrict_acls_collection_id": restrict_acls_collection_id, + } + url = self.gen_url( + f"acl/templates/{template_id}/{object_type}/{object_key}/" + ) + resp = self._post(url, params=params, **kwargs) + return self.parse_response(resp, None) + + def apply_group_permissions( + self, + group_id: ObjectID, + object_type: str, + object_key: ObjectKey, + permissions: List[str], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update or create group acl for an object + + Args: + group_id: ID of the group to apply permissions for + object_type: Type of the object + object_key: ID of the object + permissions: List of permissions to apply + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=GroupACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + - ValueError: If invalid permissions provided + """ + if permissions and not any( + _ in permissions for _ in self.VALID_PERMISSIONS + ): + raise ValueError(f"Value must be one of {self.VALID_PERMISSIONS}") + model = GroupACLSchema(permissions=permissions) + body = model.model_dump(exclude_defaults=exclude_defaults) + url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, GroupACLSchema) + + def fetch_object_permissions( + self, object_type: str, object_key: ObjectKey, **kwargs + ) -> Response: + """ + List of object permissions + + Args: + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"acl/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ACLSchema) + + def fetch_acl_templates(self, **kwargs) -> Response: + """ + Retrieve all ACL templates + + Args: + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplatesSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url("acl/templates/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ACLTemplatesSchema) + + def create_acl_template( + self, + name: str, + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create an ACL template + + Args: + name: Name of the template + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + model = ACLTemplateSchema(name=name) + body = model.model_dump(exclude_defaults=exclude_defaults) + url = self.gen_url("acl/templates/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def get_acl_template(self, template_id: str, **kwargs) -> Response: + """ + Retrieve an ACL template + + Args: + template_id: ID of the template + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 Template does not exist + """ + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def update_acl_template( + self, + template_id: str, + template: Union[ACLTemplateSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update an ACL template + + Args: + template_id: ID of the template + template: Template data (either as ACLTemplateSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL template does not exist + """ + body = ( + template.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(template) else template + ) + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def partial_update_acl_template( + self, + template_id: str, + template: Union[ACLTemplateSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Partially update an ACL template + + Args: + template_id: ID of the template + template: Template data to update (either as ACLTemplateSchema + or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL template does not exist + """ + body = ( + template.model_dump( + exclude_defaults=exclude_defaults, exclude_unset=True + ) if is_pydantic_model(template) else template + ) + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._patch(url, json=body, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def delete_acl_template(self, template_id: str, **kwargs) -> Response: + """ + Remove an ACL template + + Args: + template_id: ID of the template to delete + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL template does not exist + """ + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def check_objects_permission( + self, + objects: Union[CheckBulkACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Check if objects have required permission + + Args: + objects: Objects to check permissions for (either as + CheckBulkACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=BulkACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User doesn't have permission + """ + body = ( + objects.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(objects) else objects + ) + url = self.gen_url("acl/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, BulkACLSchema) + + def check_object_permission( + self, + object_type: str, + object_key: ObjectKey, + permission: str, + **kwargs, + ) -> Response: + """ + Check if a particular object has required permission + + Args: + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User doesn't have permission + """ + url = self.gen_url(f"acl/{object_type}/{object_key}/{permission}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, None) + + def get_combined_permissions( + self, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + List of permissions for the user + + Args: + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=CombinedPermissionsSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"acl/{object_type}/{object_key}/permissions/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, CombinedPermissionsSchema) + + def check_objects_have_permission( + self, + object_type: str, + permission: str, + object_keys: Union[ACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Check if objects have required permission + + Args: + object_type: Type of the object + permission: Permission to check + object_keys: Object keys to check + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=BulkACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User doesn't have permission + """ + body = ( + object_keys.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(object_keys) else object_keys + ) + url = self.gen_url(f"acl/{object_type}/{permission}/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, BulkACLSchema) + + def create_acls( + self, + object_type: str, + acls: Union[CreateACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for multiple objects + + Args: + object_type: Type of the object + acls: ACLs to create (either as CreateACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=CreateACLsResultSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = ( + acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls + ) + url = self.gen_url(f"acl/{object_type}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, CreateACLsResultSchema) + + def create_bulk_acls( + self, + object_type: str, + acls: Union[CreateMultipleACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for multiple objects with multiple permissions + + Args: + object_type: Type of the object + acls: ACLs to create (either as CreateMultipleACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = ( + acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls + ) + url = self.gen_url(f"acl/{object_type}/bulk/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def create_acls_for_content( + self, + object_type: str, + acls: Union[CreateBulkACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for content of multiple objects + + Args: + object_type: Type of the object + acls: ACLs to create (either as CreateBulkACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 501 Invalid object type + """ + body = ( + acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls + ) + url = self.gen_url(f"acl/{object_type}/content/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def delete_acls( + self, + object_type: str, + acls: Union[DeleteACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Delete ACLs for multiple objects + + Args: + object_type: Type of the object + acls: ACLs to delete (either as DeleteACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = ( + acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls + ) + url = self.gen_url(f"acl/{object_type}/") + resp = self._delete(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def delete_acls_for_content( + self, + object_type: str, + acls: Union[DeleteBulkACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Delete ACLs for content of multiple objects + + Args: + object_type: Type of the object + acls: ACLs to delete (either as DeleteBulkACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = ( + acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls + ) + url = self.gen_url(f"acl/{object_type}/content/") + resp = self._delete(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def get_group_acl( + self, + group_id: ObjectID, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + List of group permissions for an object + + Args: + group_id: ID of the group + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=GroupACLSchema) + + Raises: + - 401 Token is invalid + - 404 Group doesn't have permissions + """ + url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, GroupACLSchema) + + def check_group_permission( + self, + group_id: ObjectID, + object_type: str, + object_key: ObjectKey, + permission: str, + **kwargs, + ) -> Response: + """ + Check if group has particular permission for an object + + Args: + group_id: ID of the group + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 Group doesn't have particular permission + """ + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/" + f"{permission}/" + ) + resp = self._get(url, **kwargs) + return self.parse_response(resp, None) + + def delete_group_acl( + self, + group_id: ObjectID, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + Delete a particular ACL by id for an object + + Args: + group_id: ID of the group + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def get_user_acl( + self, + user_id: ObjectID, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + List of user permissions for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=UserACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, UserACLSchema) + + def update_user_acl( + self, + user_id: ObjectID, + object_type: str, + object_key: ObjectKey, + acl: Union[UserACLSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update or create user ACL for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + acl: User ACL data (either as UserACLSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=UserACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + body = ( + acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl + ) + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, UserACLSchema) + + def delete_user_acl( + self, + user_id: ObjectID, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + Delete a user ACL for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def check_user_permission( + self, + user_id: ObjectID, + object_type: str, + object_key: ObjectKey, + permission: str, + **kwargs, + ) -> Response: + """ + Returns a user ACL for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=UserACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User does not have permission + """ + url = self.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/" + f"{permission}/" + ) + resp = self._get(url, **kwargs) + return self.parse_response(resp, UserACLSchema) + + def fetch_share_acls( + self, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + List of share ACLs + + Args: + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=SharesACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"shares/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, SharesACLSchema) + + def create_share_acls( + self, + share_id: ObjectID, + object_type: str, + acls: Union[CreateShareACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for multiple share objects + + Args: + share_id: ID of the share + object_type: Type of the object + acls: Share ACLs data (either as CreateShareACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=CreateACLsResultSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = ( + acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls + ) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, CreateACLsResultSchema) + + def get_share_acl( + self, + share_id: ObjectID, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + List of share permissions for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ShareACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url( + f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/" + ) + resp = self._get(url, **kwargs) + return self.parse_response(resp, ShareACLSchema) + + def create_share_acl( + self, + share_id: ObjectID, + object_type: str, + object_key: ObjectKey, + acl: Union[ShareACLSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + acl: Share ACL data (either as ShareACLSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ShareACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = ( + acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl + ) + url = self.gen_url( + f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/" + ) + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, ShareACLSchema) + + def update_share_acl( + self, + share_id: ObjectID, + object_type: str, + object_key: ObjectKey, + acl: Union[ShareACLSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + acl: Share ACL data (either as ShareACLSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ShareACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + body = ( + acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl + ) + url = self.gen_url( + f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/" + ) + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, ShareACLSchema) + + def delete_share_acl( + self, + share_id: ObjectID, + object_type: str, + object_key: ObjectKey, + **kwargs, + ) -> Response: + """ + Delete a share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + url = self.gen_url( + f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/" + ) + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def check_share_permission( + self, + share_id: ObjectID, + object_type: str, + object_key: ObjectKey, + permission: str, + **kwargs, + ) -> Response: + """ + Returns a share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User does not have permission + """ + url = self.gen_url( + f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/{permission}/" + ) + resp = self._get(url, **kwargs) + return self.parse_response(resp, None) From ccb04232fbc2acd40219ed8af462e28e1e2939d8 Mon Sep 17 00:00:00 2001 From: Brian Summa Date: Tue, 20 May 2025 10:15:36 -0500 Subject: [PATCH 2/4] feat(acls): implement ACLs API functionality Add complete ACLs API integration providing access control management features. Key additions: - AclsSpec with comprehensive permission management methods - Pydantic models for ACL request/response handling - Internal utilities for Pydantic model detection - Extensive test coverage for ACLs functionality --- pythonik/client.py | 4 + pythonik/models/acls.py | 39 +- pythonik/specs/_internal_utils.py | 31 + pythonik/specs/acls.py | 230 ++-- pythonik/tests/test_acls.py | 1443 +++++++++++++++++++++++++ pythonik/tests/test_internal_utils.py | 96 ++ 6 files changed, 1695 insertions(+), 148 deletions(-) create mode 100644 pythonik/specs/_internal_utils.py create mode 100644 pythonik/tests/test_acls.py create mode 100644 pythonik/tests/test_internal_utils.py diff --git a/pythonik/client.py b/pythonik/client.py index d76989c..c74c824 100644 --- a/pythonik/client.py +++ b/pythonik/client.py @@ -2,6 +2,7 @@ from requests import Session from requests.adapters import HTTPAdapter +from pythonik.specs.acls import AclsSpec from pythonik.specs.assets import AssetSpec from pythonik.specs.files import FilesSpec from pythonik.specs.jobs import JobSpec @@ -50,3 +51,6 @@ def search(self): def jobs(self): return JobSpec(self.session, self.timeout, self.base_url) + + def acls(self) -> AclsSpec: + return AclsSpec(self.session, self.timeout, self.base_url) diff --git a/pythonik/models/acls.py b/pythonik/models/acls.py index 45c5f92..37e501a 100644 --- a/pythonik/models/acls.py +++ b/pythonik/models/acls.py @@ -6,10 +6,17 @@ from __future__ import annotations from datetime import datetime -from typing import List, Literal, Optional +from typing import ( + List, + Literal, + Optional, +) from uuid import UUID -from pydantic import BaseModel, Field +from pydantic import ( + BaseModel, + Field, +) class UsersSchema(BaseModel): @@ -60,9 +67,9 @@ class SharesACLSchema(BaseModel): pages: Optional[int] = Field(None, ge=-2147483648, le=2147483647) per_page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) prev_url: Optional[str] = None - total: Optional[int] = Field( - None, ge=-9223372036854775808, le=9223372036854775807 - ) + total: Optional[int] = Field(None, + ge=-9223372036854775808, + le=9223372036854775807) class ShareACLSchema(BaseModel): @@ -110,9 +117,9 @@ class ListObjectsSchema(BaseModel): pages: Optional[int] = Field(None, ge=-2147483648, le=2147483647) per_page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) prev_url: Optional[str] = None - total: Optional[int] = Field( - None, ge=-9223372036854775808, le=9223372036854775807 - ) + total: Optional[int] = Field(None, + ge=-9223372036854775808, + le=9223372036854775807) class InheritedACLSchema(BaseModel): @@ -300,14 +307,14 @@ class ACLSchema(BaseModel): """Represents a ACLSchema in the Iconik system.""" groups_acl: Optional[List["GroupACLBase"]] = Field(default_factory=list) - inherited_groups_acl: Optional[List["PropagatingGroupACL"] - ] = Field(default_factory=list) - inherited_users_acl: Optional[List["PropagatingACL"] - ] = Field(default_factory=list) - propagating_groups_acl: Optional[List["PropagatingGroupACL"] - ] = Field(default_factory=list) - propagating_users_acl: Optional[List["PropagatingACL"] - ] = Field(default_factory=list) + inherited_groups_acl: Optional[List["PropagatingGroupACL"]] = Field( + default_factory=list) + inherited_users_acl: Optional[List["PropagatingACL"]] = Field( + default_factory=list) + propagating_groups_acl: Optional[List["PropagatingGroupACL"]] = Field( + default_factory=list) + propagating_users_acl: Optional[List["PropagatingACL"]] = Field( + default_factory=list) users_acl: Optional[List["UserACLBase"]] = Field(default_factory=list) diff --git a/pythonik/specs/_internal_utils.py b/pythonik/specs/_internal_utils.py new file mode 100644 index 0000000..58fd9e6 --- /dev/null +++ b/pythonik/specs/_internal_utils.py @@ -0,0 +1,31 @@ +from typing import Any + +from pythonik.exceptions import PythonikException + + +def is_pydantic_model(obj: Any) -> bool: + """ + Checks if an object is a Pydantic model instance. + + Args: + obj: The object to check. + + Returns: + True if the object is a Pydantic model instance, False otherwise. + """ + # Check for common Pydantic model attributes/methods + if obj is None: + return False + try: + # Pydantic v1 + has_dict_method = hasattr(obj, "dict") and callable( + getattr(obj, "dict", None)) + # Pydantic v2 + has_model_dump = hasattr(obj, "model_dump") and callable( + getattr(obj, "model_dump", None)) + # Check for schema-related attributes that are common in Pydantic models + has_schema_attrs = hasattr(obj, "__fields__") or hasattr( + obj, "model_fields") + return (has_dict_method or has_model_dump) and has_schema_attrs + except PythonikException: + return False diff --git a/pythonik/specs/acls.py b/pythonik/specs/acls.py index d043629..829c73c 100644 --- a/pythonik/specs/acls.py +++ b/pythonik/specs/acls.py @@ -1,12 +1,13 @@ -from typing import Any, Dict, List, Optional, Union - -from pythonik.models.base import Response - -from aiopythonik import is_pydantic_model -from aiopythonik._pythonik_patches import Spec +from typing import ( + Any, + Dict, + List, + Optional, + Union, +) +from uuid import UUID -from .._typing import ObjectID, ObjectKey -from ..models.acls import ( +from pythonik.models.acls import ( ACLSchema, ACLsSchema, ACLTemplateSchema, @@ -26,6 +27,10 @@ SharesACLSchema, UserACLSchema, ) +from pythonik.models.base import Response +from pythonik.specs.base import Spec + +from ._internal_utils import is_pydantic_model class AclsSpec(Spec): @@ -35,9 +40,9 @@ class AclsSpec(Spec): # pylint: disable=too-many-positional-arguments def apply_template_permissions( self, - template_id: ObjectID, + template_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], ignore_reindexing: Optional[bool] = False, restrict_acls_collection_id: Optional[str] = None, **kwargs, @@ -68,16 +73,15 @@ def apply_template_permissions( "restrict_acls_collection_id": restrict_acls_collection_id, } url = self.gen_url( - f"acl/templates/{template_id}/{object_type}/{object_key}/" - ) + f"acl/templates/{template_id}/{object_type}/{object_key}/") resp = self._post(url, params=params, **kwargs) return self.parse_response(resp, None) def apply_group_permissions( self, - group_id: ObjectID, + group_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], permissions: List[str], exclude_defaults: bool = True, **kwargs, @@ -102,19 +106,19 @@ def apply_group_permissions( - 404 ACL does not exist - ValueError: If invalid permissions provided """ - if permissions and not any( - _ in permissions for _ in self.VALID_PERMISSIONS - ): + if permissions and not any(_ in permissions + for _ in self.VALID_PERMISSIONS): raise ValueError(f"Value must be one of {self.VALID_PERMISSIONS}") model = GroupACLSchema(permissions=permissions) body = model.model_dump(exclude_defaults=exclude_defaults) - url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/") + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, GroupACLSchema) - def fetch_object_permissions( - self, object_type: str, object_key: ObjectKey, **kwargs - ) -> Response: + def fetch_object_permissions(self, object_type: str, + object_key: Union[UUID, + str], **kwargs) -> Response: """ List of object permissions @@ -223,10 +227,8 @@ def update_acl_template( - 401 Token is invalid - 404 ACL template does not exist """ - body = ( - template.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(template) else template - ) + body = (template.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(template) else template) url = self.gen_url(f"acl/templates/{template_id}/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, ACLTemplateSchema) @@ -256,11 +258,9 @@ def partial_update_acl_template( - 401 Token is invalid - 404 ACL template does not exist """ - body = ( - template.model_dump( - exclude_defaults=exclude_defaults, exclude_unset=True - ) if is_pydantic_model(template) else template - ) + body = (template.model_dump(exclude_defaults=exclude_defaults, + exclude_unset=True) + if is_pydantic_model(template) else template) url = self.gen_url(f"acl/templates/{template_id}/") resp = self._patch(url, json=body, **kwargs) return self.parse_response(resp, ACLTemplateSchema) @@ -308,10 +308,8 @@ def check_objects_permission( - 401 Token is invalid - 403 User doesn't have permission """ - body = ( - objects.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(objects) else objects - ) + body = (objects.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(objects) else objects) url = self.gen_url("acl/") resp = self._post(url, json=body, **kwargs) return self.parse_response(resp, BulkACLSchema) @@ -319,7 +317,7 @@ def check_objects_permission( def check_object_permission( self, object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], permission: str, **kwargs, ) -> Response: @@ -347,7 +345,7 @@ def check_object_permission( def get_combined_permissions( self, object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -395,10 +393,8 @@ def check_objects_have_permission( - 401 Token is invalid - 403 User doesn't have permission """ - body = ( - object_keys.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(object_keys) else object_keys - ) + body = (object_keys.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(object_keys) else object_keys) url = self.gen_url(f"acl/{object_type}/{permission}/") resp = self._post(url, json=body, **kwargs) return self.parse_response(resp, BulkACLSchema) @@ -426,10 +422,8 @@ def create_acls( - 400 Bad request - 401 Token is invalid """ - body = ( - acls.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acls) else acls - ) + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) url = self.gen_url(f"acl/{object_type}/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, CreateACLsResultSchema) @@ -457,10 +451,8 @@ def create_bulk_acls( - 400 Bad request - 401 Token is invalid """ - body = ( - acls.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acls) else acls - ) + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) url = self.gen_url(f"acl/{object_type}/bulk/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, None) @@ -489,10 +481,8 @@ def create_acls_for_content( - 401 Token is invalid - 501 Invalid object type """ - body = ( - acls.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acls) else acls - ) + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) url = self.gen_url(f"acl/{object_type}/content/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, None) @@ -520,10 +510,8 @@ def delete_acls( - 400 Bad request - 401 Token is invalid """ - body = ( - acls.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acls) else acls - ) + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) url = self.gen_url(f"acl/{object_type}/") resp = self._delete(url, json=body, **kwargs) return self.parse_response(resp, None) @@ -551,19 +539,17 @@ def delete_acls_for_content( - 400 Bad request - 401 Token is invalid """ - body = ( - acls.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acls) else acls - ) + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) url = self.gen_url(f"acl/{object_type}/content/") resp = self._delete(url, json=body, **kwargs) return self.parse_response(resp, None) def get_group_acl( self, - group_id: ObjectID, + group_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -582,15 +568,16 @@ def get_group_acl( - 401 Token is invalid - 404 Group doesn't have permissions """ - url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/") + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") resp = self._get(url, **kwargs) return self.parse_response(resp, GroupACLSchema) def check_group_permission( self, - group_id: ObjectID, + group_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], permission: str, **kwargs, ) -> Response: @@ -612,18 +599,16 @@ def check_group_permission( - 401 Token is invalid - 403 Group doesn't have particular permission """ - url = self.gen_url( - f"groups/{group_id}/acl/{object_type}/{object_key}/" - f"{permission}/" - ) + url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/" + f"{permission}/") resp = self._get(url, **kwargs) return self.parse_response(resp, None) def delete_group_acl( self, - group_id: ObjectID, + group_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -643,15 +628,16 @@ def delete_group_acl( - 401 Token is invalid - 404 ACL does not exist """ - url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/") + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") resp = self._delete(url, **kwargs) return self.parse_response(resp, None) def get_user_acl( self, - user_id: ObjectID, + user_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -676,9 +662,9 @@ def get_user_acl( def update_user_acl( self, - user_id: ObjectID, + user_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], acl: Union[UserACLSchema, Dict[str, Any]], exclude_defaults: bool = True, **kwargs, @@ -702,19 +688,17 @@ def update_user_acl( - 401 Token is invalid - 404 ACL does not exist """ - body = ( - acl.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acl) else acl - ) + body = (acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl) url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, UserACLSchema) def delete_user_acl( self, - user_id: ObjectID, + user_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -740,9 +724,9 @@ def delete_user_acl( def check_user_permission( self, - user_id: ObjectID, + user_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], permission: str, **kwargs, ) -> Response: @@ -764,17 +748,15 @@ def check_user_permission( - 401 Token is invalid - 403 User does not have permission """ - url = self.gen_url( - f"users/{user_id}/acl/{object_type}/{object_key}/" - f"{permission}/" - ) + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/" + f"{permission}/") resp = self._get(url, **kwargs) return self.parse_response(resp, UserACLSchema) def fetch_share_acls( self, object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -798,7 +780,7 @@ def fetch_share_acls( def create_share_acls( self, - share_id: ObjectID, + share_id: Union[UUID, str], object_type: str, acls: Union[CreateShareACLsSchema, Dict[str, Any]], exclude_defaults: bool = True, @@ -821,19 +803,17 @@ def create_share_acls( - 400 Bad request - 401 Token is invalid """ - body = ( - acls.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acls) else acls - ) + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) url = self.gen_url(f"shares/{share_id}/acl/{object_type}/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, CreateACLsResultSchema) def get_share_acl( self, - share_id: ObjectID, + share_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -852,18 +832,16 @@ def get_share_acl( - 400 Bad request - 401 Token is invalid """ - url = self.gen_url( - f"shares/{share_id}/acl/{object_type}/" - f"{object_key}/" - ) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") resp = self._get(url, **kwargs) return self.parse_response(resp, ShareACLSchema) def create_share_acl( self, - share_id: ObjectID, + share_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], acl: Union[ShareACLSchema, Dict[str, Any]], exclude_defaults: bool = True, **kwargs, @@ -886,22 +864,18 @@ def create_share_acl( - 400 Bad request - 401 Token is invalid """ - body = ( - acl.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acl) else acl - ) - url = self.gen_url( - f"shares/{share_id}/acl/{object_type}/" - f"{object_key}/" - ) + body = (acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") resp = self._post(url, json=body, **kwargs) return self.parse_response(resp, ShareACLSchema) def update_share_acl( self, - share_id: ObjectID, + share_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], acl: Union[ShareACLSchema, Dict[str, Any]], exclude_defaults: bool = True, **kwargs, @@ -925,22 +899,18 @@ def update_share_acl( - 401 Token is invalid - 404 ACL does not exist """ - body = ( - acl.model_dump(exclude_defaults=exclude_defaults) - if is_pydantic_model(acl) else acl - ) - url = self.gen_url( - f"shares/{share_id}/acl/{object_type}/" - f"{object_key}/" - ) + body = (acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, ShareACLSchema) def delete_share_acl( self, - share_id: ObjectID, + share_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], **kwargs, ) -> Response: """ @@ -960,18 +930,16 @@ def delete_share_acl( - 401 Token is invalid - 404 ACL does not exist """ - url = self.gen_url( - f"shares/{share_id}/acl/{object_type}/" - f"{object_key}/" - ) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") resp = self._delete(url, **kwargs) return self.parse_response(resp, None) def check_share_permission( self, - share_id: ObjectID, + share_id: Union[UUID, str], object_type: str, - object_key: ObjectKey, + object_key: Union[UUID, str], permission: str, **kwargs, ) -> Response: @@ -993,9 +961,7 @@ def check_share_permission( - 401 Token is invalid - 403 User does not have permission """ - url = self.gen_url( - f"shares/{share_id}/acl/{object_type}/" - f"{object_key}/{permission}/" - ) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/{permission}/") resp = self._get(url, **kwargs) return self.parse_response(resp, None) diff --git a/pythonik/tests/test_acls.py b/pythonik/tests/test_acls.py new file mode 100644 index 0000000..8802e77 --- /dev/null +++ b/pythonik/tests/test_acls.py @@ -0,0 +1,1443 @@ +# pythonik/tests/test_acls.py +import uuid + +import pytest +import requests_mock + +from pythonik.client import PythonikClient +from pythonik.models.acls import ( + ACLTemplateSchema, + CheckBulkACLsSchema, +) +from pythonik.specs.acls import AclsSpec + + +def test_apply_template_permissions(): + """Test applying template permissions to an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"acl/templates/{template_id}/{object_type}/{object_key}/") + m.post(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().apply_template_permissions( + template_id=template_id, + object_type=object_type, + object_key=object_key, + ignore_reindexing=True, + restrict_acls_collection_id="collection-uuid", + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "POST" + + # Verify query parameters - lowercase comparison for boolean values + assert "ignore_reindexing=" in m.last_request.url + assert ("restrict_acls_collection_id=collection-uuid" + in m.last_request.url) + + +def test_apply_group_permissions(): + """Test applying group permissions to an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permissions = ["read", "write"] + + # Expected response data + response_data = { + "group_id": group_id, + "object_key": object_key, + "object_type": object_type, + "permissions": permissions, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().apply_group_permissions( + group_id=group_id, + object_type=object_type, + object_key=object_key, + permissions=permissions, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == permissions + + +def test_apply_group_permissions_invalid_permission(): + """Test applying group permissions with invalid permission raises ValueError.""" + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permissions = ["invalid_permission"] + + # Create client + client = PythonikClient(app_id=app_id, auth_token=auth_token, timeout=3) + + # Make request with invalid permissions and verify it raises ValueError + with pytest.raises(ValueError) as exc_info: + client.acls().apply_group_permissions( + group_id=group_id, + object_type=object_type, + object_key=object_key, + permissions=permissions, + ) + + # Check error message contains valid permissions + assert "Value must be one of" in str(exc_info.value) + assert "read" in str(exc_info.value) + assert "write" in str(exc_info.value) + assert "delete" in str(exc_info.value) + assert "change-acl" in str(exc_info.value) + + +def test_fetch_object_permissions(): + """Test fetching permissions for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "users_acl": [{ + "user_id": str(uuid.uuid4()), + "permissions": ["read", "write"] + }], + "groups_acl": [{ + "group_id": str(uuid.uuid4()), + "permissions": ["read"] + }], + "propagating_users_acl": [], + "propagating_groups_acl": [], + "inherited_users_acl": [], + "inherited_groups_acl": [], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().fetch_object_permissions( + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert len(response.data.users_acl) == 1 + assert len(response.data.groups_acl) == 1 + assert response.data.users_acl[0].permissions == ["read", "write"] + assert response.data.groups_acl[0].permissions == ["read"] + + +def test_fetch_acl_templates(): + """Test fetching ACL templates.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + + # Expected response data + response_data = { + "objects": [{ + "id": template_id, + "name": "Test Template", + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + }] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url("acl/templates/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().fetch_acl_templates() + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert len(response.data.objects) == 1 + assert str(response.data.objects[0].id) == template_id + assert response.data.objects[0].name == "Test Template" + + +def test_create_acl_template(): + """Test creating an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "New Template" + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url("acl/templates/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_acl_template(name=template_name) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert request_json["name"] == template_name + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_get_acl_template(): + """Test getting an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "Test Template" + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_acl_template(template_id=template_id) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_update_acl_template(): + """Test updating an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "Updated Template" + + # Create template data + template_data = ACLTemplateSchema(name=template_name) + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().update_acl_template( + template_id=template_id, + template=template_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["name"] == template_name + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_partial_update_acl_template(): + """Test partially updating an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "Partially Updated Template" + + # Create template data + template_data = {"name": template_name} # Using dict instead of model + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.patch(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().partial_update_acl_template( + template_id=template_id, + template=template_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PATCH" + + # Verify request body + request_json = m.last_request.json() + assert request_json["name"] == template_name + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_delete_acl_template(): + """Test deleting an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_acl_template(template_id=template_id) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_check_objects_permission(): + """Test checking permissions for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + + # Create request data + check_data = CheckBulkACLsSchema( + objects=[{ + "object_keys": [object_key1, object_key2], + "object_type": "assets", + "permissions": ["read", "write"], + }]) + + # Expected response data + response_data = { + "access_granted": [object_key1], + "access_denied": [object_key2], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url("acl/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_objects_permission(objects=check_data) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert "objects" in request_json + assert len(request_json["objects"]) == 1 + assert object_key1 in request_json["objects"][0]["object_keys"] + assert object_key2 in request_json["objects"][0]["object_keys"] + + # Verify response data + assert response.data.access_granted == [object_key1] + assert response.data.access_denied == [object_key2] + + +def test_check_object_permission(): + """Test checking permission for a single object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_object_permission( + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "GET" + + +def test_get_combined_permissions(): + """Test getting combined permissions for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = {"permissions": ["read", "write", "delete"]} + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"acl/{object_type}/{object_key}/permissions/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_combined_permissions( + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert response.data.permissions == ["read", "write", "delete"] + + +def test_check_objects_have_permission(): + """Test checking if objects have a specific permission.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + permission = "read" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + + # Create request data + object_keys = {"object_keys": [object_key1, object_key2]} + + # Expected response data + response_data = { + "access_granted": [object_key1], + "access_denied": [object_key2], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/{permission}/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_objects_have_permission( + object_type=object_type, + permission=permission, + object_keys=object_keys, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert "object_keys" in request_json + assert object_key1 in request_json["object_keys"] + assert object_key2 in request_json["object_keys"] + + # Verify response data + assert response.data.access_granted == [object_key1] + assert response.data.access_denied == [object_key2] + + +def test_create_acls(): + """Test creating ACLs for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data with all UUIDs as strings + acls_data = { + "object_keys": [str(object_key1), + str(object_key2)], + "object_type": object_type, + "permissions": ["read", "write"], + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + "mode": "APPEND", + } + + # Expected response data + response_data = { + "updated_object_keys": [str(object_key1), + str(object_key2)] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_acls( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert str(object_key1) in request_json["object_keys"] + assert str(object_key2) in request_json["object_keys"] + assert request_json["permissions"] == ["read", "write"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + assert request_json["mode"] == "APPEND" + + # Verify response data + assert response.data.updated_object_keys == [ + str(object_key1), + str(object_key2), + ] + + +def test_create_bulk_acls(): + """Test creating ACLs for multiple objects with multiple permissions.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "objects": [ + { + "object_keys": [str(object_key1)], + "permissions": ["read", "write"], + "user_ids": [str(user_id)], + "mode": "APPEND", + }, + { + "object_keys": [str(object_key2)], + "permissions": ["read"], + "user_ids": [str(user_id)], + "mode": "OVERWRITE", + }, + ] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/bulk/") + m.put(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_bulk_acls( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert "objects" in request_json + assert len(request_json["objects"]) == 2 + assert str(object_key1) in request_json["objects"][0]["object_keys"] + assert request_json["objects"][0]["permissions"] == ["read", "write"] + assert str(object_key2) in request_json["objects"][1]["object_keys"] + assert request_json["objects"][1]["permissions"] == ["read"] + + +def test_create_acls_for_content(): + """Test creating ACLs for content of multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "collections" + object_id1 = str(uuid.uuid4()) + object_id2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "object_ids": [str(object_id1), str(object_id2)], + "permissions": ["read", "write"], + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + "include_assets": True, + "include_collections": True, + "mode": "APPEND", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/content/") + m.put(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_acls_for_content( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert str(object_id1) in request_json["object_ids"] + assert str(object_id2) in request_json["object_ids"] + assert request_json["permissions"] == ["read", "write"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + assert request_json["include_assets"] is True + assert request_json["include_collections"] is True + assert request_json["mode"] == "APPEND" + + +def test_delete_acls(): + """Test deleting ACLs for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "object_keys": [str(object_key1), + str(object_key2)], + "object_type": object_type, + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_acls( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + # Verify request body + request_json = m.last_request.json() + assert str(object_key1) in request_json["object_keys"] + assert str(object_key2) in request_json["object_keys"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + + +def test_delete_acls_for_content(): + """Test deleting ACLs for content of multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "collections" + object_id1 = str(uuid.uuid4()) + object_id2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "object_ids": [str(object_id1), str(object_id2)], + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + "include_assets": True, + "include_collections": True, + "object_type": object_type, + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/content/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_acls_for_content( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + # Verify request body + request_json = m.last_request.json() + assert str(object_id1) in request_json["object_ids"] + assert str(object_id2) in request_json["object_ids"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + assert request_json["include_assets"] is True + assert request_json["include_collections"] is True + + +def test_get_group_acl(): + """Test getting group ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "group_id": group_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_group_acl( + group_id=group_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.group_id) == group_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_check_group_permission(): + """Test checking group permission for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_group_permission( + group_id=group_id, + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "GET" + + +def test_delete_group_acl(): + """Test deleting group ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_group_acl( + group_id=group_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_get_user_acl(): + """Test getting user ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "user_id": user_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_user_acl( + user_id=user_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.user_id) == user_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_update_user_acl(): + """Test updating user ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Create ACL data + acl_data = {"permissions": ["read", "write"]} + + # Expected response data + response_data = { + "user_id": user_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().update_user_acl( + user_id=user_id, + object_type=object_type, + object_key=object_key, + acl=acl_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == ["read", "write"] + + # Verify response data + assert str(response.data.user_id) == user_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_delete_user_acl(): + """Test deleting user ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_user_acl( + user_id=user_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_check_user_permission(): + """Test checking user permission for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Expected response data + response_data = { + "user_id": user_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_user_permission( + user_id=user_id, + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.user_id) == user_id + assert response.data.permissions == ["read", "write"] + + +def test_fetch_share_acls(): + """Test fetching share ACLs for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + + # Expected response data + response_data = { + "objects": [{ + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + }], + "total": + 1, + "page": + 1, + "pages": + 1, + "per_page": + 10, + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"shares/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().fetch_share_acls( + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert len(response.data.objects) == 1 + assert str(response.data.objects[0].share_id) == share_id + assert response.data.objects[0].object_key == object_key + assert response.data.objects[0].object_type == object_type + assert response.data.objects[0].permissions == ["read"] + + +def test_create_share_acls(): + """Test creating share ACLs for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + + # Create ACLs data + acls_data = { + "object_keys": [str(object_key1), + str(object_key2)], + "object_type": object_type, + "permissions": ["read"], + "share_id": str(share_id), + } + + # Expected response data + response_data = { + "updated_object_keys": [str(object_key1), + str(object_key2)] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_share_acls( + share_id=share_id, + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert str(object_key1) in request_json["object_keys"] + assert str(object_key2) in request_json["object_keys"] + assert request_json["permissions"] == ["read"] + assert request_json["object_type"] == object_type + + # Verify response data + assert response.data.updated_object_keys == [ + str(object_key1), + str(object_key2), + ] + + +def test_get_share_acl(): + """Test getting share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.share_id) == share_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read"] + + +def test_create_share_acl(): + """Test creating share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Create ACL data + acl_data = {"permissions": ["read"]} + + # Expected response data + response_data = { + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + acl=acl_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == ["read"] + + # Verify response data + assert str(response.data.share_id) == share_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read"] + + +def test_update_share_acl(): + """Test updating share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Create ACL data + acl_data = {"permissions": ["read", "write"]} + + # Expected response data + response_data = { + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().update_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + acl=acl_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == ["read", "write"] + + # Verify response data + assert str(response.data.share_id) == share_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_delete_share_acl(): + """Test deleting share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_check_share_permission(): + """Test checking share permission for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_share_permission( + share_id=share_id, + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "GET" diff --git a/pythonik/tests/test_internal_utils.py b/pythonik/tests/test_internal_utils.py new file mode 100644 index 0000000..314c72d --- /dev/null +++ b/pythonik/tests/test_internal_utils.py @@ -0,0 +1,96 @@ +# pythonik/tests/test_internal_utils.py +import uuid + +from pydantic import BaseModel + +from pythonik.exceptions import PythonikException +from pythonik.specs._internal_utils import is_pydantic_model + + +class PydanticV1StyleModel: + """Mock class that mimics a Pydantic v1 model.""" + + __fields__ = {"test": "field"} + + def dict(self): + return {"test": "value"} + + +class PydanticV2StyleModel: + """Mock class that mimics a Pydantic v2 model.""" + + model_fields = {"test": "field"} + + def model_dump(self): + return {"test": "value"} + + +class RealPydanticModel(BaseModel): + """A real Pydantic model for testing.""" + + id: str + name: str + + +class NonPydanticClass: + """A regular class that is not a Pydantic model.""" + + def __init__(self): + self.value = "test" + + +def test_is_pydantic_model_with_real_model(): + """Test is_pydantic_model with a real Pydantic model.""" + model = RealPydanticModel(id=str(uuid.uuid4()), name="Test Model") + assert is_pydantic_model(model) is True + + +def test_is_pydantic_model_with_none(): + """Test is_pydantic_model with None.""" + assert is_pydantic_model(None) is False + + +def test_is_pydantic_model_with_non_model(): + """Test is_pydantic_model with a non-model class instance.""" + non_model = NonPydanticClass() + assert is_pydantic_model(non_model) is False + + +def test_is_pydantic_model_with_v1_style_mock(): + """Test is_pydantic_model with a class that looks like a Pydantic v1 model.""" + v1_model = PydanticV1StyleModel() + assert is_pydantic_model(v1_model) is True + + +def test_is_pydantic_model_with_v2_style_mock(): + """Test is_pydantic_model with a class that looks like a Pydantic v2 model.""" + v2_model = PydanticV2StyleModel() + assert is_pydantic_model(v2_model) is True + + +def test_is_pydantic_model_with_dict(): + """Test is_pydantic_model with a dictionary.""" + dict_data = {"id": str(uuid.uuid4()), "name": "Test Dict"} + assert is_pydantic_model(dict_data) is False + + +def test_is_pydantic_model_with_exception(): + """Test is_pydantic_model when an exception is raised.""" + + class ExceptionRaisingModel: + """A model that raises an exception when properties are accessed.""" + + @property + def dict(self): + raise PythonikException("Test exception") + + @property + def model_dump(self): + raise PythonikException("Test exception") + + @property + def __fields__(self): + raise PythonikException("Test exception") + + model = ExceptionRaisingModel() + assert is_pydantic_model(model) is False From df730e82db3e69e935b9153c5f024a90c4b29a8d Mon Sep 17 00:00:00 2001 From: Brian Summa Date: Tue, 20 May 2025 11:47:13 -0500 Subject: [PATCH 3/4] fix(acls): replace UUID type with str for improved compatibility Update ACLs models and specs to use string type instead of UUID for all identifiers. This change ensures better compatibility with existing models and aligns with user expectations when calling API methods. --- pythonik/models/acls.py | 61 ++++++++++++++++++----------------- pythonik/specs/acls.py | 70 ++++++++++++++++++++--------------------- 2 files changed, 64 insertions(+), 67 deletions(-) diff --git a/pythonik/models/acls.py b/pythonik/models/acls.py index 37e501a..51c9bd4 100644 --- a/pythonik/models/acls.py +++ b/pythonik/models/acls.py @@ -11,7 +11,6 @@ Literal, Optional, ) -from uuid import UUID from pydantic import ( BaseModel, @@ -29,13 +28,13 @@ class UsersCheckAclSchema(BaseModel): """Represents a UsersCheckAclSchema in the Iconik system.""" group_ids: Optional[List[str]] = Field(default_factory=list) - user_id: Optional[UUID] = None + user_id: Optional[str] = None class UserIDsSchema(BaseModel): """Represents a UserIDsSchema in the Iconik system.""" - user_ids: Optional[List[UUID]] = Field(default_factory=list) + user_ids: Optional[List[str]] = Field(default_factory=list) class UserACLSchema(BaseModel): @@ -46,14 +45,14 @@ class UserACLSchema(BaseModel): object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] - user_id: Optional[UUID] = None + user_id: Optional[str] = None class UserACLBaseSchema(BaseModel): """Represents a UserACLBaseSchema in the Iconik system.""" permissions: List[str] - user_id: Optional[UUID] = None + user_id: Optional[str] = None class SharesACLSchema(BaseModel): @@ -80,7 +79,7 @@ class ShareACLSchema(BaseModel): object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] - share_id: Optional[UUID] = None + share_id: Optional[str] = None class ReindexPropagatingACLSchema(BaseModel): @@ -92,7 +91,7 @@ class ReindexPropagatingACLSchema(BaseModel): class PropagatingGroupACLSchema(BaseModel): """Represents a PropagatingGroupACLSchema in the Iconik system.""" - group_id: Optional[UUID] = None + group_id: Optional[str] = None object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] @@ -104,7 +103,7 @@ class PropagatingACLSchema(BaseModel): object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] - user_id: Optional[UUID] = None + user_id: Optional[str] = None class ListObjectsSchema(BaseModel): @@ -135,7 +134,7 @@ class InheritedACLSchema(BaseModel): class GroupIDsSchema(BaseModel): """Represents a GroupIDsSchema in the Iconik system.""" - group_ids: Optional[List[UUID]] = Field(default_factory=list) + group_ids: Optional[List[str]] = Field(default_factory=list) class GroupACLSchema(BaseModel): @@ -143,7 +142,7 @@ class GroupACLSchema(BaseModel): date_created: Optional[datetime] = None date_modified: Optional[datetime] = None - group_id: Optional[UUID] = None + group_id: Optional[str] = None object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] @@ -152,28 +151,28 @@ class GroupACLSchema(BaseModel): class GroupACLBaseSchema(BaseModel): """Represents a GroupACLBaseSchema in the Iconik system.""" - group_id: Optional[UUID] = None + group_id: Optional[str] = None permissions: List[str] class DeleteBulkACLsSchema(BaseModel): """Represents a DeleteBulkACLsSchema in the Iconik system.""" - group_ids: Optional[List[UUID]] = Field(default_factory=list) + group_ids: Optional[List[str]] = Field(default_factory=list) include_assets: bool include_collections: bool - object_ids: Optional[List[UUID]] = Field(default_factory=list) + object_ids: Optional[List[str]] = Field(default_factory=list) object_type: Optional[str] = None - user_ids: Optional[List[UUID]] = Field(default_factory=list) + user_ids: Optional[List[str]] = Field(default_factory=list) class DeleteACLsSchema(BaseModel): """Represents a DeleteACLsSchema in the Iconik system.""" - group_ids: Optional[List[UUID]] = Field(default_factory=list) + group_ids: Optional[List[str]] = Field(default_factory=list) object_keys: Optional[List[str]] = Field(default_factory=list) object_type: Optional[str] = None - user_ids: Optional[List[UUID]] = Field(default_factory=list) + user_ids: Optional[List[str]] = Field(default_factory=list) class CreateShareACLsSchema(BaseModel): @@ -182,7 +181,7 @@ class CreateShareACLsSchema(BaseModel): object_keys: Optional[List[str]] = Field(default_factory=list) object_type: Optional[str] = None permissions: List[str] - share_id: Optional[UUID] = None + share_id: Optional[str] = None class CreateMultipleACLsSchema(BaseModel): @@ -194,36 +193,36 @@ class CreateMultipleACLsSchema(BaseModel): class CreateBulkACLsSchema(BaseModel): """Represents a CreateBulkACLsSchema in the Iconik system.""" - group_ids: Optional[List[UUID]] = Field(default_factory=list) + group_ids: Optional[List[str]] = Field(default_factory=list) include_assets: bool include_collections: bool mode: Optional[Literal["APPEND", "OVERWRITE"]] = None - object_ids: Optional[List[UUID]] = Field(default_factory=list) + object_ids: Optional[List[str]] = Field(default_factory=list) object_type: Optional[str] = None permissions: List[str] - user_ids: Optional[List[UUID]] = Field(default_factory=list) + user_ids: Optional[List[str]] = Field(default_factory=list) class CreateACLsSchemaMultiple(BaseModel): """Represents a CreateACLsSchemaMultiple in the Iconik system.""" - group_ids: Optional[List[UUID]] = Field(default_factory=list) + group_ids: Optional[List[str]] = Field(default_factory=list) mode: Optional[Literal["APPEND", "OVERWRITE"]] = None object_keys: List[str] object_type: Optional[str] = None permissions: List[str] - user_ids: Optional[List[UUID]] = Field(default_factory=list) + user_ids: Optional[List[str]] = Field(default_factory=list) class CreateACLsSchema(BaseModel): """Represents a CreateACLsSchema in the Iconik system.""" - group_ids: Optional[List[UUID]] = Field(default_factory=list) + group_ids: Optional[List[str]] = Field(default_factory=list) mode: Optional[Literal["APPEND", "OVERWRITE"]] = None object_keys: List[str] object_type: Optional[str] = None permissions: List[str] - user_ids: Optional[List[UUID]] = Field(default_factory=list) + user_ids: Optional[List[str]] = Field(default_factory=list) class CreateACLsResultSchema(BaseModel): @@ -257,14 +256,14 @@ class CheckBulkACLsSchema(BaseModel): class BulkDeleteShareACLs(BaseModel): """Represents a BulkDeleteShareACLs in the Iconik system.""" - share_ids: List[UUID] + share_ids: List[str] class BulkCreateShareACLs(BaseModel): """Represents a BulkCreateShareACLs in the Iconik system.""" permissions: List[str] - share_ids: List[UUID] + share_ids: List[str] class BulkACLsObjectSchema(BaseModel): @@ -299,7 +298,7 @@ class ACLTemplateSchema(BaseModel): date_created: Optional[datetime] = None date_modified: Optional[datetime] = None - id: Optional[UUID] = None + id: Optional[str] = None name: str @@ -321,7 +320,7 @@ class ACLSchema(BaseModel): class PropagatingGroupACL(BaseModel): """Represents a PropagatingGroupACL in the Iconik system.""" - group_id: Optional[UUID] = None + group_id: Optional[str] = None object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] @@ -333,20 +332,20 @@ class PropagatingACL(BaseModel): object_key: Optional[str] = None object_type: Optional[str] = None permissions: List[str] - user_id: Optional[UUID] = None + user_id: Optional[str] = None class UserACLBase(BaseModel): """Represents a UserACLBase in the Iconik system.""" permissions: List[str] - user_id: Optional[UUID] = None + user_id: Optional[str] = None class GroupACLBase(BaseModel): """Represents a GroupACLBase in the Iconik system.""" - group_id: Optional[UUID] = None + group_id: Optional[str] = None permissions: List[str] diff --git a/pythonik/specs/acls.py b/pythonik/specs/acls.py index 829c73c..1f971e3 100644 --- a/pythonik/specs/acls.py +++ b/pythonik/specs/acls.py @@ -5,7 +5,6 @@ Optional, Union, ) -from uuid import UUID from pythonik.models.acls import ( ACLSchema, @@ -40,9 +39,9 @@ class AclsSpec(Spec): # pylint: disable=too-many-positional-arguments def apply_template_permissions( self, - template_id: Union[UUID, str], + template_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, ignore_reindexing: Optional[bool] = False, restrict_acls_collection_id: Optional[str] = None, **kwargs, @@ -79,9 +78,9 @@ def apply_template_permissions( def apply_group_permissions( self, - group_id: Union[UUID, str], + group_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, permissions: List[str], exclude_defaults: bool = True, **kwargs, @@ -116,9 +115,8 @@ def apply_group_permissions( resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, GroupACLSchema) - def fetch_object_permissions(self, object_type: str, - object_key: Union[UUID, - str], **kwargs) -> Response: + def fetch_object_permissions(self, object_type: str, object_key: str, + **kwargs) -> Response: """ List of object permissions @@ -317,7 +315,7 @@ def check_objects_permission( def check_object_permission( self, object_type: str, - object_key: Union[UUID, str], + object_key: str, permission: str, **kwargs, ) -> Response: @@ -345,7 +343,7 @@ def check_object_permission( def get_combined_permissions( self, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -547,9 +545,9 @@ def delete_acls_for_content( def get_group_acl( self, - group_id: Union[UUID, str], + group_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -575,9 +573,9 @@ def get_group_acl( def check_group_permission( self, - group_id: Union[UUID, str], + group_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, permission: str, **kwargs, ) -> Response: @@ -606,9 +604,9 @@ def check_group_permission( def delete_group_acl( self, - group_id: Union[UUID, str], + group_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -635,9 +633,9 @@ def delete_group_acl( def get_user_acl( self, - user_id: Union[UUID, str], + user_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -662,9 +660,9 @@ def get_user_acl( def update_user_acl( self, - user_id: Union[UUID, str], + user_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, acl: Union[UserACLSchema, Dict[str, Any]], exclude_defaults: bool = True, **kwargs, @@ -696,9 +694,9 @@ def update_user_acl( def delete_user_acl( self, - user_id: Union[UUID, str], + user_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -724,9 +722,9 @@ def delete_user_acl( def check_user_permission( self, - user_id: Union[UUID, str], + user_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, permission: str, **kwargs, ) -> Response: @@ -756,7 +754,7 @@ def check_user_permission( def fetch_share_acls( self, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -780,7 +778,7 @@ def fetch_share_acls( def create_share_acls( self, - share_id: Union[UUID, str], + share_id: str, object_type: str, acls: Union[CreateShareACLsSchema, Dict[str, Any]], exclude_defaults: bool = True, @@ -811,9 +809,9 @@ def create_share_acls( def get_share_acl( self, - share_id: Union[UUID, str], + share_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -839,9 +837,9 @@ def get_share_acl( def create_share_acl( self, - share_id: Union[UUID, str], + share_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, acl: Union[ShareACLSchema, Dict[str, Any]], exclude_defaults: bool = True, **kwargs, @@ -873,9 +871,9 @@ def create_share_acl( def update_share_acl( self, - share_id: Union[UUID, str], + share_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, acl: Union[ShareACLSchema, Dict[str, Any]], exclude_defaults: bool = True, **kwargs, @@ -908,9 +906,9 @@ def update_share_acl( def delete_share_acl( self, - share_id: Union[UUID, str], + share_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, **kwargs, ) -> Response: """ @@ -937,9 +935,9 @@ def delete_share_acl( def check_share_permission( self, - share_id: Union[UUID, str], + share_id: str, object_type: str, - object_key: Union[UUID, str], + object_key: str, permission: str, **kwargs, ) -> Response: From 59416bb0c407a6d2a9208cec09d000b7fc328d1a Mon Sep 17 00:00:00 2001 From: Brian Summa Date: Tue, 20 May 2025 15:19:24 -0500 Subject: [PATCH 4/4] Refactor to use `list_` prefix instead of `fetch_` when getting a list of objects --- pythonik/specs/acls.py | 8 ++++---- pythonik/tests/test_acls.py | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pythonik/specs/acls.py b/pythonik/specs/acls.py index 1f971e3..b7e68dc 100644 --- a/pythonik/specs/acls.py +++ b/pythonik/specs/acls.py @@ -115,8 +115,8 @@ def apply_group_permissions( resp = self._put(url, json=body, **kwargs) return self.parse_response(resp, GroupACLSchema) - def fetch_object_permissions(self, object_type: str, object_key: str, - **kwargs) -> Response: + def list_object_permissions(self, object_type: str, object_key: str, + **kwargs) -> Response: """ List of object permissions @@ -136,7 +136,7 @@ def fetch_object_permissions(self, object_type: str, object_key: str, resp = self._get(url, **kwargs) return self.parse_response(resp, ACLSchema) - def fetch_acl_templates(self, **kwargs) -> Response: + def list_acl_templates(self, **kwargs) -> Response: """ Retrieve all ACL templates @@ -751,7 +751,7 @@ def check_user_permission( resp = self._get(url, **kwargs) return self.parse_response(resp, UserACLSchema) - def fetch_share_acls( + def list_share_acls( self, object_type: str, object_key: str, diff --git a/pythonik/tests/test_acls.py b/pythonik/tests/test_acls.py index 8802e77..91a8082 100644 --- a/pythonik/tests/test_acls.py +++ b/pythonik/tests/test_acls.py @@ -156,7 +156,7 @@ def test_fetch_object_permissions(): client = PythonikClient(app_id=app_id, auth_token=auth_token, timeout=3) - response = client.acls().fetch_object_permissions( + response = client.acls().list_object_permissions( object_type=object_type, object_key=object_key, ) @@ -198,7 +198,7 @@ def test_fetch_acl_templates(): client = PythonikClient(app_id=app_id, auth_token=auth_token, timeout=3) - response = client.acls().fetch_acl_templates() + response = client.acls().list_acl_templates() # Verify the response and request assert response.response.ok @@ -1150,7 +1150,7 @@ def test_fetch_share_acls(): client = PythonikClient(app_id=app_id, auth_token=auth_token, timeout=3) - response = client.acls().fetch_share_acls( + response = client.acls().list_share_acls( object_type=object_type, object_key=object_key, )