Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ base64 = "0.22.1"

[dev.dependencies]
tokio-tungstenite = { version = "0.24.0", features = ["native-tls"] }
tokio-test = "0.4"
tempfile = "3.0"
mockall = "0.12"
18 changes: 13 additions & 5 deletions aura/tests/test_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,39 @@
from PIL import Image
import subprocess
from aura.dataset import PairsGenerator
import time


@pytest.fixture
def random_image():
"""Generate a random image for testing."""
width, height = 100, 100
image = Image.new("RGB", (width, height), "blue")
image = Image.new("RGB", (width, height), "blue")
return image


@pytest.fixture
def ensure_ollama_running():
"""Ensure Ollama server is running, skip test if it's not available."""
try:
subprocess.run(["ollama", "list"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
subprocess.run(
["ollama", "list"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except FileNotFoundError:
pytest.skip("Ollama CLI is not installed.")
except subprocess.CalledProcessError:
pytest.skip("Ollama is not running or not connected to the app.")



def test_generate_text_description(random_image, ensure_ollama_running):
"""
Test _generate_text_description using Ollama. Skips if Ollama is unavailable.
"""
description = PairsGenerator._generate_text_description(random_image, model="ollama/bakllava")
description = PairsGenerator._generate_text_description(
random_image, model="ollama/bakllava"
)

assert isinstance(description, str)
assert len(description) > 0
Expand Down
35 changes: 22 additions & 13 deletions aura/tests/test_processing_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,49 @@
import pytest
import numpy as np
import cv2
import os
import logging
from pathlib import Path
import tempfile

import cv2
import numpy as np
import pytest
import requests
from io import BytesIO
from aura.camera import ProcessingPipeline, FaceNotFoundException

from aura.camera import FaceNotFoundException, ProcessingPipeline


def download_image(url):
response = requests.get(url)
return cv2.imdecode(
np.frombuffer(response.content, np.uint8),
cv2.IMREAD_COLOR
)
return cv2.imdecode(np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR)


@pytest.fixture
def pipeline():
with tempfile.TemporaryDirectory() as tmp_dir:
yield ProcessingPipeline(log_path=tmp_dir, verbose=2)


@pytest.fixture
def image_with_face():
url = "https://www.yourtango.com/sites/default/files/image_blog/habits-of-truly-nice-people.png"
return download_image(url)


@pytest.fixture
def image_without_face():
url = "https://www.bsr.org/images/heroes/bsr-focus-nature-hero.jpg"
return download_image(url)


def test_initialization(pipeline):
assert pipeline.verbose == 2
assert os.path.exists(pipeline.log_path)
assert pipeline.face_cascade is not None


def test_invalid_verbosity():
with pytest.raises(ValueError):
ProcessingPipeline(verbose=3)


def test_face_detection_with_face(pipeline, image_with_face):
bbox = pipeline.get_bounding_box(image_with_face)
assert bbox is not None
Expand All @@ -49,24 +52,29 @@ def test_face_detection_with_face(pipeline, image_with_face):
assert all(isinstance(val, (int, np.int32, np.int64)) for val in [x, y, w, h])
assert w > 0 and h > 0


def test_face_detection_without_face(pipeline, image_without_face):
bbox = pipeline.get_bounding_box(image_without_face)
assert bbox is None


def test_annotation(pipeline, image_with_face):
annotated = pipeline.annotate_face(image_with_face)
assert annotated.shape == image_with_face.shape
# Check if the log file was created
log_files = list(Path(pipeline.current_log_dir).glob("bbox.jpg"))
assert len(log_files) == 1


def test_image_processing_with_face(pipeline, image_with_face):
processed = pipeline.process_image(image_with_face)

assert processed.shape == (3, 224, 224), f"Unexpected shape: {processed.shape}"
assert processed.dtype == np.float32, f"Unexpected dtype: {processed.dtype}"
assert 0 <= processed.min() <= processed.max() <= 1, "Pixel values not normalized to [0, 1]"

assert 0 <= processed.min() <= processed.max() <= 1, (
"Pixel values not normalized to [0, 1]"
)

if pipeline.verbose > 0:
log_files = list(Path(pipeline.current_log_dir).glob("processed.jpg"))
assert len(log_files) == 1, "Processed image log not found"
Expand All @@ -76,6 +84,7 @@ def test_image_processing_without_face(pipeline, image_without_face):
with pytest.raises(FaceNotFoundException):
pipeline.process_image(image_without_face)


def test_grayscale_conversion(pipeline, image_with_face):
gray = pipeline._convert_to_gray(image_with_face)
assert len(gray.shape) == 2
Expand Down
51 changes: 33 additions & 18 deletions aura/tests/test_provider.py
Original file line number Diff line number Diff line change
@@ -1,100 +1,115 @@
import pytest
from unittest.mock import patch

import numpy as np
import cv2
from unittest.mock import patch, MagicMock
import pytest

from aura.dataset import DatasetProvider


@pytest.fixture
def dataset_provider():
with patch('kagglehub.dataset_download') as mock_download:
with patch("kagglehub.dataset_download") as mock_download:
mock_download.return_value = "mock/path"
with patch.object(DatasetProvider, '_collect_files') as mock_collect:
with patch.object(DatasetProvider, "_collect_files") as mock_collect:
mock_data = [(np.zeros((64, 64, 3)), 0, "happy") for _ in range(10)]
mock_collect.return_value = mock_data
provider = DatasetProvider(target_size=(224, 224), split = 0.7)
provider = DatasetProvider(target_size=(224, 224), split=0.7)
return provider



def test_collect_files_with_augmentation(dataset_provider):
mock_files = [("mock/path", [], ["image1.jpg", "image2.jpg"])]
with patch("os.walk", return_value=mock_files):
with patch.object(dataset_provider, '_get_emotion', return_value="happy"):
with patch('cv2.imread', return_value=np.zeros((64, 64, 3))):
with patch.object(dataset_provider, "_get_emotion", return_value="happy"):
with patch("cv2.imread", return_value=np.zeros((64, 64, 3))):
dataset = dataset_provider._collect_files("mock/path", augment=True)
assert len(dataset) == 8
assert len(dataset) == 8
assert all(len(item) == 3 for item in dataset)
first_img, first_label, first_emotion = dataset[0]
assert isinstance(first_img, np.ndarray)
assert isinstance(first_label, int)
assert isinstance(first_emotion, str)


def test_collect_files_without_augmentation(dataset_provider):
mock_files = [("mock/path", [], ["image1.jpg", "image2.jpg"])]
with patch("os.walk", return_value=mock_files):
with patch.object(dataset_provider, '_get_emotion', return_value="happy"):
with patch('cv2.imread', return_value=np.zeros((64, 64, 3))):
with patch.object(dataset_provider, "_get_emotion", return_value="happy"):
with patch("cv2.imread", return_value=np.zeros((64, 64, 3))):
dataset = dataset_provider._collect_files("mock/path", augment=False)
assert len(dataset) == 2
assert len(dataset) == 2
assert all(len(item) == 3 for item in dataset)
first_img, first_label, first_emotion = dataset[0]
assert isinstance(first_img, np.ndarray)
assert isinstance(first_label, int)
assert isinstance(first_emotion, str)


def test_init(dataset_provider):
assert dataset_provider.target_size == (224, 224)
assert len(dataset_provider.emotion_labels) == 8
assert len(dataset_provider.train) == 7
assert len(dataset_provider.test) == 3


def test_sample_valid_index(dataset_provider):
img, label, emotion = dataset_provider.sample(0, source = "test")
img, label, emotion = dataset_provider.sample(0, source="test")
assert isinstance(img, np.ndarray)
assert img.shape == (64, 64, 3)
assert isinstance(label, int)
assert 0 <= label <= 7
assert emotion in dataset_provider.emotion_labels


def test_sample_invalid_index(dataset_provider):
with pytest.raises(ValueError):
dataset_provider.sample(100, source = "train")
dataset_provider.sample(100, source="train")


def test_get_next_image_batch(dataset_provider):
batch_size = 2
batch_generator = dataset_provider.get_next_image_batch(batch_size, source = "train")
batch_generator = dataset_provider.get_next_image_batch(batch_size, source="train")
first_batch = next(batch_generator)

assert len(first_batch) == batch_size
assert isinstance(first_batch[0][0], np.ndarray)
assert isinstance(first_batch[0][1], int)


def test_get_next_image_batch_invalid_size(dataset_provider):
with pytest.raises(ValueError):
next(dataset_provider.get_next_image_batch(100, source = "train"))
next(dataset_provider.get_next_image_batch(100, source="train"))


def test_resize_image(dataset_provider):
test_image = np.zeros((100, 150, 3), dtype=np.uint8)
resized = dataset_provider._resize_image(test_image)
assert resized.shape == (224, 224, 3)


def test_random_rotation(dataset_provider):
test_image = np.zeros((64, 64, 3), dtype=np.uint8)
rotated = dataset_provider._random_rotation(test_image)
assert rotated.shape == test_image.shape


def test_flip(dataset_provider):
test_image = np.zeros((64, 64, 3), dtype=np.uint8)
flipped = dataset_provider._flip(test_image)
assert flipped.shape == test_image.shape


def test_random_brightness(dataset_provider):
test_image = np.zeros((64, 64, 3), dtype=np.uint8)
brightened = dataset_provider._random_brightness(test_image)
assert brightened.shape == test_image.shape


def test_get_emotion_invalid_path(dataset_provider):
with pytest.raises(ValueError):
dataset_provider._get_emotion("invalid_path")


def test_set_black_background(dataset_provider):
test_image = np.ones((64, 64, 3), dtype=np.uint8) * 255
processed = dataset_provider._set_black_background(test_image, threshold=20)
Expand Down
21 changes: 16 additions & 5 deletions aura/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,41 @@
import time
from aura.queue import QueueManager, Priority, UserState, QueueError


@pytest.fixture
def queue_manager():
return QueueManager(5, 100, 3)


def test_queue_initialization(queue_manager):
assert queue_manager.max_session_time == 300
assert queue_manager.max_queue_size == 100
assert queue_manager.max_reconnect_attempts == 3


def test_add_to_queue(queue_manager):
position = queue_manager.add_to_queue("user1", Priority.normal())
assert position == 1


def test_priority_ordering(queue_manager):
queue_manager.add_to_queue("user1", Priority.normal())
queue_manager.add_to_queue("user2", Priority.high())
queue_manager.add_to_queue("user3", Priority.low())

queue_list = list(queue_manager.queue)
assert queue_list[0].id == "user2" # High priority first
assert queue_list[1].id == "user1" # Normal priority second
assert queue_list[2].id == "user3" # Low priority last
assert queue_list[0].id == "user2"
assert queue_list[1].id == "user1"
assert queue_list[2].id == "user3"


def test_duplicate_user(queue_manager):
queue_manager.add_to_queue("user1", None)
with pytest.raises(QueueError) as exc_info:
queue_manager.add_to_queue("user1", None)
assert "already in queue" in str(exc_info.value).lower()


def test_queue_full():
queue = QueueManager(5, 2, 3)
queue.add_to_queue("user1", None)
Expand All @@ -39,31 +45,36 @@ def test_queue_full():
queue.add_to_queue("user3", None)
assert "queue is full" in str(exc_info.value).lower()


def test_state_transitions(queue_manager):
queue_manager.add_to_queue("user1", None)
queue_manager.update_user_state("user1", UserState.CONNECTING)
queue_manager.update_user_state("user1", UserState.CONNECTED)
queue_manager.update_user_state("user1", UserState.DISCONNECTED)


def test_cleanup_timeouts(queue_manager):
queue_manager.add_to_queue("user1", None)
# Force timeout by waiting
time.sleep(31)
time.sleep(31)
timed_out = queue_manager.cleanup_timeouts()
assert len(timed_out) == 1
assert timed_out[0] == "user1"


def test_remove_from_queue(queue_manager):
queue_manager.add_to_queue("user1", None)
assert queue_manager.remove_from_queue("user1") is True
assert queue_manager.remove_from_queue("nonexistent") is False


def test_invalid_state_transition(queue_manager):
queue_manager.add_to_queue("user1", None)
with pytest.raises(QueueError) as exc_info:
queue_manager.update_user_state("user1", UserState.DISCONNECTED)
assert "invalid state transition" in str(exc_info.value).lower()


def test_user_not_found(queue_manager):
with pytest.raises(QueueError) as exc_info:
queue_manager.update_user_state("nonexistent", UserState.CONNECTED)
Expand Down
Loading
Loading