diff --git a/netbox_branching/signal_receivers.py b/netbox_branching/signal_receivers.py index 23621ec..93fb048 100644 --- a/netbox_branching/signal_receivers.py +++ b/netbox_branching/signal_receivers.py @@ -1,16 +1,19 @@ import logging from functools import partial +from django.contrib.contenttypes.models import ContentType from django.db import DEFAULT_DB_ALIAS from django.db.models.signals import post_migrate, post_save, pre_delete from django.dispatch import receiver from django.utils import timezone from django.utils.translation import gettext_lazy as _ +from django.core.exceptions import ValidationError from core.choices import ObjectChangeActionChoices from core.models import ObjectChange, ObjectType from extras.events import process_event_rules from extras.models import EventRule +from netbox.signals import post_clean from netbox_branching import signals from utilities.exceptions import AbortRequest from utilities.serialization import serialize_object @@ -25,9 +28,78 @@ 'handle_branch_event', 'record_change_diff', 'validate_branch_deletion', + 'validate_branching_operations', + 'validate_object_deletion_in_branch', ) +def check_object_accessible_in_branch(branch, model, object_id): + """ + Check if an object is accessible for operations in a branch. + + An object is accessible if it either exists in main or was created in the branch. + This prevents operations on objects that were deleted in main. + + Args: + branch: The Branch instance + model: The model class + object_id: The primary key of the object + + Returns: + True if the object is accessible (exists in main or was created in branch), False otherwise + """ + with deactivate_branch(): + try: + model.objects.get(pk=object_id) + return True + except model.DoesNotExist: + pass + + # Object doesn't exist in main - check if it was created in the branch + content_type = ContentType.objects.get_for_model(model) + return ChangeDiff.objects.filter( + branch=branch, + object_type=content_type, + object_id=object_id, + action=ObjectChangeActionChoices.ACTION_CREATE + ).exists() + + +@receiver(post_clean) +def validate_branching_operations(sender, instance, **kwargs): + """ + Validate that branching operations are valid (e.g., not modifying deleted objects). + """ + branch = active_branch.get() + + # Only validate if we're in a branch and this model supports branching + if branch is None: + return + + # Check if this model supports branching + try: + object_type = ObjectType.objects.get_for_model(instance.__class__) + if 'branching' not in object_type.features: + return + except ObjectType.DoesNotExist: + return + + # For updates, check if the object exists in main or was created in the branch + if hasattr(instance, 'pk') and instance.pk is not None: + model = instance.__class__ + if not check_object_accessible_in_branch(branch, model, instance.pk): + # Object was deleted in main, not created in branch + raise ValidationError( + _( + "Cannot modify {model_name} '{object_name}' because it has been deleted in the main branch. " + "Sync with the main branch to update." + ).format( + model_name=model._meta.verbose_name, + object_name=str(instance) + ) + ) + + @receiver(post_save, sender=ObjectChange) def record_change_diff(instance, **kwargs): """ @@ -79,12 +151,31 @@ def record_change_diff(instance, **kwargs): current_data = None else: model = instance.changed_object_type.model_class() + if not check_object_accessible_in_branch(branch, model, instance.changed_object_id): + # Object was deleted in main, not created in branch + raise AbortRequest( + _( + "Cannot {action} {model_name} '{object_name}' because it has been deleted " + "in the main branch. Sync with the main branch to update." + ).format( + action=instance.action.lower(), + model_name=model._meta.verbose_name, + object_name=str(instance.changed_object) + ) + ) + + # Check if object exists in main to determine if we need to get current_data with deactivate_branch(): - obj = model.objects.get(pk=instance.changed_object_id) - if hasattr(obj, 'serialize_object'): - current_data = obj.serialize_object(exclude=['created', 'last_updated']) - else: - current_data = serialize_object(obj, exclude=['created', 'last_updated']) + try: + obj = model.objects.get(pk=instance.changed_object_id) + # Object exists in main, get its current state + if hasattr(obj, 'serialize_object'): + current_data = obj.serialize_object(exclude=['created', 'last_updated']) + else: + current_data = serialize_object(obj, exclude=['created', 'last_updated']) + except model.DoesNotExist: + # Object was created in branch, so there's no current state in main + current_data = None diff = ChangeDiff( branch=branch, object=instance.changed_object, @@ -136,6 +227,44 @@ def handle_branch_event(event_type, branch, user=None, **kwargs): signals.post_revert.connect(partial(handle_branch_event, event_type=BRANCH_REVERTED)) +@receiver(pre_delete) +def validate_object_deletion_in_branch(sender, instance, **kwargs): + """ + Validate that objects being deleted in a branch still exist in main. + """ + # Skip Branch objects - they have their own validation + if sender == Branch: + return + + # Only validate if we're in a branch + branch = active_branch.get() + if branch is None: + return + + # Check if this model supports branching + try: + object_type = ObjectType.objects.get_for_model(instance.__class__) + if 'branching' not in object_type.features: + return + except ObjectType.DoesNotExist: + return + + # For deletions, check if the object exists in main or was created in the branch + if hasattr(instance, 'pk') and instance.pk is not None: + model = instance.__class__ + if not check_object_accessible_in_branch(branch, model, instance.pk): + # Object was deleted in main, not created in branch + raise AbortRequest( + _( + "Cannot delete {model_name} '{object_name}' because it has been deleted in the main branch. " + "Sync with the main branch to update." + ).format( + model_name=model._meta.verbose_name, + object_name=str(instance) + ) + ) + + @receiver(pre_delete, sender=Branch) def validate_branch_deletion(sender, instance, **kwargs): """