-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserial_thread.py
More file actions
204 lines (164 loc) · 7.35 KB
/
serial_thread.py
File metadata and controls
204 lines (164 loc) · 7.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
Serial Thread Module
Background thread for serial I/O and position polling
This thread handles:
- Non-blocking reads from serial port
- Processing command queue (actual writes)
- Periodic position requests (M114)
- Periodic endstop status requests (M119)
- Detection of blocking commands (G28, G29, M999)
"""
import time
import logging
from typing import Optional
from PyQt5.QtCore import QThread, pyqtSignal
import serial
import config
logger = logging.getLogger(__name__)
class SerialThread(QThread):
"""
Background thread for serial communication.
Emits serialSignal with received data strings.
Special signal value "SERIAL-DISCONNECTED" indicates connection lost.
"""
serialSignal = pyqtSignal(str)
# Blocking commands that pause status polling
BLOCKING_COMMANDS = ['G28', 'G29', 'M999']
def __init__(self, serial_manager, parent=None):
"""
Initialize serial thread.
Args:
serial_manager: SerialManager instance for I/O
parent: Optional Qt parent object
"""
super().__init__(parent)
self.serial_manager = serial_manager
self.running = True
self.elapsed_time = time.time()
self.endstop_check_time = time.time()
self.status_polling_paused = False
self.blocking_command_start_time = 0.0
def stop(self) -> None:
"""Gracefully stop the thread."""
self.running = False
def run(self) -> None:
"""Main thread loop with non-blocking reads and command queue processing."""
while self.running:
if not self.serial_manager.isOpen():
time.sleep(0.1)
continue
try:
# Check if connection is still alive
try:
bytes_available = self.serial_manager.inWaiting()
except (OSError, serial.SerialException):
self.serialSignal.emit("SERIAL-DISCONNECTED")
logger.warning("Lost Serial connection!")
break
current_time = time.time()
# Process command queue
self._process_command_queue(current_time)
# Check for timeout on paused polling
self._check_polling_timeout(current_time)
# Send periodic status requests
self._send_status_requests(current_time)
# Read available data
self._read_serial_data(bytes_available, current_time)
# Small sleep to prevent busy-waiting
time.sleep(config.SERIAL_THREAD_SLEEP)
except Exception as e:
logger.exception("Unexpected error in serial thread")
time.sleep(0.1)
logger.info("Serial thread stopped")
def _process_command_queue(self, current_time: float) -> None:
"""Process next command from queue if available."""
command = self.serial_manager.get_next_command()
if command:
success = self.serial_manager._write_internal(command)
if not success:
self.serialSignal.emit("SERIAL-DISCONNECTED")
self.running = False
return
# Check if this is a blocking command
command_str = command.decode('UTF-8', errors='replace').strip().upper()
for block_cmd in self.BLOCKING_COMMANDS:
if command_str.startswith(block_cmd):
self.status_polling_paused = True
self.blocking_command_start_time = current_time
logger.info(f"Pausing status polling for blocking command: {command_str}")
break
def _check_polling_timeout(self, current_time: float) -> None:
"""Check for timeout on paused polling and force resume if needed."""
if self.status_polling_paused:
time_paused = current_time - self.blocking_command_start_time
if time_paused >= config.BLOCKING_COMMAND_MAX_PAUSE:
self.status_polling_paused = False
logger.warning(
f"Forcing resume of status polling after {time_paused:.1f}s timeout "
f"(max: {config.BLOCKING_COMMAND_MAX_PAUSE}s)"
)
# Request immediate position update
self._request_immediate_status()
def _send_status_requests(self, current_time: float) -> None:
"""Send periodic M114 and M119 requests if not paused."""
if self.status_polling_paused:
return
# Position request (M114)
if current_time - self.elapsed_time > config.SERIAL_STATUS_REQUEST_INTERVAL:
self.elapsed_time = current_time
try:
self.serial_manager.write(b"M114\n", priority=True)
except Exception as e:
logger.error(f"Error queuing status request: {e}")
# Endstop request (M119)
if current_time - self.endstop_check_time > config.SERIAL_ENDSTOP_REQUEST_INTERVAL:
self.endstop_check_time = current_time
try:
self.serial_manager.write(b"M119\n", priority=True)
except Exception as e:
logger.error(f"Error queuing endstop request: {e}")
def _read_serial_data(self, bytes_available: int, current_time: float) -> None:
"""Read and emit available serial data."""
if bytes_available == 0:
return
try:
# Batch processing: estimate lines available
estimated_lines = max(1, min(10, bytes_available // 30))
for _ in range(estimated_lines):
if self.serial_manager.inWaiting() == 0:
break
data_bytes = self.serial_manager.readline()
if data_bytes:
data_str = data_bytes.decode('UTF-8', errors='replace').strip()
if data_str:
self.serialSignal.emit(data_str)
self._check_blocking_command_complete(data_str, current_time)
except (OSError, serial.SerialException) as e:
logger.error(f"Error reading from serial: {e}")
self.serialSignal.emit("SERIAL-DISCONNECTED")
self.running = False
def _check_blocking_command_complete(self, data: str, current_time: float) -> None:
"""Check if blocking command has completed and resume polling."""
if not self.status_polling_paused:
return
if "ok" not in data.lower():
return
time_elapsed = current_time - self.blocking_command_start_time
if time_elapsed >= config.BLOCKING_COMMAND_MIN_PAUSE:
self.status_polling_paused = False
logger.info(
f"Resuming status polling after blocking command completed "
f"({time_elapsed:.1f}s elapsed)"
)
self._request_immediate_status()
else:
logger.debug(
f"Received 'ok' but only {time_elapsed:.1f}s elapsed "
f"(need {config.BLOCKING_COMMAND_MIN_PAUSE}s), waiting..."
)
def _request_immediate_status(self) -> None:
"""Request immediate position and endstop status."""
self.serial_manager.write(b"M114\n", priority=True)
self.serial_manager.write(b"M119\n", priority=True)
# Backwards compatibility alias
SerialThreadClass = SerialThread