diff --git a/Makefile b/Makefile index b45c181..496b47c 100644 --- a/Makefile +++ b/Makefile @@ -23,18 +23,24 @@ docker-compose-netbox-plugin-down: .PHONY: docker-compose-netbox-plugin-test docker-compose-netbox-plugin-test: - -@$(DOCKER_COMPOSE) -f $(DOCKER_PATH)/docker-compose.yaml -f $(DOCKER_PATH)/docker-compose.test.yaml run -u root --rm netbox ./manage.py test $(TEST_FLAGS) --keepdb $(TEST_SELECTOR) - @$(MAKE) docker-compose-netbox-plugin-down + @$(DOCKER_COMPOSE) -f $(DOCKER_PATH)/docker-compose.yaml -f $(DOCKER_PATH)/docker-compose.test.yaml run -u root --rm netbox ./manage.py test $(TEST_FLAGS) --keepdb $(TEST_SELECTOR); \ + EXIT_CODE=$$?; \ + $(MAKE) docker-compose-netbox-plugin-down; \ + exit $$EXIT_CODE .PHONY: docker-compose-netbox-plugin-test-lint docker-compose-netbox-plugin-test-lint: - -@$(DOCKER_COMPOSE) -f $(DOCKER_PATH)/docker-compose.yaml -f $(DOCKER_PATH)/docker-compose.test.yaml run -u root --rm netbox ruff check --output-format=github netbox_diode_plugin - @$(MAKE) docker-compose-netbox-plugin-down + @$(DOCKER_COMPOSE) -f $(DOCKER_PATH)/docker-compose.yaml -f $(DOCKER_PATH)/docker-compose.test.yaml run -u root --rm netbox ruff check --output-format=github netbox_diode_plugin; \ + EXIT_CODE=$$?; \ + $(MAKE) docker-compose-netbox-plugin-down; \ + exit $$EXIT_CODE .PHONY: docker-compose-netbox-plugin-test-cover docker-compose-netbox-plugin-test-cover: - -@$(DOCKER_COMPOSE) -f $(DOCKER_PATH)/docker-compose.yaml -f $(DOCKER_PATH)/docker-compose.test.yaml run --rm -u root -e COVERAGE_FILE=/opt/netbox/netbox/coverage/.coverage netbox sh -c "coverage run --source=netbox_diode_plugin --omit=*/migrations/* ./manage.py test --keepdb $(TEST_SELECTOR) && coverage xml -o /opt/netbox/netbox/coverage/report.xml && coverage report -m | tee /opt/netbox/netbox/coverage/report.txt" - @$(MAKE) docker-compose-netbox-plugin-down + @$(DOCKER_COMPOSE) -f $(DOCKER_PATH)/docker-compose.yaml -f $(DOCKER_PATH)/docker-compose.test.yaml run --rm -u root -e COVERAGE_FILE=/opt/netbox/netbox/coverage/.coverage netbox sh -c "coverage run --source=netbox_diode_plugin --omit=*/migrations/* ./manage.py test --keepdb $(TEST_SELECTOR) && coverage xml -o /opt/netbox/netbox/coverage/report.xml && coverage report -m | tee /opt/netbox/netbox/coverage/report.txt"; \ + EXIT_CODE=$$?; \ + $(MAKE) docker-compose-netbox-plugin-down; \ + exit $$EXIT_CODE .PHONY: docker-compose-generate-matching-docs docker-compose-generate-matching-docs: diff --git a/docker/Dockerfile-diode-netbox-plugin b/docker/Dockerfile similarity index 100% rename from docker/Dockerfile-diode-netbox-plugin rename to docker/Dockerfile diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index af8e518..f844987 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -4,7 +4,7 @@ services: image: netboxcommunity/netbox:v4.4.2-3.4.1-diode-netbox-plugin build: context: . - dockerfile: Dockerfile-diode-netbox-plugin + dockerfile: Dockerfile pull: true depends_on: - netbox-postgres diff --git a/docker/netbox/configuration/configuration.py b/docker/netbox/configuration/configuration.py index d459441..c4270be 100644 --- a/docker/netbox/configuration/configuration.py +++ b/docker/netbox/configuration/configuration.py @@ -1,7 +1,7 @@ #### -## We recommend to not edit this file. -## Create separate files to overwrite the settings. -## See `extra.py` as an example. +# We recommend to not edit this file. +# Create separate files to overwrite the settings. +# See `extra.py` as an example. #### import re @@ -16,10 +16,11 @@ # NetBox-Docker Helper functions ### + # Read secret from file def _read_secret(secret_name: str, default: str | None = None) -> str | None: try: - f = open('/run/secrets/' + secret_name, encoding='utf-8') + f = open("/run/secrets/" + secret_name, encoding="utf-8") except OSError: return default else: @@ -31,8 +32,11 @@ def _read_secret(secret_name: str, default: str | None = None) -> str | None: # If the `map_fn` is defined, then `map_fn` is invoked and the value (that was read from the environment or the default value if not found) # is passed to it as a parameter. The value returned from `map_fn` is then the return value of this function. # The `map_fn` is not invoked, if the value (that was read from the environment or the default value if not found) is None. -def _environ_get_and_map(variable_name: str, default: str | None = None, - map_fn: Callable[[str], Any | None] = None) -> Any | None: +def _environ_get_and_map( + variable_name: str, + default: str | None = None, + map_fn: Callable[[str], Any | None] = None, +) -> Any | None: env_value = environ.get(variable_name, default) if env_value is None: @@ -44,12 +48,9 @@ def _environ_get_and_map(variable_name: str, default: str | None = None, return map_fn(env_value) -def _AS_BOOL(value): - return value.lower() == 'true' -def _AS_INT(value): - return int(value) -def _AS_LIST(value): - return list(filter(None, value.split(' '))) +_AS_BOOL = lambda value: value.lower() == "true" +_AS_INT = lambda value: int(value) +_AS_LIST = lambda value: list(filter(None, value.split(" "))) _BASE_DIR = dirname(dirname(abspath(__file__))) @@ -63,25 +64,27 @@ def _AS_LIST(value): # access to the server via any other hostnames. The first FQDN in the list will be treated as the preferred name. # # Example: ALLOWED_HOSTS = ['netbox.example.com', 'netbox.internal.local'] -ALLOWED_HOSTS = environ.get('ALLOWED_HOSTS', '*').split(' ') +ALLOWED_HOSTS = environ.get("ALLOWED_HOSTS", "*").split(" ") # ensure that '*' or 'localhost' is always in ALLOWED_HOSTS (needed for health checks) -if '*' not in ALLOWED_HOSTS and 'localhost' not in ALLOWED_HOSTS: - ALLOWED_HOSTS.append('localhost') +if "*" not in ALLOWED_HOSTS and "localhost" not in ALLOWED_HOSTS: + ALLOWED_HOSTS.append("localhost") # PostgreSQL database configuration. See the Django documentation for a complete list of available parameters: # https://docs.djangoproject.com/en/stable/ref/settings/#databases DATABASE = { - 'NAME': environ.get('DB_NAME', 'netbox'), # Database name - 'USER': environ.get('DB_USER', ''), # PostgreSQL username - 'PASSWORD': _read_secret('db_password', environ.get('DB_PASSWORD', '')), + "NAME": environ.get("DB_NAME", "netbox"), # Database name + "USER": environ.get("DB_USER", ""), # PostgreSQL username + "PASSWORD": _read_secret("db_password", environ.get("DB_PASSWORD", "")), # PostgreSQL password - 'HOST': environ.get('DB_HOST', 'localhost'), # Database server - 'PORT': environ.get('DB_PORT', ''), # Database port (leave blank for default) - 'OPTIONS': {'sslmode': environ.get('DB_SSLMODE', 'prefer')}, + "HOST": environ.get("DB_HOST", "localhost"), # Database server + "PORT": environ.get("DB_PORT", ""), # Database port (leave blank for default) + "OPTIONS": {"sslmode": environ.get("DB_SSLMODE", "prefer")}, # Database connection SSLMODE - 'CONN_MAX_AGE': _environ_get_and_map('DB_CONN_MAX_AGE', '300', _AS_INT), + "CONN_MAX_AGE": _environ_get_and_map("DB_CONN_MAX_AGE", "300", _AS_INT), # Max database connection age - 'DISABLE_SERVER_SIDE_CURSORS': _environ_get_and_map('DB_DISABLE_SERVER_SIDE_CURSORS', 'False', _AS_BOOL), + "DISABLE_SERVER_SIDE_CURSORS": _environ_get_and_map( + "DB_DISABLE_SERVER_SIDE_CURSORS", "False", _AS_BOOL + ), # Disable the use of server-side cursors transaction pooling } @@ -89,26 +92,38 @@ def _AS_LIST(value): # configuration exists for each. Full connection details are required in both sections, and it is strongly recommended # to use two separate database IDs. REDIS = { - 'tasks': { - 'HOST': environ.get('REDIS_HOST', 'localhost'), - 'PORT': _environ_get_and_map('REDIS_PORT', 6379, _AS_INT), - 'USERNAME': environ.get('REDIS_USERNAME', ''), - 'PASSWORD': _read_secret('redis_password', environ.get('REDIS_PASSWORD', '')), - 'DATABASE': _environ_get_and_map('REDIS_DATABASE', 0, _AS_INT), - 'SSL': _environ_get_and_map('REDIS_SSL', 'False', _AS_BOOL), - 'INSECURE_SKIP_TLS_VERIFY': _environ_get_and_map('REDIS_INSECURE_SKIP_TLS_VERIFY', 'False', _AS_BOOL), + "tasks": { + "HOST": environ.get("REDIS_HOST", "localhost"), + "PORT": _environ_get_and_map("REDIS_PORT", 6379, _AS_INT), + "USERNAME": environ.get("REDIS_USERNAME", ""), + "PASSWORD": _read_secret("redis_password", environ.get("REDIS_PASSWORD", "")), + "DATABASE": _environ_get_and_map("REDIS_DATABASE", 0, _AS_INT), + "SSL": _environ_get_and_map("REDIS_SSL", "False", _AS_BOOL), + "INSECURE_SKIP_TLS_VERIFY": _environ_get_and_map( + "REDIS_INSECURE_SKIP_TLS_VERIFY", "False", _AS_BOOL + ), }, - 'caching': { - 'HOST': environ.get('REDIS_CACHE_HOST', environ.get('REDIS_HOST', 'localhost')), - 'PORT': _environ_get_and_map('REDIS_CACHE_PORT', environ.get('REDIS_PORT', '6379'), _AS_INT), - 'USERNAME': environ.get('REDIS_CACHE_USERNAME', environ.get('REDIS_USERNAME', '')), - 'PASSWORD': _read_secret('redis_cache_password', - environ.get('REDIS_CACHE_PASSWORD', environ.get('REDIS_PASSWORD', ''))), - 'DATABASE': _environ_get_and_map('REDIS_CACHE_DATABASE', '1', _AS_INT), - 'SSL': _environ_get_and_map('REDIS_CACHE_SSL', environ.get('REDIS_SSL', 'False'), _AS_BOOL), - 'INSECURE_SKIP_TLS_VERIFY': _environ_get_and_map('REDIS_CACHE_INSECURE_SKIP_TLS_VERIFY', - environ.get('REDIS_INSECURE_SKIP_TLS_VERIFY', 'False'), - _AS_BOOL), + "caching": { + "HOST": environ.get("REDIS_CACHE_HOST", environ.get("REDIS_HOST", "localhost")), + "PORT": _environ_get_and_map( + "REDIS_CACHE_PORT", environ.get("REDIS_PORT", "6379"), _AS_INT + ), + "USERNAME": environ.get( + "REDIS_CACHE_USERNAME", environ.get("REDIS_USERNAME", "") + ), + "PASSWORD": _read_secret( + "redis_cache_password", + environ.get("REDIS_CACHE_PASSWORD", environ.get("REDIS_PASSWORD", "")), + ), + "DATABASE": _environ_get_and_map("REDIS_CACHE_DATABASE", "1", _AS_INT), + "SSL": _environ_get_and_map( + "REDIS_CACHE_SSL", environ.get("REDIS_SSL", "False"), _AS_BOOL + ), + "INSECURE_SKIP_TLS_VERIFY": _environ_get_and_map( + "REDIS_CACHE_INSECURE_SKIP_TLS_VERIFY", + environ.get("REDIS_INSECURE_SKIP_TLS_VERIFY", "False"), + _AS_BOOL, + ), }, } @@ -116,7 +131,7 @@ def _AS_LIST(value): # For optimal security, SECRET_KEY should be at least 50 characters in length and contain a mix of letters, numbers, and # symbols. NetBox will not run without this defined. For more information, see # https://docs.djangoproject.com/en/stable/ref/settings/#std:setting-SECRET_KEY -SECRET_KEY = _read_secret('secret_key', environ.get('SECRET_KEY', '')) +SECRET_KEY = _read_secret("secret_key", environ.get("SECRET_KEY", "")) ######################### # # @@ -130,70 +145,77 @@ def _AS_LIST(value): # # ['John Doe', 'jdoe@example.com'], # ] -if 'ALLOWED_URL_SCHEMES' in environ: - ALLOWED_URL_SCHEMES = _environ_get_and_map('ALLOWED_URL_SCHEMES', None, _AS_LIST) +if "ALLOWED_URL_SCHEMES" in environ: + ALLOWED_URL_SCHEMES = _environ_get_and_map("ALLOWED_URL_SCHEMES", None, _AS_LIST) # Optionally display a persistent banner at the top and/or bottom of every page. HTML is allowed. To display the same # content in both banners, define BANNER_TOP and set BANNER_BOTTOM = BANNER_TOP. -if 'BANNER_TOP' in environ: - BANNER_TOP = environ.get('BANNER_TOP', None) -if 'BANNER_BOTTOM' in environ: - BANNER_BOTTOM = environ.get('BANNER_BOTTOM', None) +if "BANNER_TOP" in environ: + BANNER_TOP = environ.get("BANNER_TOP", None) +if "BANNER_BOTTOM" in environ: + BANNER_BOTTOM = environ.get("BANNER_BOTTOM", None) # Text to include on the login page above the login form. HTML is allowed. -if 'BANNER_LOGIN' in environ: - BANNER_LOGIN = environ.get('BANNER_LOGIN', None) +if "BANNER_LOGIN" in environ: + BANNER_LOGIN = environ.get("BANNER_LOGIN", None) # Maximum number of days to retain logged changes. Set to 0 to retain changes indefinitely. (Default: 90) -if 'CHANGELOG_RETENTION' in environ: - CHANGELOG_RETENTION = _environ_get_and_map('CHANGELOG_RETENTION', None, _AS_INT) +if "CHANGELOG_RETENTION" in environ: + CHANGELOG_RETENTION = _environ_get_and_map("CHANGELOG_RETENTION", None, _AS_INT) # Maximum number of days to retain job results (scripts and reports). Set to 0 to retain job results in the database indefinitely. (Default: 90) -if 'JOB_RETENTION' in environ: - JOB_RETENTION = _environ_get_and_map('JOB_RETENTION', None, _AS_INT) +if "JOB_RETENTION" in environ: + JOB_RETENTION = _environ_get_and_map("JOB_RETENTION", None, _AS_INT) # JOBRESULT_RETENTION was renamed to JOB_RETENTION in the v3.5.0 release of NetBox. For backwards compatibility, map JOBRESULT_RETENTION to JOB_RETENTION -elif 'JOBRESULT_RETENTION' in environ: - JOB_RETENTION = _environ_get_and_map('JOBRESULT_RETENTION', None, _AS_INT) +elif "JOBRESULT_RETENTION" in environ: + JOB_RETENTION = _environ_get_and_map("JOBRESULT_RETENTION", None, _AS_INT) # API Cross-Origin Resource Sharing (CORS) settings. If CORS_ORIGIN_ALLOW_ALL is set to True, all origins will be # allowed. Otherwise, define a list of allowed origins using either CORS_ORIGIN_WHITELIST or # CORS_ORIGIN_REGEX_WHITELIST. For more information, see https://github.com/ottoyiu/django-cors-headers -CORS_ORIGIN_ALLOW_ALL = _environ_get_and_map('CORS_ORIGIN_ALLOW_ALL', 'False', _AS_BOOL) -CORS_ORIGIN_WHITELIST = _environ_get_and_map('CORS_ORIGIN_WHITELIST', 'https://localhost', _AS_LIST) -CORS_ORIGIN_REGEX_WHITELIST = [re.compile(r) for r in _environ_get_and_map('CORS_ORIGIN_REGEX_WHITELIST', '', _AS_LIST)] +CORS_ORIGIN_ALLOW_ALL = _environ_get_and_map("CORS_ORIGIN_ALLOW_ALL", "False", _AS_BOOL) +CORS_ORIGIN_WHITELIST = _environ_get_and_map( + "CORS_ORIGIN_WHITELIST", "https://localhost", _AS_LIST +) +CORS_ORIGIN_REGEX_WHITELIST = [ + re.compile(r) + for r in _environ_get_and_map("CORS_ORIGIN_REGEX_WHITELIST", "", _AS_LIST) +] # Set to True to enable server debugging. WARNING: Debugging introduces a substantial performance penalty and may reveal # sensitive information about your installation. Only enable debugging while performing testing. # Never enable debugging on a production system. -DEBUG = _environ_get_and_map('DEBUG', 'False', _AS_BOOL) +DEBUG = _environ_get_and_map("DEBUG", "True", _AS_BOOL) # This parameter serves as a safeguard to prevent some potentially dangerous behavior, # such as generating new database schema migrations. # Set this to True only if you are actively developing the NetBox code base. -DEVELOPER = _environ_get_and_map('DEVELOPER', 'False', _AS_BOOL) +DEVELOPER = _environ_get_and_map("DEVELOPER", "False", _AS_BOOL) # Email settings EMAIL = { - 'SERVER': environ.get('EMAIL_SERVER', 'localhost'), - 'PORT': _environ_get_and_map('EMAIL_PORT', 25, _AS_INT), - 'USERNAME': environ.get('EMAIL_USERNAME', ''), - 'PASSWORD': _read_secret('email_password', environ.get('EMAIL_PASSWORD', '')), - 'USE_SSL': _environ_get_and_map('EMAIL_USE_SSL', 'False', _AS_BOOL), - 'USE_TLS': _environ_get_and_map('EMAIL_USE_TLS', 'False', _AS_BOOL), - 'SSL_CERTFILE': environ.get('EMAIL_SSL_CERTFILE', ''), - 'SSL_KEYFILE': environ.get('EMAIL_SSL_KEYFILE', ''), - 'TIMEOUT': _environ_get_and_map('EMAIL_TIMEOUT', 10, _AS_INT), # seconds - 'FROM_EMAIL': environ.get('EMAIL_FROM', ''), + "SERVER": environ.get("EMAIL_SERVER", "localhost"), + "PORT": _environ_get_and_map("EMAIL_PORT", 25, _AS_INT), + "USERNAME": environ.get("EMAIL_USERNAME", ""), + "PASSWORD": _read_secret("email_password", environ.get("EMAIL_PASSWORD", "")), + "USE_SSL": _environ_get_and_map("EMAIL_USE_SSL", "False", _AS_BOOL), + "USE_TLS": _environ_get_and_map("EMAIL_USE_TLS", "False", _AS_BOOL), + "SSL_CERTFILE": environ.get("EMAIL_SSL_CERTFILE", ""), + "SSL_KEYFILE": environ.get("EMAIL_SSL_KEYFILE", ""), + "TIMEOUT": _environ_get_and_map("EMAIL_TIMEOUT", 10, _AS_INT), # seconds + "FROM_EMAIL": environ.get("EMAIL_FROM", ""), } # Enforcement of unique IP space can be toggled on a per-VRF basis. To enforce unique IP space within the global table # (all prefixes and IP addresses not assigned to a VRF), set ENFORCE_GLOBAL_UNIQUE to True. -if 'ENFORCE_GLOBAL_UNIQUE' in environ: - ENFORCE_GLOBAL_UNIQUE = _environ_get_and_map('ENFORCE_GLOBAL_UNIQUE', None, _AS_BOOL) +if "ENFORCE_GLOBAL_UNIQUE" in environ: + ENFORCE_GLOBAL_UNIQUE = _environ_get_and_map( + "ENFORCE_GLOBAL_UNIQUE", None, _AS_BOOL + ) # Exempt certain models from the enforcement of view permissions. Models listed here will be viewable by all users and # by anonymous users. List models in the form `.`. Add '*' to this list to exempt all models. -EXEMPT_VIEW_PERMISSIONS = _environ_get_and_map('EXEMPT_VIEW_PERMISSIONS', '', _AS_LIST) +EXEMPT_VIEW_PERMISSIONS = _environ_get_and_map("EXEMPT_VIEW_PERMISSIONS", "", _AS_LIST) # HTTP proxies NetBox should use when sending outbound HTTP requests (e.g. for webhooks). # HTTP_PROXIES = { @@ -203,52 +225,71 @@ def _AS_LIST(value): # IP addresses recognized as internal to the system. The debugging toolbar will be available only to clients accessing # NetBox from an internal IP. -INTERNAL_IPS = _environ_get_and_map('INTERNAL_IPS', '127.0.0.1 ::1', _AS_LIST) +INTERNAL_IPS = _environ_get_and_map("INTERNAL_IPS", "127.0.0.1 ::1", _AS_LIST) # Enable GraphQL API. -if 'GRAPHQL_ENABLED' in environ: - GRAPHQL_ENABLED = _environ_get_and_map('GRAPHQL_ENABLED', None, _AS_BOOL) +if "GRAPHQL_ENABLED" in environ: + GRAPHQL_ENABLED = _environ_get_and_map("GRAPHQL_ENABLED", None, _AS_BOOL) # # Enable custom logging. Please see the Django documentation for detailed guidance on configuring custom logs: # # https://docs.djangoproject.com/en/stable/topics/logging/ # LOGGING = {} +LOGGING = { + "version": 1, + "disable_existing_loggers": False, + "handlers": { + "console": { + "class": "logging.StreamHandler", + }, + }, + "root": { + "handlers": ["console"], + "level": "INFO", + }, + "loggers": { + "netbox.*": { + "handlers": ["console"], + "level": "INFO", + }, + }, +} # Automatically reset the lifetime of a valid session upon each authenticated request. Enables users to remain # authenticated to NetBox indefinitely. -LOGIN_PERSISTENCE = _environ_get_and_map('LOGIN_PERSISTENCE', 'False', _AS_BOOL) +LOGIN_PERSISTENCE = _environ_get_and_map("LOGIN_PERSISTENCE", "False", _AS_BOOL) # Setting this to True will permit only authenticated users to access any part of NetBox. By default, anonymous users # are permitted to access most data in NetBox (excluding secrets) but not make any changes. -LOGIN_REQUIRED = _environ_get_and_map('LOGIN_REQUIRED', 'False', _AS_BOOL) +LOGIN_REQUIRED = _environ_get_and_map("LOGIN_REQUIRED", "False", _AS_BOOL) # The length of time (in seconds) for which a user will remain logged into the web UI before being prompted to # re-authenticate. (Default: 1209600 [14 days]) -LOGIN_TIMEOUT = _environ_get_and_map('LOGIN_TIMEOUT', 1209600, _AS_INT) +LOGIN_TIMEOUT = _environ_get_and_map("LOGIN_TIMEOUT", 1209600, _AS_INT) # Setting this to True will display a "maintenance mode" banner at the top of every page. -if 'MAINTENANCE_MODE' in environ: - MAINTENANCE_MODE = _environ_get_and_map('MAINTENANCE_MODE', None, _AS_BOOL) +if "MAINTENANCE_MODE" in environ: + MAINTENANCE_MODE = _environ_get_and_map("MAINTENANCE_MODE", None, _AS_BOOL) # Maps provider -if 'MAPS_URL' in environ: - MAPS_URL = environ.get('MAPS_URL', None) +if "MAPS_URL" in environ: + MAPS_URL = environ.get("MAPS_URL", None) # An API consumer can request an arbitrary number of objects =by appending the "limit" parameter to the URL (e.g. # "?limit=1000"). This setting defines the maximum limit. Setting it to 0 or None will allow an API consumer to request # all objects by specifying "?limit=0". -if 'MAX_PAGE_SIZE' in environ: - MAX_PAGE_SIZE = _environ_get_and_map('MAX_PAGE_SIZE', None, _AS_INT) +if "MAX_PAGE_SIZE" in environ: + MAX_PAGE_SIZE = _environ_get_and_map("MAX_PAGE_SIZE", None, _AS_INT) # The file path where uploaded media such as image attachments are stored. A trailing slash is not needed. Note that # the default value of this setting is derived from the installed location. -MEDIA_ROOT = environ.get('MEDIA_ROOT', join(_BASE_DIR, 'media')) +MEDIA_ROOT = environ.get("MEDIA_ROOT", join(_BASE_DIR, "media")) # Expose Prometheus monitoring metrics at the HTTP endpoint '/metrics' -METRICS_ENABLED = _environ_get_and_map('METRICS_ENABLED', 'False', _AS_BOOL) +METRICS_ENABLED = _environ_get_and_map("METRICS_ENABLED", "False", _AS_BOOL) # Determine how many objects to display per page within a list. (Default: 50) -if 'PAGINATE_COUNT' in environ: - PAGINATE_COUNT = _environ_get_and_map('PAGINATE_COUNT', None, _AS_INT) +if "PAGINATE_COUNT" in environ: + PAGINATE_COUNT = _environ_get_and_map("PAGINATE_COUNT", None, _AS_INT) # # Enable installed plugins. Add the name of each plugin to the list. # PLUGINS = [] @@ -260,68 +301,84 @@ def _AS_LIST(value): # When determining the primary IP address for a device, IPv6 is preferred over IPv4 by default. Set this to True to # prefer IPv4 instead. -if 'PREFER_IPV4' in environ: - PREFER_IPV4 = _environ_get_and_map('PREFER_IPV4', None, _AS_BOOL) +if "PREFER_IPV4" in environ: + PREFER_IPV4 = _environ_get_and_map("PREFER_IPV4", None, _AS_BOOL) # The default value for the amperage field when creating new power feeds. -if 'POWERFEED_DEFAULT_AMPERAGE' in environ: - POWERFEED_DEFAULT_AMPERAGE = _environ_get_and_map('POWERFEED_DEFAULT_AMPERAGE', None, _AS_INT) +if "POWERFEED_DEFAULT_AMPERAGE" in environ: + POWERFEED_DEFAULT_AMPERAGE = _environ_get_and_map( + "POWERFEED_DEFAULT_AMPERAGE", None, _AS_INT + ) # The default value (percentage) for the max_utilization field when creating new power feeds. -if 'POWERFEED_DEFAULT_MAX_UTILIZATION' in environ: - POWERFEED_DEFAULT_MAX_UTILIZATION = _environ_get_and_map('POWERFEED_DEFAULT_MAX_UTILIZATION', None, _AS_INT) +if "POWERFEED_DEFAULT_MAX_UTILIZATION" in environ: + POWERFEED_DEFAULT_MAX_UTILIZATION = _environ_get_and_map( + "POWERFEED_DEFAULT_MAX_UTILIZATION", None, _AS_INT + ) # The default value for the voltage field when creating new power feeds. -if 'POWERFEED_DEFAULT_VOLTAGE' in environ: - POWERFEED_DEFAULT_VOLTAGE = _environ_get_and_map('POWERFEED_DEFAULT_VOLTAGE', None, _AS_INT) +if "POWERFEED_DEFAULT_VOLTAGE" in environ: + POWERFEED_DEFAULT_VOLTAGE = _environ_get_and_map( + "POWERFEED_DEFAULT_VOLTAGE", None, _AS_INT + ) # Rack elevation size defaults, in pixels. For best results, the ratio of width to height should be roughly 10:1. -if 'RACK_ELEVATION_DEFAULT_UNIT_HEIGHT' in environ: - RACK_ELEVATION_DEFAULT_UNIT_HEIGHT = _environ_get_and_map('RACK_ELEVATION_DEFAULT_UNIT_HEIGHT', None, _AS_INT) -if 'RACK_ELEVATION_DEFAULT_UNIT_WIDTH' in environ: - RACK_ELEVATION_DEFAULT_UNIT_WIDTH = _environ_get_and_map('RACK_ELEVATION_DEFAULT_UNIT_WIDTH', None, _AS_INT) +if "RACK_ELEVATION_DEFAULT_UNIT_HEIGHT" in environ: + RACK_ELEVATION_DEFAULT_UNIT_HEIGHT = _environ_get_and_map( + "RACK_ELEVATION_DEFAULT_UNIT_HEIGHT", None, _AS_INT + ) +if "RACK_ELEVATION_DEFAULT_UNIT_WIDTH" in environ: + RACK_ELEVATION_DEFAULT_UNIT_WIDTH = _environ_get_and_map( + "RACK_ELEVATION_DEFAULT_UNIT_WIDTH", None, _AS_INT + ) # Remote authentication support -REMOTE_AUTH_ENABLED = _environ_get_and_map('REMOTE_AUTH_ENABLED', 'False', _AS_BOOL) -REMOTE_AUTH_BACKEND = _environ_get_and_map('REMOTE_AUTH_BACKEND', 'netbox.authentication.RemoteUserBackend', _AS_LIST) -REMOTE_AUTH_HEADER = environ.get('REMOTE_AUTH_HEADER', 'HTTP_REMOTE_USER') -REMOTE_AUTH_AUTO_CREATE_USER = _environ_get_and_map('REMOTE_AUTH_AUTO_CREATE_USER', 'False', _AS_BOOL) -REMOTE_AUTH_DEFAULT_GROUPS = _environ_get_and_map('REMOTE_AUTH_DEFAULT_GROUPS', '', _AS_LIST) +REMOTE_AUTH_ENABLED = _environ_get_and_map("REMOTE_AUTH_ENABLED", "False", _AS_BOOL) +REMOTE_AUTH_BACKEND = _environ_get_and_map( + "REMOTE_AUTH_BACKEND", "netbox.authentication.RemoteUserBackend", _AS_LIST +) +REMOTE_AUTH_HEADER = environ.get("REMOTE_AUTH_HEADER", "HTTP_REMOTE_USER") +REMOTE_AUTH_AUTO_CREATE_USER = _environ_get_and_map( + "REMOTE_AUTH_AUTO_CREATE_USER", "False", _AS_BOOL +) +REMOTE_AUTH_DEFAULT_GROUPS = _environ_get_and_map( + "REMOTE_AUTH_DEFAULT_GROUPS", "", _AS_LIST +) # REMOTE_AUTH_DEFAULT_PERMISSIONS = {} # This repository is used to check whether there is a new release of NetBox available. Set to None to disable the # version check or use the URL below to check for release in the official NetBox repository. -RELEASE_CHECK_URL = environ.get('RELEASE_CHECK_URL', None) +RELEASE_CHECK_URL = environ.get("RELEASE_CHECK_URL", None) # RELEASE_CHECK_URL = 'https://api.github.com/repos/netbox-community/netbox/releases' # Maximum execution time for background tasks, in seconds. -RQ_DEFAULT_TIMEOUT = _environ_get_and_map('RQ_DEFAULT_TIMEOUT', 300, _AS_INT) +RQ_DEFAULT_TIMEOUT = _environ_get_and_map("RQ_DEFAULT_TIMEOUT", 300, _AS_INT) # The name to use for the csrf token cookie. -CSRF_COOKIE_NAME = environ.get('CSRF_COOKIE_NAME', 'csrftoken') +CSRF_COOKIE_NAME = environ.get("CSRF_COOKIE_NAME", "csrftoken") # Cross-Site-Request-Forgery-Attack settings. If Netbox is sitting behind a reverse proxy, you might need to set the CSRF_TRUSTED_ORIGINS flag. # Django 4.0 requires to specify the URL Scheme in this setting. An example environment variable could be specified like: # CSRF_TRUSTED_ORIGINS=https://demo.netbox.dev http://demo.netbox.dev -CSRF_TRUSTED_ORIGINS = _environ_get_and_map('CSRF_TRUSTED_ORIGINS', '', _AS_LIST) +CSRF_TRUSTED_ORIGINS = _environ_get_and_map("CSRF_TRUSTED_ORIGINS", "", _AS_LIST) # The name to use for the session cookie. -SESSION_COOKIE_NAME = environ.get('SESSION_COOKIE_NAME', 'sessionid') +SESSION_COOKIE_NAME = environ.get("SESSION_COOKIE_NAME", "sessionid") # By default, NetBox will store session data in the database. Alternatively, a file path can be specified here to use # local file storage instead. (This can be useful for enabling authentication on a standby instance with read-only # database access.) Note that the user as which NetBox runs must have read and write permissions to this path. -SESSION_FILE_PATH = environ.get('SESSION_FILE_PATH', environ.get('SESSIONS_ROOT', None)) +SESSION_FILE_PATH = environ.get("SESSION_FILE_PATH", environ.get("SESSIONS_ROOT", None)) # Time zone (default: UTC) -TIME_ZONE = environ.get('TIME_ZONE', 'UTC') +TIME_ZONE = environ.get("TIME_ZONE", "UTC") # Date/time formatting. See the following link for supported formats: # https://docs.djangoproject.com/en/stable/ref/templates/builtins/#date -DATE_FORMAT = environ.get('DATE_FORMAT', 'N j, Y') -SHORT_DATE_FORMAT = environ.get('SHORT_DATE_FORMAT', 'Y-m-d') -TIME_FORMAT = environ.get('TIME_FORMAT', 'g:i a') -SHORT_TIME_FORMAT = environ.get('SHORT_TIME_FORMAT', 'H:i:s') -DATETIME_FORMAT = environ.get('DATETIME_FORMAT', 'N j, Y g:i a') -SHORT_DATETIME_FORMAT = environ.get('SHORT_DATETIME_FORMAT', 'Y-m-d H:i') -BASE_PATH = environ.get('BASE_PATH', '') +DATE_FORMAT = environ.get("DATE_FORMAT", "N j, Y") +SHORT_DATE_FORMAT = environ.get("SHORT_DATE_FORMAT", "Y-m-d") +TIME_FORMAT = environ.get("TIME_FORMAT", "g:i a") +SHORT_TIME_FORMAT = environ.get("SHORT_TIME_FORMAT", "H:i:s") +DATETIME_FORMAT = environ.get("DATETIME_FORMAT", "N j, Y g:i a") +SHORT_DATETIME_FORMAT = environ.get("SHORT_DATETIME_FORMAT", "Y-m-d H:i") +BASE_PATH = environ.get("BASE_PATH", "") diff --git a/docker/netbox/configuration/extra.py b/docker/netbox/configuration/extra.py index 8bd1337..ff6fb6d 100644 --- a/docker/netbox/configuration/extra.py +++ b/docker/netbox/configuration/extra.py @@ -1,26 +1,26 @@ #### -## This file contains extra configuration options that can't be configured -## directly through environment variables. +# This file contains extra configuration options that can't be configured +# directly through environment variables. #### -## Specify one or more name and email address tuples representing NetBox administrators. These people will be notified of -## application errors (assuming correct email settings are provided). +# Specify one or more name and email address tuples representing NetBox administrators. These people will be notified of +# application errors (assuming correct email settings are provided). # ADMINS = [ # # ['John Doe', 'jdoe@example.com'], # ] -## URL schemes that are allowed within links in NetBox +# URL schemes that are allowed within links in NetBox # ALLOWED_URL_SCHEMES = ( # 'file', 'ftp', 'ftps', 'http', 'https', 'irc', 'mailto', 'sftp', 'ssh', 'tel', 'telnet', 'tftp', 'vnc', 'xmpp', # ) -## Enable installed plugins. Add the name of each plugin to the list. +# Enable installed plugins. Add the name of each plugin to the list. # from netbox.configuration.configuration import PLUGINS # PLUGINS.append('my_plugin') -## Plugins configuration settings. These settings are used by various plugins that the user may have installed. -## Each key in the dictionary is the name of an installed plugin and its value is a dictionary of settings. +# Plugins configuration settings. These settings are used by various plugins that the user may have installed. +# Each key in the dictionary is the name of an installed plugin and its value is a dictionary of settings. # from netbox.configuration.configuration import PLUGINS_CONFIG # PLUGINS_CONFIG['my_plugin'] = { # 'foo': 'bar', @@ -28,12 +28,12 @@ # } -## Remote authentication support +# Remote authentication support # REMOTE_AUTH_DEFAULT_PERMISSIONS = {} -## By default uploaded media is stored on the local filesystem. Using Django-storages is also supported. Provide the -## class path of the storage driver in STORAGE_BACKEND and any configuration options in STORAGE_CONFIG. For example: +# By default uploaded media is stored on the local filesystem. Using Django-storages is also supported. Provide the +# class path of the storage driver in STORAGE_BACKEND and any configuration options in STORAGE_CONFIG. For example: # STORAGE_BACKEND = 'storages.backends.s3boto3.S3Boto3Storage' # STORAGE_CONFIG = { # 'AWS_ACCESS_KEY_ID': 'Key ID', @@ -43,7 +43,7 @@ # } -## This file can contain arbitrary Python code, e.g.: +# This file can contain arbitrary Python code, e.g.: # from datetime import datetime # now = datetime.now().strftime("%d/%m/%Y %H:%M:%S") # BANNER_TOP = f'This instance started on {now}.' diff --git a/docker/netbox/configuration/ldap/extra.py b/docker/netbox/configuration/ldap/extra.py deleted file mode 100644 index 4505197..0000000 --- a/docker/netbox/configuration/ldap/extra.py +++ /dev/null @@ -1,28 +0,0 @@ -#### -## This file contains extra configuration options that can't be configured -## directly through environment variables. -## All vairables set here overwrite any existing found in ldap_config.py -#### - -# # This Python script inherits all the imports from ldap_config.py -# from django_auth_ldap.config import LDAPGroupQuery # Imported since not in ldap_config.py - -# # Sets a base requirement of membetship to netbox-user-ro, netbox-user-rw, or netbox-user-admin. -# AUTH_LDAP_REQUIRE_GROUP = ( -# LDAPGroupQuery("cn=netbox-user-ro,ou=groups,dc=example,dc=com") -# | LDAPGroupQuery("cn=netbox-user-rw,ou=groups,dc=example,dc=com") -# | LDAPGroupQuery("cn=netbox-user-admin,ou=groups,dc=example,dc=com") -# ) - -# # Sets LDAP Flag groups variables with example. -# AUTH_LDAP_USER_FLAGS_BY_GROUP = { -# "is_staff": ( -# LDAPGroupQuery("cn=netbox-user-ro,ou=groups,dc=example,dc=com") -# | LDAPGroupQuery("cn=netbox-user-rw,ou=groups,dc=example,dc=com") -# | LDAPGroupQuery("cn=netbox-user-admin,ou=groups,dc=example,dc=com") -# ), -# "is_superuser": "cn=netbox-user-admin,ou=groups,dc=example,dc=com", -# } - -# # Sets LDAP Mirror groups variables with example groups -# AUTH_LDAP_MIRROR_GROUPS = ["netbox-user-ro", "netbox-user-rw", "netbox-user-admin"] diff --git a/docker/netbox/configuration/ldap/ldap_config.py b/docker/netbox/configuration/ldap/ldap_config.py deleted file mode 100644 index 32743c7..0000000 --- a/docker/netbox/configuration/ldap/ldap_config.py +++ /dev/null @@ -1,113 +0,0 @@ -from importlib import import_module -from os import environ - -import ldap -from django_auth_ldap.config import LDAPSearch - - -# Read secret from file -def _read_secret(secret_name, default=None): - try: - f = open('/run/secrets/' + secret_name, encoding='utf-8') - except OSError: - return default - else: - with f: - return f.readline().strip() - - -# Import and return the group type based on string name -def _import_group_type(group_type_name): - mod = import_module('django_auth_ldap.config') - try: - return getattr(mod, group_type_name)() - except: - return None - - -# Server URI -AUTH_LDAP_SERVER_URI = environ.get('AUTH_LDAP_SERVER_URI', '') - -# The following may be needed if you are binding to Active Directory. -AUTH_LDAP_CONNECTION_OPTIONS = { - ldap.OPT_REFERRALS: 0 -} - -AUTH_LDAP_BIND_AS_AUTHENTICATING_USER = environ.get('AUTH_LDAP_BIND_AS_AUTHENTICATING_USER', 'False').lower() == 'true' - -# Set the DN and password for the NetBox service account if needed. -if not AUTH_LDAP_BIND_AS_AUTHENTICATING_USER: - AUTH_LDAP_BIND_DN = environ.get('AUTH_LDAP_BIND_DN', '') - AUTH_LDAP_BIND_PASSWORD = _read_secret('auth_ldap_bind_password', environ.get('AUTH_LDAP_BIND_PASSWORD', '')) - -# Set a string template that describes any user’s distinguished name based on the username. -AUTH_LDAP_USER_DN_TEMPLATE = environ.get('AUTH_LDAP_USER_DN_TEMPLATE', None) - -# Enable STARTTLS for ldap authentication. -AUTH_LDAP_START_TLS = environ.get('AUTH_LDAP_START_TLS', 'False').lower() == 'true' - -# Include this setting if you want to ignore certificate errors. This might be needed to accept a self-signed cert. -# Note that this is a NetBox-specific setting which sets: -# ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) -LDAP_IGNORE_CERT_ERRORS = environ.get('LDAP_IGNORE_CERT_ERRORS', 'False').lower() == 'true' - -# Include this setting if you want to validate the LDAP server certificates against a CA certificate directory on your server -# Note that this is a NetBox-specific setting which sets: -# ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, LDAP_CA_CERT_DIR) -LDAP_CA_CERT_DIR = environ.get('LDAP_CA_CERT_DIR', None) - -# Include this setting if you want to validate the LDAP server certificates against your own CA. -# Note that this is a NetBox-specific setting which sets: -# ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, LDAP_CA_CERT_FILE) -LDAP_CA_CERT_FILE = environ.get('LDAP_CA_CERT_FILE', None) - -AUTH_LDAP_USER_SEARCH_BASEDN = environ.get('AUTH_LDAP_USER_SEARCH_BASEDN', '') -AUTH_LDAP_USER_SEARCH_ATTR = environ.get('AUTH_LDAP_USER_SEARCH_ATTR', 'sAMAccountName') -AUTH_LDAP_USER_SEARCH_FILTER: str = environ.get( - 'AUTH_LDAP_USER_SEARCH_FILTER', f'({AUTH_LDAP_USER_SEARCH_ATTR}=%(user)s)' -) - -AUTH_LDAP_USER_SEARCH = LDAPSearch( - AUTH_LDAP_USER_SEARCH_BASEDN, ldap.SCOPE_SUBTREE, AUTH_LDAP_USER_SEARCH_FILTER -) - -# This search ought to return all groups to which the user belongs. django_auth_ldap uses this to determine group -# heirarchy. - -AUTH_LDAP_GROUP_SEARCH_BASEDN = environ.get('AUTH_LDAP_GROUP_SEARCH_BASEDN', '') -AUTH_LDAP_GROUP_SEARCH_CLASS = environ.get('AUTH_LDAP_GROUP_SEARCH_CLASS', 'group') - -AUTH_LDAP_GROUP_SEARCH_FILTER: str = environ.get( - 'AUTH_LDAP_GROUP_SEARCH_FILTER', f'(objectclass={AUTH_LDAP_GROUP_SEARCH_CLASS})' -) -AUTH_LDAP_GROUP_SEARCH = LDAPSearch( - AUTH_LDAP_GROUP_SEARCH_BASEDN, ldap.SCOPE_SUBTREE, AUTH_LDAP_GROUP_SEARCH_FILTER -) -AUTH_LDAP_GROUP_TYPE = _import_group_type(environ.get('AUTH_LDAP_GROUP_TYPE', 'GroupOfNamesType')) - -# Define a group required to login. -AUTH_LDAP_REQUIRE_GROUP = environ.get('AUTH_LDAP_REQUIRE_GROUP_DN') - -# Define special user types using groups. Exercise great caution when assigning superuser status. -AUTH_LDAP_USER_FLAGS_BY_GROUP = {} - -if AUTH_LDAP_REQUIRE_GROUP is not None: - AUTH_LDAP_USER_FLAGS_BY_GROUP = { - "is_active": environ.get('AUTH_LDAP_REQUIRE_GROUP_DN', ''), - "is_staff": environ.get('AUTH_LDAP_IS_ADMIN_DN', ''), - "is_superuser": environ.get('AUTH_LDAP_IS_SUPERUSER_DN', '') - } - -# For more granular permissions, we can map LDAP groups to Django groups. -AUTH_LDAP_FIND_GROUP_PERMS = environ.get('AUTH_LDAP_FIND_GROUP_PERMS', 'True').lower() == 'true' -AUTH_LDAP_MIRROR_GROUPS = environ.get('AUTH_LDAP_MIRROR_GROUPS', '').lower() == 'true' - -# Cache groups for one hour to reduce LDAP traffic -AUTH_LDAP_CACHE_TIMEOUT = int(environ.get('AUTH_LDAP_CACHE_TIMEOUT', 3600)) - -# Populate the Django user from the LDAP directory. -AUTH_LDAP_USER_ATTR_MAP = { - "first_name": environ.get('AUTH_LDAP_ATTR_FIRSTNAME', 'givenName'), - "last_name": environ.get('AUTH_LDAP_ATTR_LASTNAME', 'sn'), - "email": environ.get('AUTH_LDAP_ATTR_MAIL', 'mail') -} diff --git a/docker/netbox/configuration/logging.py b/docker/netbox/configuration/logging.py index f145c5c..891b0d5 100644 --- a/docker/netbox/configuration/logging.py +++ b/docker/netbox/configuration/logging.py @@ -15,6 +15,7 @@ }, }, } + # # Remove first comment(#) on each line to implement this working logging example. # # Add LOGLEVEL environment variable to netbox if you use this example & want a different log level. # from os import environ diff --git a/docker/netbox/configuration/plugins.py b/docker/netbox/configuration/plugins.py index c6deec2..ae4c5b0 100644 --- a/docker/netbox/configuration/plugins.py +++ b/docker/netbox/configuration/plugins.py @@ -8,22 +8,3 @@ "netbox_diode_plugin", "netbox_branching", ] - -# PLUGINS_CONFIG = { -# "netbox_diode_plugin": { -# # Auto-provision users for Diode plugin -# "auto_provision_users": True, -# -# # Diode gRPC target for communication with Diode server -# "diode_target_override": "grpc://localhost:8080/diode", -# -# # User allowed for Diode to NetBox communication -# "diode_to_netbox_username": "diode-to-netbox", -# -# # User allowed for NetBox to Diode communication -# "netbox_to_diode_username": "netbox-to-diode", -# -# # User allowed for data ingestion -# "diode_username": "diode-ingestion", -# }, -# } diff --git a/docker/v4.2.3/netbox/docker-entrypoint.sh b/docker/v4.2.3/netbox/docker-entrypoint.sh old mode 100755 new mode 100644 diff --git a/docker/v4.2.3/netbox/launch-netbox.sh b/docker/v4.2.3/netbox/launch-netbox.sh old mode 100755 new mode 100644 diff --git a/netbox_diode_plugin/api/urls.py b/netbox_diode_plugin/api/urls.py index c41963a..c48397d 100644 --- a/netbox_diode_plugin/api/urls.py +++ b/netbox_diode_plugin/api/urls.py @@ -5,12 +5,13 @@ from django.urls import include, path from netbox.api.routers import NetBoxRouter -from .views import ApplyChangeSetView, GenerateDiffView +from .views import ApplyChangeSetView, GenerateDiffView, GetDefaultBranchView router = NetBoxRouter() urlpatterns = [ path("apply-change-set/", ApplyChangeSetView.as_view()), path("generate-diff/", GenerateDiffView.as_view()), + path("default-branch/", GetDefaultBranchView.as_view()), path("", include(router.urls)), ] diff --git a/netbox_diode_plugin/api/views.py b/netbox_diode_plugin/api/views.py index 9ed9ba6..00e80ad 100644 --- a/netbox_diode_plugin/api/views.py +++ b/netbox_diode_plugin/api/views.py @@ -76,6 +76,35 @@ def post(self, request, *args, **kwargs): traceback.print_exc() raise + def _get_branch_schema_id(self, request): + """Get branch schema ID from request header or settings.""" + branch_schema_id = request.headers.get("X-NetBox-Branch") + + # If no branch specified in header, check for default branch in settings + if not branch_schema_id and Branch is not None: + try: + from netbox_diode_plugin.models import Setting + settings = Setting.objects.first() + if settings and settings.branch: + branch_schema_id = settings.branch.schema_id + logger.debug( + f"Using default branch from settings: {settings.branch.name} ({branch_schema_id})" + ) + except Exception as e: + logger.warning(f"Could not retrieve default branch from settings: {e}") + + return branch_schema_id + + def _add_branch_to_result(self, result, branch_schema_id): + """Add branch information to the result if branch is available.""" + if branch_schema_id and Branch is not None: + try: + branch = Branch.objects.get(schema_id=branch_schema_id) + result.change_set.branch = {"id": branch.schema_id, "name": branch.name} + except Branch.DoesNotExist: + sanitized_branch_id = branch_schema_id.replace('\n', '').replace('\r', '') + logger.warning(f"Branch with ID {sanitized_branch_id} does not exist") + def _post(self, request, *args, **kwargs): entity = request.data.get("entity") object_type = request.data.get("object_type") @@ -128,16 +157,8 @@ def _post(self, request, *args, **kwargs): ) result = generate_changeset(original_entity_data, object_type) - branch_schema_id = request.headers.get("X-NetBox-Branch") - - # If branch schema ID is provided and branching plugin is installed, get branch name - if branch_schema_id and Branch is not None: - try: - branch = Branch.objects.get(schema_id=branch_schema_id) - result.change_set.branch = {"id": branch.schema_id, "name": branch.name} - except Branch.DoesNotExist: - sanitized_branch_id = branch_schema_id.replace('\n', '').replace('\r', '') - logger.warning(f"Branch with ID {sanitized_branch_id} does not exist") + branch_schema_id = self._get_branch_schema_id(request) + self._add_branch_to_result(result, branch_schema_id) return Response(result.to_dict(), status=result.get_status_code()) @@ -190,3 +211,32 @@ def _post(self, request, *args, **kwargs): ) return Response(result.to_dict(), status=result.get_status_code()) + + +class GetDefaultBranchView(views.APIView): + """GetDefaultBranch view.""" + + authentication_classes = [DiodeOAuth2Authentication] + permission_classes = [IsAuthenticated, require_scopes(SCOPE_NETBOX_READ)] + + def get(self, request, *args, **kwargs): + """Get default branch from settings.""" + branch_data = None + + # Check for default branch in settings + if Branch is not None: + try: + from netbox_diode_plugin.models import Setting + settings = Setting.objects.first() + if settings and settings.branch: + branch_data = { + "id": settings.branch.schema_id, + "name": settings.branch.name + } + logger.debug( + f"Default branch from settings: {settings.branch.name} ({settings.branch.schema_id})" + ) + except Exception as e: + logger.warning(f"Could not retrieve default branch from settings: {e}") + + return Response({"branch": branch_data}) diff --git a/netbox_diode_plugin/forms.py b/netbox_diode_plugin/forms.py index 05de3bd..f8f905d 100644 --- a/netbox_diode_plugin/forms.py +++ b/netbox_diode_plugin/forms.py @@ -3,7 +3,6 @@ """Diode NetBox Plugin - Forms.""" from django import forms from django.utils.translation import gettext_lazy as _ -from netbox.forms import NetBoxModelForm from netbox.plugins import get_plugin_config from utilities.forms.rendering import FieldSet @@ -15,12 +14,21 @@ ) -class SettingsForm(NetBoxModelForm): +class SettingsForm(forms.ModelForm): """Settings form.""" + # Define branch as a custom field (not part of the model directly) + branch = forms.ModelChoiceField( + queryset=None, + required=False, + label="Branch", + help_text="Select an active branch for Diode. Leave empty to use the main schema.", + ) + fieldsets = ( FieldSet( "diode_target", + "branch", ), ) @@ -28,7 +36,7 @@ class Meta: """Meta class.""" model = Setting - fields = ("diode_target",) + fields = ("diode_target",) # Only include actual model fields def __init__(self, *args, **kwargs): """Initialize the form.""" @@ -44,6 +52,43 @@ def __init__(self, *args, **kwargs): "This field is not allowed to be modified." ) + # Handle branch field based on netbox_branching plugin availability + from django.conf import settings as django_settings + + if "netbox_branching" in django_settings.PLUGINS: + # Branching plugin is installed, configure the branch field + try: + from netbox_branching.models import Branch + + self.fields["branch"].queryset = Branch.objects.filter(status="ready") + + # Set initial value from branch_id + if self.instance and self.instance.branch_id: + try: + self.fields["branch"].initial = Branch.objects.get(id=self.instance.branch_id) + except Branch.DoesNotExist: + pass + except ImportError: + # Plugin is in PLUGINS but not actually available, remove the field + self.fields.pop("branch", None) + else: + # Branching plugin is not installed, remove the branch field + self.fields.pop("branch", None) + + def save(self, commit=True): + """Save the form and update branch_id.""" + instance = super().save(commit=False) + + # Update branch_id from the branch field + if "branch" in self.cleaned_data: + branch = self.cleaned_data["branch"] + instance.branch_id = branch.id if branch else None + + if commit: + instance.save() + + return instance + class ClientCredentialForm(forms.Form): """Form for adding client credentials.""" diff --git a/netbox_diode_plugin/migrations/0007_setting_model_cleanup.py b/netbox_diode_plugin/migrations/0007_setting_model_cleanup.py new file mode 100644 index 0000000..af94284 --- /dev/null +++ b/netbox_diode_plugin/migrations/0007_setting_model_cleanup.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# Copyright 2025 NetBox Labs, Inc. +"""Diode NetBox Plugin - Database migrations.""" + +from django.db import migrations, models + + +class Migration(migrations.Migration): + """Clean up Setting model by removing unused fields.""" + + dependencies = [ + ("netbox_diode_plugin", "0006_clientcredentials_alter_setting_diode_target"), + ] + + operations = [ + migrations.RemoveField( + model_name="setting", + name="custom_field_data", + ), + migrations.RemoveField( + model_name='setting', + name='created', + ), + migrations.RemoveField( + model_name='setting', + name='last_updated', + ), + ] diff --git a/netbox_diode_plugin/migrations/0008_setting_branch.py b/netbox_diode_plugin/migrations/0008_setting_branch.py new file mode 100644 index 0000000..f1dd543 --- /dev/null +++ b/netbox_diode_plugin/migrations/0008_setting_branch.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# Copyright 2025 NetBox Labs, Inc. +"""Diode NetBox Plugin - Database migrations.""" + +from django.db import migrations, models + + +class Migration(migrations.Migration): + """Add optional branch_id field to Setting model.""" + + dependencies = [ + ("netbox_diode_plugin", "0007_setting_model_cleanup"), + ] + + operations = [ + migrations.AddField( + model_name="setting", + name="branch_id", + field=models.BigIntegerField( + blank=True, + null=True, + help_text="ID of the branch for NetBox Branching plugin integration", + ), + ), + ] diff --git a/netbox_diode_plugin/models.py b/netbox_diode_plugin/models.py index 9a473f8..847e917 100644 --- a/netbox_diode_plugin/models.py +++ b/netbox_diode_plugin/models.py @@ -6,7 +6,7 @@ from django.core.exceptions import ValidationError from django.db import models from django.urls import reverse -from netbox.models import NetBoxModel +from utilities.querysets import RestrictedQuerySet def diode_target_validator(target): @@ -20,11 +20,21 @@ def diode_target_validator(target): raise ValidationError(exc) -class Setting(NetBoxModel): - """Setting model.""" +class Setting(models.Model): + """ + Setting model. + + Simple model without change logging, excluded from branching. + """ diode_target = models.CharField(max_length=255, validators=[diode_target_validator]) - tags = None + branch_id = models.BigIntegerField( + null=True, + blank=True, + help_text="ID of the branch for NetBox Branching plugin integration", + ) + + objects = RestrictedQuerySet.as_manager() class Meta: """Meta class.""" @@ -40,6 +50,39 @@ def get_absolute_url(self): """Return absolute URL.""" return reverse("plugins:netbox_diode_plugin:settings") + @property + def branch(self): + """ + Return the Branch object if branch_id is set and branching plugin is installed. + + Returns None if: + - branch_id is not set + - branching plugin is not installed + - branch with given ID does not exist + """ + if not self.branch_id: + return None + + try: + from netbox_branching.models import Branch + return Branch.objects.get(id=self.branch_id) + except (ImportError, Exception): + return None + + @branch.setter + def branch(self, branch_obj): + """Set branch_id from a Branch object.""" + if branch_obj is None: + self.branch_id = None + else: + self.branch_id = branch_obj.id + + @property + def branch_schema_id(self): + """Return the branch schema_id if branch is set.""" + branch = self.branch + return branch.schema_id if branch else None + class UnmanagedModelManager(models.Manager): """Manager for unmanaged models that prevents database queries.""" diff --git a/netbox_diode_plugin/templates/diode/settings.html b/netbox_diode_plugin/templates/diode/settings.html index 3572029..36cae4d 100644 --- a/netbox_diode_plugin/templates/diode/settings.html +++ b/netbox_diode_plugin/templates/diode/settings.html @@ -6,14 +6,12 @@ {% block title %}{% trans "Settings" %}{% endblock %} {% block controls %} -{% if not is_diode_target_overridden %}
{% block control-buttons %} {% url 'plugins:netbox_diode_plugin:settings_edit' as edit_url %} {% include "buttons/edit.html" with url=edit_url %} {% endblock control-buttons %}
-{% endif %} {% endblock controls %} {% block content %} @@ -25,6 +23,18 @@ {% trans "Diode target" %} {{ diode_target }} + {% if has_branching_plugin %} + + {% trans "Branch" %} + + {% if branch %} + {{ branch.name }} + {% else %} + Main (default) + {% endif %} + + + {% endif %} diff --git a/netbox_diode_plugin/tests/test_api_get_default_branch.py b/netbox_diode_plugin/tests/test_api_get_default_branch.py new file mode 100644 index 0000000..cd370a4 --- /dev/null +++ b/netbox_diode_plugin/tests/test_api_get_default_branch.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +# Copyright 2025 NetBox Labs, Inc. +"""Diode NetBox Plugin - GetDefaultBranch API Tests.""" + +import logging +from types import SimpleNamespace +from unittest import mock + +from rest_framework import status +from utilities.testing import APITestCase + +from netbox_diode_plugin.api.authentication import DiodeOAuth2Authentication +from netbox_diode_plugin.models import Setting +from netbox_diode_plugin.plugin_config import get_diode_user + +logger = logging.getLogger(__name__) + + +class GetDefaultBranchViewTestCase(APITestCase): + """Test cases for GetDefaultBranchView.""" + + def setUp(self): + """Set up the test case.""" + self.url = "/netbox/api/plugins/diode/default-branch/" + + self.authorization_header = {"HTTP_AUTHORIZATION": "Bearer mocked_oauth_token"} + self.diode_user = SimpleNamespace( + user=get_diode_user(), + token_scopes=["netbox:read", "netbox:write"], + token_data={"scope": "netbox:read netbox:write"} + ) + + self.introspect_patcher = mock.patch.object( + DiodeOAuth2Authentication, + '_introspect_token', + return_value=self.diode_user + ) + self.introspect_patcher.start() + + def tearDown(self): + """Clean up after tests.""" + self.introspect_patcher.stop() + super().tearDown() + + def test_get_default_branch_unauthenticated(self): + """Test that unauthenticated requests are rejected.""" + response = self.client.get(self.url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_get_default_branch_without_read_scope(self): + """Test that requests without netbox:read scope are rejected.""" + # Mock user with only write scope + user_without_read = SimpleNamespace( + user=get_diode_user(), + token_scopes=["netbox:write"], + token_data={"scope": "netbox:write"} + ) + + with mock.patch.object( + DiodeOAuth2Authentication, + '_introspect_token', + return_value=user_without_read + ): + response = self.client.get(self.url, **self.authorization_header) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_get_default_branch_no_branching_plugin(self): + """Test response when branching plugin is not installed.""" + # Create a setting without branch + Setting.objects.create(diode_target="grpc://localhost:8080/diode") + + # Mock Branch as None (simulating plugin not installed) + with mock.patch('netbox_diode_plugin.api.views.Branch', None): + response = self.client.get(self.url, **self.authorization_header) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("branch", response.json()) + self.assertIsNone(response.json()["branch"]) + + def test_get_default_branch_no_settings(self): + """Test response when no settings exist.""" + # Ensure no settings exist + Setting.objects.all().delete() + + response = self.client.get(self.url, **self.authorization_header) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("branch", response.json()) + self.assertIsNone(response.json()["branch"]) + + def test_get_default_branch_settings_without_branch(self): + """Test response when settings exist but branch is not set.""" + # Create a setting without branch + Setting.objects.create(diode_target="grpc://localhost:8080/diode", branch_id=None) + + response = self.client.get(self.url, **self.authorization_header) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("branch", response.json()) + self.assertIsNone(response.json()["branch"]) + + def test_get_default_branch_with_branching_plugin_and_branch_set(self): + """Test response when branching plugin is installed and branch is set.""" + # Create a mock Branch object + mock_branch = mock.Mock() + mock_branch.schema_id = "branch-123" + mock_branch.name = "main" + mock_branch.id = 1 + + # Create a setting with branch_id + Setting.objects.create( + diode_target="grpc://localhost:8080/diode", + branch_id=1 + ) + + # Mock the Branch model and query + mock_branch_model = mock.Mock() + mock_branch_model.objects.get.return_value = mock_branch + + with mock.patch('netbox_diode_plugin.api.views.Branch', mock_branch_model): + with mock.patch.object(Setting, 'branch', new_callable=mock.PropertyMock) as mock_branch_property: + mock_branch_property.return_value = mock_branch + + response = self.client.get(self.url, **self.authorization_header) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("branch", response.json()) + self.assertIsNotNone(response.json()["branch"]) + self.assertEqual(response.json()["branch"]["id"], "branch-123") + self.assertEqual(response.json()["branch"]["name"], "main") + + def test_get_default_branch_exception_handling(self): + """Test that exceptions during branch retrieval are handled gracefully.""" + # Create a setting with branch_id + Setting.objects.create( + diode_target="grpc://localhost:8080/diode", + branch_id=1 + ) + + # Mock Branch model to exist but raise exception on query + mock_branch_model = mock.Mock() + + with mock.patch('netbox_diode_plugin.api.views.Branch', mock_branch_model): + with mock.patch.object(Setting, 'branch', new_callable=mock.PropertyMock) as mock_branch_property: + # Simulate an exception when accessing the branch property + mock_branch_property.side_effect = Exception("Database error") + + response = self.client.get(self.url, **self.authorization_header) + + # Should return 200 with null branch due to exception handling + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("branch", response.json()) + self.assertIsNone(response.json()["branch"]) + + def test_get_default_branch_with_valid_authentication(self): + """Test that authenticated requests with proper scope are successful.""" + response = self.client.get(self.url, **self.authorization_header) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("branch", response.json()) + # Response structure is correct even if branch is None + self.assertIsInstance(response.json(), dict) diff --git a/netbox_diode_plugin/tests/test_models.py b/netbox_diode_plugin/tests/test_models.py index 09424ec..6621e5e 100644 --- a/netbox_diode_plugin/tests/test_models.py +++ b/netbox_diode_plugin/tests/test_models.py @@ -1,6 +1,9 @@ #!/usr/bin/env python # Copyright 2025 NetBox Labs, Inc. """Diode NetBox Plugin - Tests.""" +from unittest import mock + +from django.apps import apps from django.core.exceptions import ValidationError from django.test import TestCase @@ -17,19 +20,98 @@ def test_validators(self): with self.assertRaises(ValidationError): setting.clean_fields() - def test_str(self): """Check Setting model string representation.""" setting = Setting(diode_target="http://localhost:8080") self.assertEqual(str(setting), "") - def test_absolute_url(self): """Check Setting model absolute URL.""" setting = Setting() self.assertEqual(setting.get_absolute_url(), "/netbox/plugins/diode/settings/") - def test_tags_disabled(self): - """Check Setting model has tags disabled.""" - setting = Setting(diode_target="http://localhost:8080") - self.assertIsNone(setting.tags) + def test_branch_id_field_exists(self): + """Check Setting model has branch_id field.""" + setting = Setting(diode_target="grpc://localhost:8080/diode") + self.assertIsNone(setting.branch_id) + + # Set branch_id + setting.branch_id = 123 + self.assertEqual(setting.branch_id, 123) + + def test_branch_property_returns_none_when_no_branch_id(self): + """Check branch property returns None when branch_id is not set.""" + setting = Setting(diode_target="grpc://localhost:8080/diode") + self.assertIsNone(setting.branch) + + def test_branch_property_returns_none_when_plugin_not_installed(self): + """Check branch property returns None when branching plugin is not installed.""" + setting = Setting(diode_target="grpc://localhost:8080/diode", branch_id=123) + + # Mock the import to simulate plugin not being available + with mock.patch.dict('sys.modules', {'netbox_branching.models': None}): + self.assertIsNone(setting.branch) + + def test_branch_property_returns_branch_when_available(self): + """Check branch property returns Branch object when available.""" + if not apps.is_installed("netbox_branching"): + self.skipTest("netbox_branching plugin not installed") + + from netbox_branching.models import Branch + + # Create a test branch + branch = Branch.objects.create(name="test-branch") + + setting = Setting(diode_target="grpc://localhost:8080/diode", branch_id=branch.id) + + # Check branch property returns the correct branch + self.assertEqual(setting.branch.id, branch.id) + self.assertEqual(setting.branch.name, "test-branch") + + # Clean up + branch.delete() + + def test_branch_setter(self): + """Check branch setter updates branch_id.""" + if not apps.is_installed("netbox_branching"): + self.skipTest("netbox_branching plugin not installed") + + from netbox_branching.models import Branch + + # Create a test branch + branch = Branch.objects.create(name="test-branch-setter") + + setting = Setting(diode_target="grpc://localhost:8080/diode") + + # Use setter to assign branch + setting.branch = branch + self.assertEqual(setting.branch_id, branch.id) + + # Set to None + setting.branch = None + self.assertIsNone(setting.branch_id) + + # Clean up + branch.delete() + + def test_branch_schema_id_property(self): + """Check branch_schema_id property returns schema_id when branch is set.""" + if not apps.is_installed("netbox_branching"): + self.skipTest("netbox_branching plugin not installed") + + from netbox_branching.models import Branch + + # Create a test branch + branch = Branch.objects.create(name="test-branch-schema") + + setting = Setting(diode_target="grpc://localhost:8080/diode", branch_id=branch.id) + + # Check branch_schema_id returns the schema_id + self.assertEqual(setting.branch_schema_id, branch.schema_id) + + # Check it returns None when no branch + setting.branch_id = None + self.assertIsNone(setting.branch_schema_id) + + # Clean up + branch.delete() diff --git a/netbox_diode_plugin/tests/test_views.py b/netbox_diode_plugin/tests/test_views.py index 80620f6..c158d73 100644 --- a/netbox_diode_plugin/tests/test_views.py +++ b/netbox_diode_plugin/tests/test_views.py @@ -3,6 +3,7 @@ """Diode NetBox Plugin - Tests.""" from unittest import mock +from django.contrib import messages from django.contrib.auth import get_user_model from django.contrib.auth.models import AnonymousUser from django.contrib.messages.middleware import MessageMiddleware @@ -158,8 +159,8 @@ def test_settings_update_post_redirects_to_login_page_for_unauthenticated_user( self.assertEqual(response.status_code, status.HTTP_302_FOUND) self.assertEqual(response.url, f"/netbox/login/?next={self.path}") - def test_settings_update_disallowed_on_get_method(self): - """Test that the accessing settings edit is not allowed with diode target override.""" + def test_settings_update_allowed_on_get_method_with_override(self): + """Test that accessing settings edit shows info message when diode target is overridden.""" with mock.patch( "netbox_diode_plugin.views.get_plugin_config" ) as mock_get_plugin_config: @@ -173,7 +174,7 @@ def test_settings_update_disallowed_on_get_method(self): "netbox_diode_plugin.change_setting", ) - request = self.request_factory.post(self.path) + request = self.request_factory.get(self.path) request.user = user request.htmx = None @@ -185,25 +186,22 @@ def test_settings_update_disallowed_on_get_method(self): middleware.process_request(request) request.session.save() - setattr(request, "session", "session") - messages = FallbackStorage(request) - request._messages = messages - self.view.setup(request) response = self.view.get(request) - self.assertEqual(response.status_code, status.HTTP_302_FOUND) - self.assertEqual( - response.url, reverse("plugins:netbox_diode_plugin:settings") - ) - self.assertEqual(len(request._messages._queued_messages), 1) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Check that the message was added + storage = messages.get_messages(request) + message_list = list(storage) + self.assertEqual(len(message_list), 1) self.assertEqual( - str(request._messages._queued_messages[0]), - "The Diode target is not allowed to be modified.", + str(message_list[0]), + "The Diode target field is disabled because it is overridden in the plugin configuration.", ) - def test_settings_update_disallowed_on_post_method(self): - """Test that the updating settings is not allowed with diode target override.""" + def test_settings_update_allowed_on_post_method_with_override(self): + """Test that updating settings succeeds when diode target is overridden (field is disabled in form).""" with mock.patch( "netbox_diode_plugin.views.get_plugin_config" ) as mock_get_plugin_config: @@ -237,12 +235,8 @@ def test_settings_update_disallowed_on_post_method(self): self.view.setup(request) response = self.view.post(request) + # Should succeed and redirect to settings view self.assertEqual(response.status_code, status.HTTP_302_FOUND) self.assertEqual( response.url, reverse("plugins:netbox_diode_plugin:settings") ) - self.assertEqual(len(request._messages._queued_messages), 1) - self.assertEqual( - str(request._messages._queued_messages[0]), - "The Diode target is not allowed to be modified.", - ) diff --git a/netbox_diode_plugin/tests/v4.2.3/tests/test_views.py b/netbox_diode_plugin/tests/v4.2.3/tests/test_views.py index 80620f6..c158d73 100644 --- a/netbox_diode_plugin/tests/v4.2.3/tests/test_views.py +++ b/netbox_diode_plugin/tests/v4.2.3/tests/test_views.py @@ -3,6 +3,7 @@ """Diode NetBox Plugin - Tests.""" from unittest import mock +from django.contrib import messages from django.contrib.auth import get_user_model from django.contrib.auth.models import AnonymousUser from django.contrib.messages.middleware import MessageMiddleware @@ -158,8 +159,8 @@ def test_settings_update_post_redirects_to_login_page_for_unauthenticated_user( self.assertEqual(response.status_code, status.HTTP_302_FOUND) self.assertEqual(response.url, f"/netbox/login/?next={self.path}") - def test_settings_update_disallowed_on_get_method(self): - """Test that the accessing settings edit is not allowed with diode target override.""" + def test_settings_update_allowed_on_get_method_with_override(self): + """Test that accessing settings edit shows info message when diode target is overridden.""" with mock.patch( "netbox_diode_plugin.views.get_plugin_config" ) as mock_get_plugin_config: @@ -173,7 +174,7 @@ def test_settings_update_disallowed_on_get_method(self): "netbox_diode_plugin.change_setting", ) - request = self.request_factory.post(self.path) + request = self.request_factory.get(self.path) request.user = user request.htmx = None @@ -185,25 +186,22 @@ def test_settings_update_disallowed_on_get_method(self): middleware.process_request(request) request.session.save() - setattr(request, "session", "session") - messages = FallbackStorage(request) - request._messages = messages - self.view.setup(request) response = self.view.get(request) - self.assertEqual(response.status_code, status.HTTP_302_FOUND) - self.assertEqual( - response.url, reverse("plugins:netbox_diode_plugin:settings") - ) - self.assertEqual(len(request._messages._queued_messages), 1) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + # Check that the message was added + storage = messages.get_messages(request) + message_list = list(storage) + self.assertEqual(len(message_list), 1) self.assertEqual( - str(request._messages._queued_messages[0]), - "The Diode target is not allowed to be modified.", + str(message_list[0]), + "The Diode target field is disabled because it is overridden in the plugin configuration.", ) - def test_settings_update_disallowed_on_post_method(self): - """Test that the updating settings is not allowed with diode target override.""" + def test_settings_update_allowed_on_post_method_with_override(self): + """Test that updating settings succeeds when diode target is overridden (field is disabled in form).""" with mock.patch( "netbox_diode_plugin.views.get_plugin_config" ) as mock_get_plugin_config: @@ -237,12 +235,8 @@ def test_settings_update_disallowed_on_post_method(self): self.view.setup(request) response = self.view.post(request) + # Should succeed and redirect to settings view self.assertEqual(response.status_code, status.HTTP_302_FOUND) self.assertEqual( response.url, reverse("plugins:netbox_diode_plugin:settings") ) - self.assertEqual(len(request._messages._queued_messages), 1) - self.assertEqual( - str(request._messages._queued_messages[0]), - "The Diode target is not allowed to be modified.", - ) diff --git a/netbox_diode_plugin/views.py b/netbox_diode_plugin/views.py index 79f9cc6..32460ef 100644 --- a/netbox_diode_plugin/views.py +++ b/netbox_diode_plugin/views.py @@ -160,9 +160,15 @@ def get(self, request): diode_target = diode_target_override or settings.diode_target + # Check if branching plugin is available + from django.conf import settings as django_settings + has_branching_plugin = "netbox_branching" in django_settings.PLUGINS + context = { "diode_target": diode_target, "is_diode_target_overridden": diode_target_override is not None, + "branch": settings.branch if has_branching_plugin else None, + "has_branching_plugin": has_branching_plugin, } return render(request, "diode/settings.html", context) @@ -190,11 +196,10 @@ def get(self, request, *args, **kwargs): "netbox_diode_plugin", "diode_target_override" ) if diode_target_override: - messages.error( + messages.info( request, - "The Diode target is not allowed to be modified.", + "The Diode target field is disabled because it is overridden in the plugin configuration.", ) - return redirect("plugins:netbox_diode_plugin:settings") settings = Setting.objects.get() kwargs["pk"] = settings.pk @@ -206,16 +211,6 @@ def post(self, request, *args, **kwargs): if ret := self.check_authentication(request): return ret - diode_target_override = get_plugin_config( - "netbox_diode_plugin", "diode_target_override" - ) - if diode_target_override: - messages.error( - request, - "The Diode target is not allowed to be modified.", - ) - return redirect("plugins:netbox_diode_plugin:settings") - settings = Setting.objects.get() kwargs["pk"] = settings.pk