diff --git a/pythonik/client.py b/pythonik/client.py index d76989c..c74c824 100644 --- a/pythonik/client.py +++ b/pythonik/client.py @@ -2,6 +2,7 @@ from requests import Session from requests.adapters import HTTPAdapter +from pythonik.specs.acls import AclsSpec from pythonik.specs.assets import AssetSpec from pythonik.specs.files import FilesSpec from pythonik.specs.jobs import JobSpec @@ -50,3 +51,6 @@ def search(self): def jobs(self): return JobSpec(self.session, self.timeout, self.base_url) + + def acls(self) -> AclsSpec: + return AclsSpec(self.session, self.timeout, self.base_url) diff --git a/pythonik/models/acls.py b/pythonik/models/acls.py new file mode 100644 index 0000000..51c9bd4 --- /dev/null +++ b/pythonik/models/acls.py @@ -0,0 +1,390 @@ +""" +Iconik Acls Models +This module contains Pydantic models for the Iconik Acls API. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import ( + List, + Literal, + Optional, +) + +from pydantic import ( + BaseModel, + Field, +) + + +class UsersSchema(BaseModel): + """Represents a UsersSchema in the Iconik system.""" + + users: Optional[List["UsersCheckAclSchema"]] = Field(default_factory=list) + + +class UsersCheckAclSchema(BaseModel): + """Represents a UsersCheckAclSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + user_id: Optional[str] = None + + +class UserIDsSchema(BaseModel): + """Represents a UserIDsSchema in the Iconik system.""" + + user_ids: Optional[List[str]] = Field(default_factory=list) + + +class UserACLSchema(BaseModel): + """Represents a UserACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + user_id: Optional[str] = None + + +class UserACLBaseSchema(BaseModel): + """Represents a UserACLBaseSchema in the Iconik system.""" + + permissions: List[str] + user_id: Optional[str] = None + + +class SharesACLSchema(BaseModel): + """Represents a SharesACLSchema in the Iconik system.""" + + first_url: Optional[str] = None + last_url: Optional[str] = None + next_url: Optional[str] = None + objects: Optional[List["ShareACLSchema"]] = Field(default_factory=list) + page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + pages: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + per_page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + prev_url: Optional[str] = None + total: Optional[int] = Field(None, + ge=-9223372036854775808, + le=9223372036854775807) + + +class ShareACLSchema(BaseModel): + """Represents a ShareACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + share_id: Optional[str] = None + + +class ReindexPropagatingACLSchema(BaseModel): + """Represents a ReindexPropagatingACLSchema in the Iconik system.""" + + sync_to_another_dc: Optional[bool] = None + + +class PropagatingGroupACLSchema(BaseModel): + """Represents a PropagatingGroupACLSchema in the Iconik system.""" + + group_id: Optional[str] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + + +class PropagatingACLSchema(BaseModel): + """Represents a PropagatingACLSchema in the Iconik system.""" + + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + user_id: Optional[str] = None + + +class ListObjectsSchema(BaseModel): + """Represents a ListObjectsSchema in the Iconik system.""" + + first_url: Optional[str] = None + last_url: Optional[str] = None + next_url: Optional[str] = None + page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + pages: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + per_page: Optional[int] = Field(None, ge=-2147483648, le=2147483647) + prev_url: Optional[str] = None + total: Optional[int] = Field(None, + ge=-9223372036854775808, + le=9223372036854775807) + + +class InheritedACLSchema(BaseModel): + """Represents a InheritedACLSchema in the Iconik system.""" + + collection_ids: List[str] + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + + +class GroupIDsSchema(BaseModel): + """Represents a GroupIDsSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + + +class GroupACLSchema(BaseModel): + """Represents a GroupACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + group_id: Optional[str] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + + +class GroupACLBaseSchema(BaseModel): + """Represents a GroupACLBaseSchema in the Iconik system.""" + + group_id: Optional[str] = None + permissions: List[str] + + +class DeleteBulkACLsSchema(BaseModel): + """Represents a DeleteBulkACLsSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + include_assets: bool + include_collections: bool + object_ids: Optional[List[str]] = Field(default_factory=list) + object_type: Optional[str] = None + user_ids: Optional[List[str]] = Field(default_factory=list) + + +class DeleteACLsSchema(BaseModel): + """Represents a DeleteACLsSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + object_keys: Optional[List[str]] = Field(default_factory=list) + object_type: Optional[str] = None + user_ids: Optional[List[str]] = Field(default_factory=list) + + +class CreateShareACLsSchema(BaseModel): + """Represents a CreateShareACLsSchema in the Iconik system.""" + + object_keys: Optional[List[str]] = Field(default_factory=list) + object_type: Optional[str] = None + permissions: List[str] + share_id: Optional[str] = None + + +class CreateMultipleACLsSchema(BaseModel): + """Represents a CreateMultipleACLsSchema in the Iconik system.""" + + objects: List["CreateACLsSchemaMultiple"] + + +class CreateBulkACLsSchema(BaseModel): + """Represents a CreateBulkACLsSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + include_assets: bool + include_collections: bool + mode: Optional[Literal["APPEND", "OVERWRITE"]] = None + object_ids: Optional[List[str]] = Field(default_factory=list) + object_type: Optional[str] = None + permissions: List[str] + user_ids: Optional[List[str]] = Field(default_factory=list) + + +class CreateACLsSchemaMultiple(BaseModel): + """Represents a CreateACLsSchemaMultiple in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + mode: Optional[Literal["APPEND", "OVERWRITE"]] = None + object_keys: List[str] + object_type: Optional[str] = None + permissions: List[str] + user_ids: Optional[List[str]] = Field(default_factory=list) + + +class CreateACLsSchema(BaseModel): + """Represents a CreateACLsSchema in the Iconik system.""" + + group_ids: Optional[List[str]] = Field(default_factory=list) + mode: Optional[Literal["APPEND", "OVERWRITE"]] = None + object_keys: List[str] + object_type: Optional[str] = None + permissions: List[str] + user_ids: Optional[List[str]] = Field(default_factory=list) + + +class CreateACLsResultSchema(BaseModel): + """Represents a CreateACLsResultSchema in the Iconik system.""" + + updated_object_keys: Optional[List[str]] = Field(default_factory=list) + + +class CopyInheritedACLSchema(BaseModel): + """Represents a CopyInheritedACLSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + from_collection_ids: List[str] + object_key: str + object_type: str + + +class CombinedPermissionsSchema(BaseModel): + """Represents a CombinedPermissionsSchema in the Iconik system.""" + + permissions: Optional[List[str]] = Field(default_factory=list) + + +class CheckBulkACLsSchema(BaseModel): + """Represents a CheckBulkACLsSchema in the Iconik system.""" + + objects: List["BulkACLsObjectSchema"] + + +class BulkDeleteShareACLs(BaseModel): + """Represents a BulkDeleteShareACLs in the Iconik system.""" + + share_ids: List[str] + + +class BulkCreateShareACLs(BaseModel): + """Represents a BulkCreateShareACLs in the Iconik system.""" + + permissions: List[str] + share_ids: List[str] + + +class BulkACLsObjectSchema(BaseModel): + """Represents a BulkACLsObjectSchema in the Iconik system.""" + + object_keys: List[str] + object_type: str + permissions: List[Literal["read", "write", "delete", "change-acl"]] + + +class BulkACLSchema(BaseModel): + """Represents a BulkACLSchema in the Iconik system.""" + + access_denied: Optional[List[str]] = Field(default_factory=list) + access_granted: Optional[List[str]] = Field(default_factory=list) + + +class ACLsSchema(BaseModel): + """Represents a ACLsSchema in the Iconik system.""" + + object_keys: Optional[List[str]] = Field(default_factory=list) + + +class ACLTemplatesSchema(BaseModel): + """Represents a ACLTemplatesSchema in the Iconik system.""" + + objects: Optional[List["ACLTemplateSchema"]] = Field(default_factory=list) + + +class ACLTemplateSchema(BaseModel): + """Represents a ACLTemplateSchema in the Iconik system.""" + + date_created: Optional[datetime] = None + date_modified: Optional[datetime] = None + id: Optional[str] = None + name: str + + +class ACLSchema(BaseModel): + """Represents a ACLSchema in the Iconik system.""" + + groups_acl: Optional[List["GroupACLBase"]] = Field(default_factory=list) + inherited_groups_acl: Optional[List["PropagatingGroupACL"]] = Field( + default_factory=list) + inherited_users_acl: Optional[List["PropagatingACL"]] = Field( + default_factory=list) + propagating_groups_acl: Optional[List["PropagatingGroupACL"]] = Field( + default_factory=list) + propagating_users_acl: Optional[List["PropagatingACL"]] = Field( + default_factory=list) + users_acl: Optional[List["UserACLBase"]] = Field(default_factory=list) + + +class PropagatingGroupACL(BaseModel): + """Represents a PropagatingGroupACL in the Iconik system.""" + + group_id: Optional[str] = None + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + + +class PropagatingACL(BaseModel): + """Represents a PropagatingACL in the Iconik system.""" + + object_key: Optional[str] = None + object_type: Optional[str] = None + permissions: List[str] + user_id: Optional[str] = None + + +class UserACLBase(BaseModel): + """Represents a UserACLBase in the Iconik system.""" + + permissions: List[str] + user_id: Optional[str] = None + + +class GroupACLBase(BaseModel): + """Represents a GroupACLBase in the Iconik system.""" + + group_id: Optional[str] = None + permissions: List[str] + + +# Update forward references +UsersSchema.model_rebuild() +UsersCheckAclSchema.model_rebuild() +UserIDsSchema.model_rebuild() +UserACLSchema.model_rebuild() +UserACLBaseSchema.model_rebuild() +SharesACLSchema.model_rebuild() +ShareACLSchema.model_rebuild() +ReindexPropagatingACLSchema.model_rebuild() +PropagatingGroupACLSchema.model_rebuild() +PropagatingACLSchema.model_rebuild() +ListObjectsSchema.model_rebuild() +InheritedACLSchema.model_rebuild() +GroupIDsSchema.model_rebuild() +GroupACLSchema.model_rebuild() +GroupACLBaseSchema.model_rebuild() +DeleteBulkACLsSchema.model_rebuild() +DeleteACLsSchema.model_rebuild() +CreateShareACLsSchema.model_rebuild() +CreateMultipleACLsSchema.model_rebuild() +CreateBulkACLsSchema.model_rebuild() +CreateACLsSchemaMultiple.model_rebuild() +CreateACLsSchema.model_rebuild() +CreateACLsResultSchema.model_rebuild() +CopyInheritedACLSchema.model_rebuild() +CombinedPermissionsSchema.model_rebuild() +CheckBulkACLsSchema.model_rebuild() +BulkDeleteShareACLs.model_rebuild() +BulkCreateShareACLs.model_rebuild() +BulkACLsObjectSchema.model_rebuild() +BulkACLSchema.model_rebuild() +ACLsSchema.model_rebuild() +ACLTemplatesSchema.model_rebuild() +ACLTemplateSchema.model_rebuild() +ACLSchema.model_rebuild() +PropagatingGroupACL.model_rebuild() +PropagatingACL.model_rebuild() +UserACLBase.model_rebuild() +GroupACLBase.model_rebuild() diff --git a/pythonik/specs/_internal_utils.py b/pythonik/specs/_internal_utils.py new file mode 100644 index 0000000..58fd9e6 --- /dev/null +++ b/pythonik/specs/_internal_utils.py @@ -0,0 +1,31 @@ +from typing import Any + +from pythonik.exceptions import PythonikException + + +def is_pydantic_model(obj: Any) -> bool: + """ + Checks if an object is a Pydantic model instance. + + Args: + obj: The object to check. + + Returns: + True if the object is a Pydantic model instance, False otherwise. + """ + # Check for common Pydantic model attributes/methods + if obj is None: + return False + try: + # Pydantic v1 + has_dict_method = hasattr(obj, "dict") and callable( + getattr(obj, "dict", None)) + # Pydantic v2 + has_model_dump = hasattr(obj, "model_dump") and callable( + getattr(obj, "model_dump", None)) + # Check for schema-related attributes that are common in Pydantic models + has_schema_attrs = hasattr(obj, "__fields__") or hasattr( + obj, "model_fields") + return (has_dict_method or has_model_dump) and has_schema_attrs + except PythonikException: + return False diff --git a/pythonik/specs/acls.py b/pythonik/specs/acls.py new file mode 100644 index 0000000..b7e68dc --- /dev/null +++ b/pythonik/specs/acls.py @@ -0,0 +1,965 @@ +from typing import ( + Any, + Dict, + List, + Optional, + Union, +) + +from pythonik.models.acls import ( + ACLSchema, + ACLsSchema, + ACLTemplateSchema, + ACLTemplatesSchema, + BulkACLSchema, + CheckBulkACLsSchema, + CombinedPermissionsSchema, + CreateACLsResultSchema, + CreateACLsSchema, + CreateBulkACLsSchema, + CreateMultipleACLsSchema, + CreateShareACLsSchema, + DeleteACLsSchema, + DeleteBulkACLsSchema, + GroupACLSchema, + ShareACLSchema, + SharesACLSchema, + UserACLSchema, +) +from pythonik.models.base import Response +from pythonik.specs.base import Spec + +from ._internal_utils import is_pydantic_model + + +class AclsSpec(Spec): + server = "API/acls/" + VALID_PERMISSIONS = ["change-acl", "delete", "read", "write"] + + # pylint: disable=too-many-positional-arguments + def apply_template_permissions( + self, + template_id: str, + object_type: str, + object_key: str, + ignore_reindexing: Optional[bool] = False, + restrict_acls_collection_id: Optional[str] = None, + **kwargs, + ) -> Response: + """ + Apply template permissions to an object + + Args: + template_id: ID of the template to apply permissions for + object_type: Type of the object (e.g. "user") + object_key: ID of the object + ignore_reindexing: Ignore reindexing + restrict_acls_collection_id: Do not apply any ACLs that are not in + the collection_id provided (Parent collection normally) + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User does not have permission + - 404 ACL template does not exist + """ + params = { + "ignore_reindexing": ignore_reindexing, + "restrict_acls_collection_id": restrict_acls_collection_id, + } + url = self.gen_url( + f"acl/templates/{template_id}/{object_type}/{object_key}/") + resp = self._post(url, params=params, **kwargs) + return self.parse_response(resp, None) + + def apply_group_permissions( + self, + group_id: str, + object_type: str, + object_key: str, + permissions: List[str], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update or create group acl for an object + + Args: + group_id: ID of the group to apply permissions for + object_type: Type of the object + object_key: ID of the object + permissions: List of permissions to apply + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=GroupACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + - ValueError: If invalid permissions provided + """ + if permissions and not any(_ in permissions + for _ in self.VALID_PERMISSIONS): + raise ValueError(f"Value must be one of {self.VALID_PERMISSIONS}") + model = GroupACLSchema(permissions=permissions) + body = model.model_dump(exclude_defaults=exclude_defaults) + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, GroupACLSchema) + + def list_object_permissions(self, object_type: str, object_key: str, + **kwargs) -> Response: + """ + List of object permissions + + Args: + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"acl/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ACLSchema) + + def list_acl_templates(self, **kwargs) -> Response: + """ + Retrieve all ACL templates + + Args: + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplatesSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url("acl/templates/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ACLTemplatesSchema) + + def create_acl_template( + self, + name: str, + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create an ACL template + + Args: + name: Name of the template + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + model = ACLTemplateSchema(name=name) + body = model.model_dump(exclude_defaults=exclude_defaults) + url = self.gen_url("acl/templates/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def get_acl_template(self, template_id: str, **kwargs) -> Response: + """ + Retrieve an ACL template + + Args: + template_id: ID of the template + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 Template does not exist + """ + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def update_acl_template( + self, + template_id: str, + template: Union[ACLTemplateSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update an ACL template + + Args: + template_id: ID of the template + template: Template data (either as ACLTemplateSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL template does not exist + """ + body = (template.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(template) else template) + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def partial_update_acl_template( + self, + template_id: str, + template: Union[ACLTemplateSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Partially update an ACL template + + Args: + template_id: ID of the template + template: Template data to update (either as ACLTemplateSchema + or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ACLTemplateSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL template does not exist + """ + body = (template.model_dump(exclude_defaults=exclude_defaults, + exclude_unset=True) + if is_pydantic_model(template) else template) + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._patch(url, json=body, **kwargs) + return self.parse_response(resp, ACLTemplateSchema) + + def delete_acl_template(self, template_id: str, **kwargs) -> Response: + """ + Remove an ACL template + + Args: + template_id: ID of the template to delete + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL template does not exist + """ + url = self.gen_url(f"acl/templates/{template_id}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def check_objects_permission( + self, + objects: Union[CheckBulkACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Check if objects have required permission + + Args: + objects: Objects to check permissions for (either as + CheckBulkACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=BulkACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User doesn't have permission + """ + body = (objects.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(objects) else objects) + url = self.gen_url("acl/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, BulkACLSchema) + + def check_object_permission( + self, + object_type: str, + object_key: str, + permission: str, + **kwargs, + ) -> Response: + """ + Check if a particular object has required permission + + Args: + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User doesn't have permission + """ + url = self.gen_url(f"acl/{object_type}/{object_key}/{permission}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, None) + + def get_combined_permissions( + self, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + List of permissions for the user + + Args: + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=CombinedPermissionsSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"acl/{object_type}/{object_key}/permissions/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, CombinedPermissionsSchema) + + def check_objects_have_permission( + self, + object_type: str, + permission: str, + object_keys: Union[ACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Check if objects have required permission + + Args: + object_type: Type of the object + permission: Permission to check + object_keys: Object keys to check + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=BulkACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User doesn't have permission + """ + body = (object_keys.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(object_keys) else object_keys) + url = self.gen_url(f"acl/{object_type}/{permission}/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, BulkACLSchema) + + def create_acls( + self, + object_type: str, + acls: Union[CreateACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for multiple objects + + Args: + object_type: Type of the object + acls: ACLs to create (either as CreateACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=CreateACLsResultSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) + url = self.gen_url(f"acl/{object_type}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, CreateACLsResultSchema) + + def create_bulk_acls( + self, + object_type: str, + acls: Union[CreateMultipleACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for multiple objects with multiple permissions + + Args: + object_type: Type of the object + acls: ACLs to create (either as CreateMultipleACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) + url = self.gen_url(f"acl/{object_type}/bulk/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def create_acls_for_content( + self, + object_type: str, + acls: Union[CreateBulkACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for content of multiple objects + + Args: + object_type: Type of the object + acls: ACLs to create (either as CreateBulkACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 501 Invalid object type + """ + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) + url = self.gen_url(f"acl/{object_type}/content/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def delete_acls( + self, + object_type: str, + acls: Union[DeleteACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Delete ACLs for multiple objects + + Args: + object_type: Type of the object + acls: ACLs to delete (either as DeleteACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) + url = self.gen_url(f"acl/{object_type}/") + resp = self._delete(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def delete_acls_for_content( + self, + object_type: str, + acls: Union[DeleteBulkACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Delete ACLs for content of multiple objects + + Args: + object_type: Type of the object + acls: ACLs to delete (either as DeleteBulkACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) + url = self.gen_url(f"acl/{object_type}/content/") + resp = self._delete(url, json=body, **kwargs) + return self.parse_response(resp, None) + + def get_group_acl( + self, + group_id: str, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + List of group permissions for an object + + Args: + group_id: ID of the group + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=GroupACLSchema) + + Raises: + - 401 Token is invalid + - 404 Group doesn't have permissions + """ + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, GroupACLSchema) + + def check_group_permission( + self, + group_id: str, + object_type: str, + object_key: str, + permission: str, + **kwargs, + ) -> Response: + """ + Check if group has particular permission for an object + + Args: + group_id: ID of the group + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 Group doesn't have particular permission + """ + url = self.gen_url(f"groups/{group_id}/acl/{object_type}/{object_key}/" + f"{permission}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, None) + + def delete_group_acl( + self, + group_id: str, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + Delete a particular ACL by id for an object + + Args: + group_id: ID of the group + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + url = self.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def get_user_acl( + self, + user_id: str, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + List of user permissions for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=UserACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, UserACLSchema) + + def update_user_acl( + self, + user_id: str, + object_type: str, + object_key: str, + acl: Union[UserACLSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update or create user ACL for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + acl: User ACL data (either as UserACLSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=UserACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + body = (acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl) + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, UserACLSchema) + + def delete_user_acl( + self, + user_id: str, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + Delete a user ACL for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def check_user_permission( + self, + user_id: str, + object_type: str, + object_key: str, + permission: str, + **kwargs, + ) -> Response: + """ + Returns a user ACL for an object + + Args: + user_id: ID of the user + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=UserACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User does not have permission + """ + url = self.gen_url(f"users/{user_id}/acl/{object_type}/{object_key}/" + f"{permission}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, UserACLSchema) + + def list_share_acls( + self, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + List of share ACLs + + Args: + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=SharesACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"shares/{object_type}/{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, SharesACLSchema) + + def create_share_acls( + self, + share_id: str, + object_type: str, + acls: Union[CreateShareACLsSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new ACL for multiple share objects + + Args: + share_id: ID of the share + object_type: Type of the object + acls: Share ACLs data (either as CreateShareACLsSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=CreateACLsResultSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = (acls.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acls) else acls) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, CreateACLsResultSchema) + + def get_share_acl( + self, + share_id: str, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + List of share permissions for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ShareACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, ShareACLSchema) + + def create_share_acl( + self, + share_id: str, + object_type: str, + object_key: str, + acl: Union[ShareACLSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Create a new share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + acl: Share ACL data (either as ShareACLSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ShareACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + """ + body = (acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") + resp = self._post(url, json=body, **kwargs) + return self.parse_response(resp, ShareACLSchema) + + def update_share_acl( + self, + share_id: str, + object_type: str, + object_key: str, + acl: Union[ShareACLSchema, Dict[str, Any]], + exclude_defaults: bool = True, + **kwargs, + ) -> Response: + """ + Update share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + acl: Share ACL data (either as ShareACLSchema or dict) + exclude_defaults: Whether to exclude default values when dumping + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=ShareACLSchema) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + body = (acl.model_dump(exclude_defaults=exclude_defaults) + if is_pydantic_model(acl) else acl) + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") + resp = self._put(url, json=body, **kwargs) + return self.parse_response(resp, ShareACLSchema) + + def delete_share_acl( + self, + share_id: str, + object_type: str, + object_key: str, + **kwargs, + ) -> Response: + """ + Delete a share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 404 ACL does not exist + """ + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/") + resp = self._delete(url, **kwargs) + return self.parse_response(resp, None) + + def check_share_permission( + self, + share_id: str, + object_type: str, + object_key: str, + permission: str, + **kwargs, + ) -> Response: + """ + Returns a share ACL for an object + + Args: + share_id: ID of the share + object_type: Type of the object + object_key: ID of the object + permission: Permission to check + **kwargs: Additional kwargs to pass to the request + + Returns: + Response(model=None) + + Raises: + - 400 Bad request + - 401 Token is invalid + - 403 User does not have permission + """ + url = self.gen_url(f"shares/{share_id}/acl/{object_type}/" + f"{object_key}/{permission}/") + resp = self._get(url, **kwargs) + return self.parse_response(resp, None) diff --git a/pythonik/tests/test_acls.py b/pythonik/tests/test_acls.py new file mode 100644 index 0000000..91a8082 --- /dev/null +++ b/pythonik/tests/test_acls.py @@ -0,0 +1,1443 @@ +# pythonik/tests/test_acls.py +import uuid + +import pytest +import requests_mock + +from pythonik.client import PythonikClient +from pythonik.models.acls import ( + ACLTemplateSchema, + CheckBulkACLsSchema, +) +from pythonik.specs.acls import AclsSpec + + +def test_apply_template_permissions(): + """Test applying template permissions to an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"acl/templates/{template_id}/{object_type}/{object_key}/") + m.post(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().apply_template_permissions( + template_id=template_id, + object_type=object_type, + object_key=object_key, + ignore_reindexing=True, + restrict_acls_collection_id="collection-uuid", + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "POST" + + # Verify query parameters - lowercase comparison for boolean values + assert "ignore_reindexing=" in m.last_request.url + assert ("restrict_acls_collection_id=collection-uuid" + in m.last_request.url) + + +def test_apply_group_permissions(): + """Test applying group permissions to an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permissions = ["read", "write"] + + # Expected response data + response_data = { + "group_id": group_id, + "object_key": object_key, + "object_type": object_type, + "permissions": permissions, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().apply_group_permissions( + group_id=group_id, + object_type=object_type, + object_key=object_key, + permissions=permissions, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == permissions + + +def test_apply_group_permissions_invalid_permission(): + """Test applying group permissions with invalid permission raises ValueError.""" + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permissions = ["invalid_permission"] + + # Create client + client = PythonikClient(app_id=app_id, auth_token=auth_token, timeout=3) + + # Make request with invalid permissions and verify it raises ValueError + with pytest.raises(ValueError) as exc_info: + client.acls().apply_group_permissions( + group_id=group_id, + object_type=object_type, + object_key=object_key, + permissions=permissions, + ) + + # Check error message contains valid permissions + assert "Value must be one of" in str(exc_info.value) + assert "read" in str(exc_info.value) + assert "write" in str(exc_info.value) + assert "delete" in str(exc_info.value) + assert "change-acl" in str(exc_info.value) + + +def test_fetch_object_permissions(): + """Test fetching permissions for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "users_acl": [{ + "user_id": str(uuid.uuid4()), + "permissions": ["read", "write"] + }], + "groups_acl": [{ + "group_id": str(uuid.uuid4()), + "permissions": ["read"] + }], + "propagating_users_acl": [], + "propagating_groups_acl": [], + "inherited_users_acl": [], + "inherited_groups_acl": [], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().list_object_permissions( + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert len(response.data.users_acl) == 1 + assert len(response.data.groups_acl) == 1 + assert response.data.users_acl[0].permissions == ["read", "write"] + assert response.data.groups_acl[0].permissions == ["read"] + + +def test_fetch_acl_templates(): + """Test fetching ACL templates.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + + # Expected response data + response_data = { + "objects": [{ + "id": template_id, + "name": "Test Template", + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + }] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url("acl/templates/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().list_acl_templates() + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert len(response.data.objects) == 1 + assert str(response.data.objects[0].id) == template_id + assert response.data.objects[0].name == "Test Template" + + +def test_create_acl_template(): + """Test creating an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "New Template" + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url("acl/templates/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_acl_template(name=template_name) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert request_json["name"] == template_name + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_get_acl_template(): + """Test getting an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "Test Template" + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_acl_template(template_id=template_id) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_update_acl_template(): + """Test updating an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "Updated Template" + + # Create template data + template_data = ACLTemplateSchema(name=template_name) + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().update_acl_template( + template_id=template_id, + template=template_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["name"] == template_name + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_partial_update_acl_template(): + """Test partially updating an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + template_name = "Partially Updated Template" + + # Create template data + template_data = {"name": template_name} # Using dict instead of model + + # Expected response data + response_data = { + "id": template_id, + "name": template_name, + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.patch(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().partial_update_acl_template( + template_id=template_id, + template=template_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PATCH" + + # Verify request body + request_json = m.last_request.json() + assert request_json["name"] == template_name + + # Verify response data + assert str(response.data.id) == template_id + assert response.data.name == template_name + + +def test_delete_acl_template(): + """Test deleting an ACL template.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + template_id = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/templates/{template_id}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_acl_template(template_id=template_id) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_check_objects_permission(): + """Test checking permissions for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + + # Create request data + check_data = CheckBulkACLsSchema( + objects=[{ + "object_keys": [object_key1, object_key2], + "object_type": "assets", + "permissions": ["read", "write"], + }]) + + # Expected response data + response_data = { + "access_granted": [object_key1], + "access_denied": [object_key2], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url("acl/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_objects_permission(objects=check_data) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert "objects" in request_json + assert len(request_json["objects"]) == 1 + assert object_key1 in request_json["objects"][0]["object_keys"] + assert object_key2 in request_json["objects"][0]["object_keys"] + + # Verify response data + assert response.data.access_granted == [object_key1] + assert response.data.access_denied == [object_key2] + + +def test_check_object_permission(): + """Test checking permission for a single object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_object_permission( + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "GET" + + +def test_get_combined_permissions(): + """Test getting combined permissions for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = {"permissions": ["read", "write", "delete"]} + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"acl/{object_type}/{object_key}/permissions/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_combined_permissions( + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert response.data.permissions == ["read", "write", "delete"] + + +def test_check_objects_have_permission(): + """Test checking if objects have a specific permission.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + permission = "read" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + + # Create request data + object_keys = {"object_keys": [object_key1, object_key2]} + + # Expected response data + response_data = { + "access_granted": [object_key1], + "access_denied": [object_key2], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/{permission}/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_objects_have_permission( + object_type=object_type, + permission=permission, + object_keys=object_keys, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert "object_keys" in request_json + assert object_key1 in request_json["object_keys"] + assert object_key2 in request_json["object_keys"] + + # Verify response data + assert response.data.access_granted == [object_key1] + assert response.data.access_denied == [object_key2] + + +def test_create_acls(): + """Test creating ACLs for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data with all UUIDs as strings + acls_data = { + "object_keys": [str(object_key1), + str(object_key2)], + "object_type": object_type, + "permissions": ["read", "write"], + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + "mode": "APPEND", + } + + # Expected response data + response_data = { + "updated_object_keys": [str(object_key1), + str(object_key2)] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_acls( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert str(object_key1) in request_json["object_keys"] + assert str(object_key2) in request_json["object_keys"] + assert request_json["permissions"] == ["read", "write"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + assert request_json["mode"] == "APPEND" + + # Verify response data + assert response.data.updated_object_keys == [ + str(object_key1), + str(object_key2), + ] + + +def test_create_bulk_acls(): + """Test creating ACLs for multiple objects with multiple permissions.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "objects": [ + { + "object_keys": [str(object_key1)], + "permissions": ["read", "write"], + "user_ids": [str(user_id)], + "mode": "APPEND", + }, + { + "object_keys": [str(object_key2)], + "permissions": ["read"], + "user_ids": [str(user_id)], + "mode": "OVERWRITE", + }, + ] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/bulk/") + m.put(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_bulk_acls( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert "objects" in request_json + assert len(request_json["objects"]) == 2 + assert str(object_key1) in request_json["objects"][0]["object_keys"] + assert request_json["objects"][0]["permissions"] == ["read", "write"] + assert str(object_key2) in request_json["objects"][1]["object_keys"] + assert request_json["objects"][1]["permissions"] == ["read"] + + +def test_create_acls_for_content(): + """Test creating ACLs for content of multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "collections" + object_id1 = str(uuid.uuid4()) + object_id2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "object_ids": [str(object_id1), str(object_id2)], + "permissions": ["read", "write"], + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + "include_assets": True, + "include_collections": True, + "mode": "APPEND", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/content/") + m.put(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_acls_for_content( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert str(object_id1) in request_json["object_ids"] + assert str(object_id2) in request_json["object_ids"] + assert request_json["permissions"] == ["read", "write"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + assert request_json["include_assets"] is True + assert request_json["include_collections"] is True + assert request_json["mode"] == "APPEND" + + +def test_delete_acls(): + """Test deleting ACLs for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "object_keys": [str(object_key1), + str(object_key2)], + "object_type": object_type, + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_acls( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + # Verify request body + request_json = m.last_request.json() + assert str(object_key1) in request_json["object_keys"] + assert str(object_key2) in request_json["object_keys"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + + +def test_delete_acls_for_content(): + """Test deleting ACLs for content of multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "collections" + object_id1 = str(uuid.uuid4()) + object_id2 = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + + # Create request data + acls_data = { + "object_ids": [str(object_id1), str(object_id2)], + "user_ids": [str(user_id)], + "group_ids": [str(group_id)], + "include_assets": True, + "include_collections": True, + "object_type": object_type, + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"acl/{object_type}/content/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_acls_for_content( + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + # Verify request body + request_json = m.last_request.json() + assert str(object_id1) in request_json["object_ids"] + assert str(object_id2) in request_json["object_ids"] + assert str(user_id) in request_json["user_ids"] + assert str(group_id) in request_json["group_ids"] + assert request_json["include_assets"] is True + assert request_json["include_collections"] is True + + +def test_get_group_acl(): + """Test getting group ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "group_id": group_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_group_acl( + group_id=group_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.group_id) == group_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_check_group_permission(): + """Test checking group permission for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_group_permission( + group_id=group_id, + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "GET" + + +def test_delete_group_acl(): + """Test deleting group ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + group_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"groups/{group_id}/acl/{object_type}/{object_key}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_group_acl( + group_id=group_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_get_user_acl(): + """Test getting user ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "user_id": user_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_user_acl( + user_id=user_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.user_id) == user_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_update_user_acl(): + """Test updating user ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Create ACL data + acl_data = {"permissions": ["read", "write"]} + + # Expected response data + response_data = { + "user_id": user_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().update_user_acl( + user_id=user_id, + object_type=object_type, + object_key=object_key, + acl=acl_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == ["read", "write"] + + # Verify response data + assert str(response.data.user_id) == user_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_delete_user_acl(): + """Test deleting user ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_user_acl( + user_id=user_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_check_user_permission(): + """Test checking user permission for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + user_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Expected response data + response_data = { + "user_id": user_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"users/{user_id}/acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_user_permission( + user_id=user_id, + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.user_id) == user_id + assert response.data.permissions == ["read", "write"] + + +def test_fetch_share_acls(): + """Test fetching share ACLs for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + + # Expected response data + response_data = { + "objects": [{ + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + }], + "total": + 1, + "page": + 1, + "pages": + 1, + "per_page": + 10, + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url(f"shares/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().list_share_acls( + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert len(response.data.objects) == 1 + assert str(response.data.objects[0].share_id) == share_id + assert response.data.objects[0].object_key == object_key + assert response.data.objects[0].object_type == object_type + assert response.data.objects[0].permissions == ["read"] + + +def test_create_share_acls(): + """Test creating share ACLs for multiple objects.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key1 = str(uuid.uuid4()) + object_key2 = str(uuid.uuid4()) + + # Create ACLs data + acls_data = { + "object_keys": [str(object_key1), + str(object_key2)], + "object_type": object_type, + "permissions": ["read"], + "share_id": str(share_id), + } + + # Expected response data + response_data = { + "updated_object_keys": [str(object_key1), + str(object_key2)] + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_share_acls( + share_id=share_id, + object_type=object_type, + acls=acls_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert str(object_key1) in request_json["object_keys"] + assert str(object_key2) in request_json["object_keys"] + assert request_json["permissions"] == ["read"] + assert request_json["object_type"] == object_type + + # Verify response data + assert response.data.updated_object_keys == [ + str(object_key1), + str(object_key2), + ] + + +def test_get_share_acl(): + """Test getting share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Expected response data + response_data = { + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.get(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().get_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "GET" + + # Verify response data + assert str(response.data.share_id) == share_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read"] + + +def test_create_share_acl(): + """Test creating share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Create ACL data + acl_data = {"permissions": ["read"]} + + # Expected response data + response_data = { + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.post(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().create_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + acl=acl_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "POST" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == ["read"] + + # Verify response data + assert str(response.data.share_id) == share_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read"] + + +def test_update_share_acl(): + """Test updating share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Create ACL data + acl_data = {"permissions": ["read", "write"]} + + # Expected response data + response_data = { + "share_id": share_id, + "object_key": object_key, + "object_type": object_type, + "permissions": ["read", "write"], + "date_created": "2025-05-20T09:50:09Z", + "date_modified": "2025-05-20T09:50:09Z", + } + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.put(mock_address, json=response_data) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().update_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + acl=acl_data, + ) + + # Verify the response and request + assert response.response.ok + assert m.called + assert m.last_request.method == "PUT" + + # Verify request body + request_json = m.last_request.json() + assert request_json["permissions"] == ["read", "write"] + + # Verify response data + assert str(response.data.share_id) == share_id + assert response.data.object_key == object_key + assert response.data.object_type == object_type + assert response.data.permissions == ["read", "write"] + + +def test_delete_share_acl(): + """Test deleting share ACL for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/") + m.delete(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().delete_share_acl( + share_id=share_id, + object_type=object_type, + object_key=object_key, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "DELETE" + + +def test_check_share_permission(): + """Test checking share permission for an object.""" + with requests_mock.Mocker() as m: + app_id = str(uuid.uuid4()) + auth_token = str(uuid.uuid4()) + share_id = str(uuid.uuid4()) + object_type = "assets" + object_key = str(uuid.uuid4()) + permission = "read" + + # Mock the API endpoint + mock_address = AclsSpec.gen_url( + f"shares/{share_id}/acl/{object_type}/{object_key}/{permission}/") + m.get(mock_address, status_code=204) + + # Create client and make request + client = PythonikClient(app_id=app_id, + auth_token=auth_token, + timeout=3) + response = client.acls().check_share_permission( + share_id=share_id, + object_type=object_type, + object_key=object_key, + permission=permission, + ) + + # Verify the response and request + assert response.response.status_code == 204 + assert m.called + assert m.last_request.method == "GET" diff --git a/pythonik/tests/test_internal_utils.py b/pythonik/tests/test_internal_utils.py new file mode 100644 index 0000000..314c72d --- /dev/null +++ b/pythonik/tests/test_internal_utils.py @@ -0,0 +1,96 @@ +# pythonik/tests/test_internal_utils.py +import uuid + +from pydantic import BaseModel + +from pythonik.exceptions import PythonikException +from pythonik.specs._internal_utils import is_pydantic_model + + +class PydanticV1StyleModel: + """Mock class that mimics a Pydantic v1 model.""" + + __fields__ = {"test": "field"} + + def dict(self): + return {"test": "value"} + + +class PydanticV2StyleModel: + """Mock class that mimics a Pydantic v2 model.""" + + model_fields = {"test": "field"} + + def model_dump(self): + return {"test": "value"} + + +class RealPydanticModel(BaseModel): + """A real Pydantic model for testing.""" + + id: str + name: str + + +class NonPydanticClass: + """A regular class that is not a Pydantic model.""" + + def __init__(self): + self.value = "test" + + +def test_is_pydantic_model_with_real_model(): + """Test is_pydantic_model with a real Pydantic model.""" + model = RealPydanticModel(id=str(uuid.uuid4()), name="Test Model") + assert is_pydantic_model(model) is True + + +def test_is_pydantic_model_with_none(): + """Test is_pydantic_model with None.""" + assert is_pydantic_model(None) is False + + +def test_is_pydantic_model_with_non_model(): + """Test is_pydantic_model with a non-model class instance.""" + non_model = NonPydanticClass() + assert is_pydantic_model(non_model) is False + + +def test_is_pydantic_model_with_v1_style_mock(): + """Test is_pydantic_model with a class that looks like a Pydantic v1 model.""" + v1_model = PydanticV1StyleModel() + assert is_pydantic_model(v1_model) is True + + +def test_is_pydantic_model_with_v2_style_mock(): + """Test is_pydantic_model with a class that looks like a Pydantic v2 model.""" + v2_model = PydanticV2StyleModel() + assert is_pydantic_model(v2_model) is True + + +def test_is_pydantic_model_with_dict(): + """Test is_pydantic_model with a dictionary.""" + dict_data = {"id": str(uuid.uuid4()), "name": "Test Dict"} + assert is_pydantic_model(dict_data) is False + + +def test_is_pydantic_model_with_exception(): + """Test is_pydantic_model when an exception is raised.""" + + class ExceptionRaisingModel: + """A model that raises an exception when properties are accessed.""" + + @property + def dict(self): + raise PythonikException("Test exception") + + @property + def model_dump(self): + raise PythonikException("Test exception") + + @property + def __fields__(self): + raise PythonikException("Test exception") + + model = ExceptionRaisingModel() + assert is_pydantic_model(model) is False