Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
The diff you're trying to view is too large. We only load the first 3000 changed files.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

backend/test_sessions/
backend/transcriptions/
backend/env/

backend/rosetta-env/
experiments/
Expand Down
22 changes: 16 additions & 6 deletions backend/eye_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@
previous_position = None
is_tracking = False
my_eyetracker = None
gaze_buffer = deque(maxlen=5)
gaze_buffer = deque(maxlen=5)
threshold_distance = 30


current_gaze_position = (0, 0)

def gaze_data_callback(gaze_data):
global global_gaze_data
global_gaze_data = gaze_data

def subscribe_to_gaze_data(eyetracker):
print("INFO: Subscribing to gaze data for eye tracker with serial number {0}.".format(eyetracker.serial_number))
print("INFO: Subscribing to gaze data for eye tracker...")
eyetracker.subscribe_to(tr.EYETRACKER_GAZE_DATA, gaze_data_callback, as_dictionary=True)

def unsubscribe_from_gaze_data(eyetracker):
print("INFO: Unsubscribing from gaze data for eye tracker with serial number {0}.".format(eyetracker.serial_number))
print("INFO: Unsubscribing from gaze data for eye tracker...")
eyetracker.unsubscribe_from(tr.EYETRACKER_GAZE_DATA, gaze_data_callback)

def smooth_move_to(x, y, previous_x, previous_y, smoothing=0.5):
Expand All @@ -30,14 +33,13 @@ def smooth_move_to(x, y, previous_x, previous_y, smoothing=0.5):
return target_x, target_y

def distance(point1, point2):
"""Helper function to calculate the distance between two points."""
return math.sqrt((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2)

def track_gaze():
global previous_position, is_tracking, my_eyetracker, gaze_buffer
global previous_position, is_tracking, my_eyetracker, gaze_buffer, current_gaze_position

found_eyetrackers = tr.find_all_eyetrackers()
if len(found_eyetrackers) == 0:
if not found_eyetrackers:
print("INFO: No eye trackers found.")
return

Expand All @@ -58,6 +60,9 @@ def track_gaze():
avg_x = sum([g[0] for g in gaze_buffer]) / gaze_buffer.maxlen
avg_y = sum([g[1] for g in gaze_buffer]) / gaze_buffer.maxlen


current_gaze_position = (avg_x, avg_y)

if previous_position is None:
previous_position = (avg_x, avg_y)

Expand All @@ -77,3 +82,8 @@ def start_eye_tracking():
def stop_eye_tracking():
global is_tracking
is_tracking = False


def get_current_gaze_position():

return current_gaze_position
104 changes: 68 additions & 36 deletions backend/interaction_logger.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,95 @@
import queue
import time
import settings
import os

interaction_queue = queue.Queue()

def get_selector_keyword(selector):

if selector and (selector.startswith('/') or selector.startswith('(')):
return 'xpath'

return 'xpath'

def get_test_file():

if not settings.test_file:
return None

test_dir = os.path.dirname(settings.test_file)
os.makedirs(test_dir, exist_ok=True)

return settings.test_file

def log_interaction(interaction):
"""
Logs an interaction to the test file.
@param interaction: The string representing the interaction (click or voice command)
@param test_file: The path to the test file where interactions are logged
"""

text = define_interaction(interaction)
test_file = get_test_file()

if test_file and text:
with open(test_file, "a") as f:
f.write(f"{text}\n")
try:
with open(test_file, "a", encoding="utf-8") as f:
f.write(f"{text}\n")
except Exception as e:
print(f"ERROR: Could not write interaction to {test_file}: {e}")

def define_interaction(interaction):
"""
Makes a string representation of the interaction
@param interaction: The interaction dictionary
@return: The string representation of the interaction
"""


text = None
if interaction["type"] == "click":
if interaction["href"]:
text = f'\tAnd I click on tag with href "{interaction["href"]}"'
elif interaction["id"]:
text = f'\tAnd I click on tag with id "{interaction["id"]}"'
else:
text = f'\tAnd I click on tag with xpath "{interaction["xpath"]}"'
elif interaction["type"] == "input":
text = f'\tAnd I input "{interaction["text"]}"'
elif interaction["type"] == "enter":
text = f'\tAnd I hit enter'
elif interaction["type"] == "back":
interaction_type = interaction.get("type")
selector = interaction.get("xpath")


ignored_types = ["initialState", "zoomChange", "go"]
if interaction_type in ignored_types:
return None


if interaction_type == "click" and selector:
keyword = get_selector_keyword(selector)
text = f'\tAnd I click on element with {keyword} "{selector}"'

elif interaction_type == "input" and selector:
value = interaction.get("value", "")
keyword = get_selector_keyword(selector)
text = f'\tAnd I type "{value}" into field with {keyword} "{selector}"'

elif interaction_type == "keypress" and interaction.get("key") == "Enter" and selector:
keyword = get_selector_keyword(selector)
text = f'\tAnd I press the "Enter" key on element with {keyword} "{selector}"'

elif interaction_type == "back":
text = f'\tAnd I go back'
elif interaction["type"] == "forward":

elif interaction_type == "forward":
text = f'\tAnd I go forward'
elif interaction["type"] == "go":
if interaction["direction"] > 0:
text = f'\tAnd I scroll down'


elif interaction_type == "viewportChange":
viewport_data = interaction.get("viewport")
if viewport_data:
width = viewport_data.get("width")
height = viewport_data.get("height")
if width is not None and height is not None:
text = f'\tAnd I set the viewport to {width}x{height}'
else:
print("WARNING: viewportChange action missing width or height.")
else:
text = f'\tAnd I scroll up'

print("WARNING: viewportChange action missing viewport data.")

elif interaction_type:
print(f"WARNING: Unhandled interaction type: {interaction_type}")

return text


def main():
while True:
try:
interaction = interaction_queue.get()
print(f"INFO: Logging interaction {interaction}")
interaction = interaction_queue.get(timeout=1)
print(f"INFO: Received interaction {interaction.get('type')}")
log_interaction(interaction)
except queue.Empty:
continue
except Exception as e:
print(f"INFO: Error logging interaction: {e}")
time.sleep(1)
print(f"ERROR: Error in logger loop: {e}")
136 changes: 93 additions & 43 deletions backend/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from flask import Flask, jsonify, request
from flask_cors import CORS
from websocket_server import start_websocket_server, message_queue
from websocket_server import start_websocket_server
import threading
import eye_tracking
import voice_control
Expand All @@ -12,91 +12,141 @@
app = Flask(__name__)
CORS(app)

# Threads and global variables

tracking_thread = None
voice_thread = None
logging_thread = None

is_tracking = False
start_time = None

current_mode = None
initial_state_written = False

@app.route('/status', methods=['GET'])
def status():
return jsonify({"status": "Server is running"}), 200


@app.route('/start', methods=['POST'])
def start_tracking():
global tracking_thread, voice_thread, logging_thread, is_tracking, start_time
start_time = time.strftime('%Y-%m-%d_%H-%M-%S')

settings.test_file = os.path.join(settings.TEST_DIRECTORY, f"test_session_{start_time}.feature")
settings.transcription_file = os.path.join(settings.TRANSCRIPTION_DIR, f"transcription_{start_time}.log")
global tracking_thread, voice_thread, logging_thread, is_tracking, start_time, current_mode, initial_state_written
start_time = time.strftime('%Y-%m-%d_%H-%M-%S')

data = request.get_json()
language = request.headers.get('Language', 'en')
page_name = data.get('pageName')
page_url = data.get('pageUrl')
current_mode = data.get('mode', 'eye-voice')

custom_path = data.get('filePath')
filename = f"test_session_{start_time}.feature"

if custom_path:
if os.path.isdir(custom_path):
test_file_path = os.path.join(custom_path, filename)
else:
test_file_path = custom_path
else:
test_file_path = os.path.join(settings.OUTPUT_DIR, filename)

settings.test_file = test_file_path

output_dir = os.path.dirname(settings.test_file)
transcription_filename = f"transcription_{start_time}.txt"
settings.transcription_file = os.path.join(output_dir, transcription_filename)

if tracking_thread is None or not tracking_thread.is_alive():
is_tracking = True
initial_state_written = False

os.makedirs(os.path.dirname(settings.test_file), exist_ok=True)


with open(settings.test_file, "w") as f:
with open(settings.test_file, "w", encoding="utf-8") as f:
f.write(f"Feature: Replay of session on {time.strftime('%b %d at %I:%M:%S %p')}\n\n")
f.write("@user1 @web\n")
f.write(f'Scenario: User interacts with the web page named "{page_name}"\n\n')
f.write(f'\tGiven I navigate to page "{page_url}"\n')

tracking_thread = threading.Thread(target=eye_tracking.start_eye_tracking)
tracking_thread.start()

voice_thread = threading.Thread(target=voice_control.main, args=(language,))
voice_thread.start()
if current_mode == 'eye-voice':
tracking_thread = threading.Thread(target=eye_tracking.start_eye_tracking)
tracking_thread.start()
voice_thread = threading.Thread(target=voice_control.main, args=(language,))
voice_thread.start()

logging_thread = threading.Thread(target=interaction_logger.main, daemon=True)
logging_thread.start()

return jsonify({"status": f"Eye tracking and voice control started in {language}"}), 200
return jsonify({"status": f"Session started in {current_mode} mode"}), 200
else:
return jsonify({"status": f"Eye tracking is already running in {language}"}), 400

return jsonify({"status": f"A session is already running"}), 400

@app.route('/stop', methods=['GET'])
def stop_tracking():
global is_tracking

if is_tracking:
is_tracking = False
eye_tracking.stop_eye_tracking()
voice_control.stop_voice_control()

return jsonify({"status": "Eye tracking stopped"}), 200
if current_mode == 'eye-voice':
eye_tracking.stop_eye_tracking()
voice_control.stop_voice_control()
return jsonify({"status": "Session stopped"}), 200
else:
return jsonify({"status": "Eye tracking is not running"}), 400

return jsonify({"status": "A session is not running"}), 400

@app.route('/tag-info', methods=['POST'])
def tag_info():
@app.route('/record-action', methods=['POST'])
def record_action():
global initial_state_written
data = request.get_json()
tag_name = data.get('tagName')
href = data.get('href')
element_id = data.get('id')
class_name = data.get('className')
xpath = data.get('xpath')

if is_tracking:
interaction_logger.interaction_queue.put({
"type": "click",
"selector": tag_name,
"href": href,
"id": element_id,
"xpath": xpath})
if not is_tracking:
return jsonify({"status": "Not recording"}), 400

action_type = data.get('type')


if not initial_state_written and action_type == 'initialState':
try:
viewport = data.get('viewport')
pixel_ratio = data.get('devicePixelRatio')

if viewport and pixel_ratio is not None:
width = viewport['width']
height = viewport['height']

with open(settings.test_file, 'r', encoding='utf-8') as f:
lines = f.readlines()

navigate_index = -1
for i, line in enumerate(lines):
if 'Given I navigate to page' in line:
navigate_index = i
break

if navigate_index != -1:
viewport_step = f"\tGiven I set the viewport to {width}x{height}\n"
zoom_step = f"\tAnd I set zoom ratio to {pixel_ratio}\n"
lines.insert(navigate_index, viewport_step)
lines.insert(navigate_index + 1, zoom_step)

with open(settings.test_file, 'w', encoding='utf-8') as f:
f.writelines(lines)

initial_state_written = True
return jsonify({"status": "Initial state recorded"}), 200
else:
print("WARNING: initialState action missing data. Passing to logger.")

except Exception as e:
print(f"ERROR: Failed to write initial state steps: {e}")



return jsonify({"status": "Tag information received"}), 200


if action_type in ['initialState', 'zoomChange']:
return jsonify({"status": "State change event ignored"}), 200


interaction_logger.interaction_queue.put(data)
return jsonify({"status": "Action recorded"}), 200

if __name__ == '__main__':
start_websocket_server()
app.run(host='0.0.0.0', port=5001) # flask app
app.run(host='0.0.0.0', port=5001)
Binary file added backend/requirements-win.txt
Binary file not shown.
Loading