diff --git a/local_units/permissions.py b/local_units/permissions.py index 0e5ff775a..1fb27e4d3 100644 --- a/local_units/permissions.py +++ b/local_units/permissions.py @@ -1,3 +1,4 @@ +from django.contrib.auth.models import Permission from rest_framework import permissions from api.models import Country @@ -24,7 +25,7 @@ def has_object_permission(self, request, view, obj): class IsAuthenticatedForLocalUnit(permissions.BasePermission): - message = "Only validators or superusers are allowed to update Local Units" + message = "Only Country Admins, Local Unit Validators, Region Admins, or Superusers are allowed to update Local Units." def has_object_permission(self, request, view, obj): if request.method not in ["PUT", "PATCH"]: @@ -35,6 +36,28 @@ def has_object_permission(self, request, view, obj): if user.is_superuser: return True + country_id = obj.country_id + country = Country.objects.get(id=country_id) + region_id = country.region_id + # Country admin specific permissions + country_admin_ids = [ + int(codename.replace("country_admin_", "")) + for codename in Permission.objects.filter( + group__user=user, + codename__startswith="country_admin_", + ).values_list("codename", flat=True) + ] + # Regional admin specific permissions + region_admin_ids = [ + int(codename.replace("region_admin_", "")) + for codename in Permission.objects.filter( + group__user=user, + codename__startswith="region_admin_", + ).values_list("codename", flat=True) + ] + if country_id in country_admin_ids or region_id in region_admin_ids: + return True + return ( get_local_unit_country_validators(obj).filter(id=user.id).exists() or get_local_unit_region_validators(obj).filter(id=user.id).exists() diff --git a/local_units/test_views.py b/local_units/test_views.py index 9d09c82ed..768b71013 100644 --- a/local_units/test_views.py +++ b/local_units/test_views.py @@ -428,21 +428,28 @@ def setUp(self): ) self.local_unit_type = LocalUnitType.objects.create(code=1, name="administrative") - + management.call_command("make_permissions") management.call_command("make_local_unit_validator_permissions") - + self.country_admin_user = UserFactory.create() self.country_validator_user = UserFactory.create() self.region_validator_user = UserFactory.create() self.global_validator_user = UserFactory.create() # permissions + country_admin_codename = f"country_admin_{self.country.id}" country_codename = f"local_unit_country_validator_{self.local_unit_type.id}_{self.country.id}" region_codename = f"local_unit_region_validator_{self.local_unit_type.id}_{self.region.id}" global_codename = f"local_unit_global_validator_{self.local_unit_type.id}" + country_admin_permission = Permission.objects.get(codename=country_admin_codename) country_permission = Permission.objects.get(codename=country_codename) region_permission = Permission.objects.get(codename=region_codename) global_permission = Permission.objects.get(codename=global_codename) + # Country admin group + country__admin_group_name = "%s Admins" % self.country.name + country__admin_group = Group.objects.get(name=country__admin_group_name) + country__admin_group.permissions.add(country_admin_permission) + self.country_admin_user.groups.add(country__admin_group) # Country validator group country_group_name = f"Local unit validator for {self.local_unit_type.name} {self.country.name}" country_group = Group.objects.get(name=country_group_name) @@ -875,6 +882,93 @@ def test_create_local_unit_with_externally_managed_country_and_type(self): self.assertEqual(response.status_code, 400) self.assertEqual(LocalUnitChangeRequest.objects.count(), 0) + def test_country_region_admin_permission_for_local_unit_update(self): + region1 = RegionFactory.create(name=2, label="Asia Pacific") + + region2 = RegionFactory.create(name=0, label="Africa") + + country = CountryFactory.create( + name="India", + iso3="IND", + record_type=CountryType.COUNTRY, + is_deprecated=False, + independent=True, + region=region1, + ) + self.asia_admin = UserFactory.create(email="asia@admin.com") + self.africa_admin = UserFactory.create(email="africa@admin.com") + self.india_admin = UserFactory.create(email="india@admin.com") + # India admin setup + management.call_command("make_permissions") + country_admin_codename = f"country_admin_{country.id}" + country_admin_permission = Permission.objects.get(codename=country_admin_codename) + country_admin_group_name = "%s Admins" % country.name + country_admin_group = Group.objects.get(name=country_admin_group_name) + country_admin_group.permissions.add(country_admin_permission) + self.india_admin.groups.add(country_admin_group) + # Asia admin + asia_admin_codename = f"region_admin_{region1.id}" + asia_admin_permission = Permission.objects.get(codename=asia_admin_codename) + asia_admin_group_name = "%s Regional Admins" % region1.name + asia_admin_group = Group.objects.get(name=asia_admin_group_name) + asia_admin_group.permissions.add(asia_admin_permission) + self.asia_admin.groups.add(asia_admin_group) + + # Africa admin + africa_admin_codename = f"region_admin_{region2.id}" + africa_admin_permission = Permission.objects.get(codename=africa_admin_codename) + africa_admin_group_name = "%s Regional Admins" % region2.name + africa_admin_group = Group.objects.get(name=africa_admin_group_name) + africa_admin_group.permissions.add(africa_admin_permission) + self.africa_admin.groups.add(africa_admin_group) + + local_unit = LocalUnitFactory.create( + country=country, + type=self.local_unit_type, + draft=False, + status=LocalUnit.Status.VALIDATED, + date_of_data="2023-08-08", + ) + local_unit2 = LocalUnitFactory.create( + country=country, + type=self.local_unit_type, + draft=False, + status=LocalUnit.Status.VALIDATED, + date_of_data="2023-08-08", + ) + url = f"/api/v2/local-units/{local_unit.id}/" + data = { + "local_branch_name": "Updated local branch name", + "update_reason_overview": "Needed update for testing", + "type": self.local_unit_type.id, + "location_json": { + "lat": 20.5937, + "lng": 78.9629, + }, + } + response = self.client.patch(url, data=data, format="json") + self.assert_401(response) + # Test: different country admin + self.authenticate(self.country_admin_user) + response = self.client.patch(url, data=data, format="json") + self.assert_403(response) + # Test: actual country admin + self.authenticate(self.india_admin) + response = self.client.patch(url, data=data, format="json") + self.assert_200(response) + self.assertEqual(response.data["local_branch_name"], "Updated local branch name") + + url = f"/api/v2/local-units/{local_unit2.id}/" + # Test: different region admin + self.authenticate(self.africa_admin) + response = self.client.patch(url, data=data, format="json") + self.assert_403(response) + # Test: same region admin + self.authenticate(self.asia_admin) + response = self.client.patch(url, data=data, format="json") + self.assert_200(response) + self.assertEqual(response.data["local_branch_name"], "Updated local branch name") + class TestExternallyManagedLocalUnit(APITestCase): def setUp(self):