Commit d0669dd

add text processor for data channel

1 parent f6026a8, commit d0669dd

9 files changed: +382 −32 lines changed

.vscode/launch.json

Lines changed: 20 additions & 0 deletions

@@ -23,6 +23,26 @@
             },
             "justMyCode": true,
             "python": "${command:python.interpreterPath}"
+        },
+        {
+            "name": "PyTrickle Demo with Text Publishing",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/examples/process_video_with_text.py",
+            "console": "integratedTerminal",
+            "cwd": "${workspaceFolder}/examples",
+            "env": {
+                "ORCH_URL": "https://localhost:9995",
+                "ORCH_SECRET": "orch-secret",
+                "CAPABILITY_NAME": "trickle-stream-example",
+                "CAPABILITY_DESCRIPTION": "Flip video upside down using pytrickle",
+                "CAPABILITY_URL": "http://localhost:8000",
+                "CAPABILITY_PRICE_PER_UNIT": "0",
+                "CAPABILITY_PRICE_SCALING": "1",
+                "CAPABILITY_CAPACITY": "1"
+            },
+            "justMyCode": true,
+            "python": "${command:python.interpreterPath}"
         }
     ]
 }
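To run the new example outside VS Code with the same configuration, the environment from this launch entry can be reproduced programmatically. A minimal sketch, assuming the example reads these variables at startup; values are copied from the launch configuration above:

# Sketch: reproduce the launch configuration's environment, then run the example.
# Assumes examples/process_video_with_text.py reads these variables at startup.
import os
import runpy

os.environ.update({
    "ORCH_URL": "https://localhost:9995",
    "ORCH_SECRET": "orch-secret",
    "CAPABILITY_NAME": "trickle-stream-example",
    "CAPABILITY_DESCRIPTION": "Flip video upside down using pytrickle",
    "CAPABILITY_URL": "http://localhost:8000",
    "CAPABILITY_PRICE_PER_UNIT": "0",
    "CAPABILITY_PRICE_SCALING": "1",
    "CAPABILITY_CAPACITY": "1",
})
runpy.run_path("examples/process_video_with_text.py", run_name="__main__")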

examples/process_video_example.py

Lines changed: 11 additions & 1 deletion

@@ -18,6 +18,7 @@
 intensity = 0.8
 delay = 0.0
 ready = False
+_stream_processor = None  # Reference to StreamProcessor for text publishing
 
 def load_model(**kwargs):
     """Initialize processor state - called during model loading phase."""

@@ -37,7 +38,7 @@ def load_model(**kwargs):
 
 async def process_video(frame: VideoFrame) -> VideoFrame:
     """Apply horizontal flip and green hue using OpenCV."""
-    global intensity, ready, delay
+    global intensity, ready, delay, _stream_processor
 
     # Simulated processing time
     time.sleep(delay)

@@ -122,6 +123,11 @@ async def process_video(frame: VideoFrame) -> VideoFrame:
     # Move to same device as original tensor
     result_tensor = result_tensor.to(frame.tensor.device)
 
+    # Example: Publish processing stats occasionally
+    if _stream_processor and hasattr(frame, 'timestamp') and frame.timestamp % 1000 == 0:
+        stats_text = f"Processed video frame with intensity {intensity} at timestamp {frame.timestamp}"
+        await _stream_processor.publish_data_output(stats_text)
+
     return frame.replace_tensor(result_tensor)
 
 def update_params(params: dict):

@@ -147,4 +153,8 @@ def update_params(params: dict):
         name="green-processor",
         port=8000
     )
+
+    # Set reference for text publishing
+    _stream_processor = processor
+
     processor.run()
examples/process_video_with_text.py

Lines changed: 127 additions & 0 deletions

@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+"""
+Video/Audio Passthrough with Text Publishing using StreamProcessor
+
+This example demonstrates:
+- Video passthrough (no processing)
+- Audio passthrough (no processing)
+- Text publishing every 400 audio frames using simple text queue
+"""
+
+import logging
+import json
+import time
+from pytrickle import StreamProcessor
+from pytrickle.frames import VideoFrame, AudioFrame
+from typing import List
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Global state
+audio_frame_count = 0
+text_publish_interval = 400
+ready = False
+start_time = None
+_stream_processor = None  # Reference to StreamProcessor for text publishing
+
+def load_model(**kwargs):
+    """Initialize processor state - called during model loading phase."""
+    global text_publish_interval, ready, start_time
+
+    logger.info(f"load_model called with kwargs: {kwargs}")
+
+    # Set processor variables from kwargs or use defaults
+    text_publish_interval = kwargs.get('text_publish_interval', 400)
+    text_publish_interval = max(1, int(text_publish_interval))
+
+    ready = True
+    start_time = time.time()
+    logger.info(f"✅ Video/Audio passthrough with text publishing ready (interval: {text_publish_interval} frames)")
+
+async def process_video(frame: VideoFrame) -> VideoFrame:
+    """Pass through video frames unchanged."""
+    global ready
+
+    if not ready:
+        return frame
+
+    # Simply pass through the video frame without any processing
+    return frame
+
+async def process_audio(frame: AudioFrame) -> List[AudioFrame]:
+    """Pass through audio frames and publish text data periodically."""
+    global audio_frame_count, text_publish_interval, ready, start_time, _stream_processor
+
+    if not ready:
+        return [frame]
+
+    # Increment frame counter
+    audio_frame_count += 1
+
+    # Check if we should publish text data
+    if audio_frame_count % text_publish_interval == 0:
+        # Calculate elapsed time
+        elapsed_time = time.time() - start_time
+
+        # Create JSONL data with audio processing statistics
+        text_data = {
+            "type": "audio_stats",
+            "timestamp": time.time(),
+            "elapsed_time_seconds": round(elapsed_time, 2),
+            "total_audio_frames": audio_frame_count,
+            "frames_per_second": round(audio_frame_count / elapsed_time, 2) if elapsed_time > 0 else 0,
+            "frame_shape": list(frame.samples.shape) if hasattr(frame, 'samples') else None,
+            "sample_rate": getattr(frame, 'sample_rate', None),
+            "channels": getattr(frame, 'channels', None),
+            "message": f"Processed {audio_frame_count} audio frames in {elapsed_time:.2f} seconds"
+        }
+
+        # Publish as JSONL - just add text to the queue!
+        jsonl_line = json.dumps(text_data)
+        if _stream_processor:
+            await _stream_processor.publish_data_output(jsonl_line)
+
+        logger.info(f"📊 Published stats: {audio_frame_count} frames, {elapsed_time:.2f}s elapsed")
+
+    # Pass through the audio frame unchanged
+    return [frame]
+
+def update_params(params: dict):
+    """Update text publishing interval."""
+    global text_publish_interval
+
+    if "text_publish_interval" in params:
+        old = text_publish_interval
+        text_publish_interval = max(1, int(params["text_publish_interval"]))
+        if old != text_publish_interval:
+            logger.info(f"Text publish interval: {old} → {text_publish_interval} frames")
+
+    if "reset_counter" in params and params["reset_counter"]:
+        global audio_frame_count, start_time
+        audio_frame_count = 0
+        start_time = time.time()
+        logger.info("🔄 Reset audio frame counter and timer")
+
+
+# Create and run StreamProcessor
+if __name__ == "__main__":
+    processor = StreamProcessor(
+        video_processor=process_video,
+        audio_processor=process_audio,
+        model_loader=load_model,
+        param_updater=update_params,
+        name="passthrough-with-text",
+        port=8000
+    )
+
+    # Set reference for text publishing
+    _stream_processor = processor
+
+    logger.info("🚀 Starting passthrough processor with text publishing...")
+    logger.info(f"📝 Will publish JSONL stats every {text_publish_interval} audio frames")
+    logger.info("🔧 Update parameters via /api/update_params:")
+    logger.info("  - text_publish_interval: number of frames between text publications")
+    logger.info("  - reset_counter: true to reset frame counter and timer")
+
+    processor.run()
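Since the example advertises runtime tuning via /api/update_params, a minimal client sketch follows. It assumes the route accepts a JSON body keyed by the parameter names logged above and that the server listens on port 8000; the exact request and response shape is not part of this diff.

# Hypothetical client for the /api/update_params route mentioned in the logs above.
# Assumes a JSON body using the documented parameter names.
import requests

resp = requests.post(
    "http://localhost:8000/api/update_params",
    json={"text_publish_interval": 200, "reset_counter": True},
    timeout=5,
)
print(resp.status_code, resp.text)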

pytrickle/__init__.py

Lines changed: 2 additions & 1 deletion

@@ -16,7 +16,7 @@
 from .server import StreamServer
 from .protocol import TrickleProtocol
 from .frames import (
-    VideoFrame, AudioFrame, VideoOutput, AudioOutput,
+    VideoFrame, AudioFrame, VideoOutput, AudioOutput, TextOutput,
     FrameBuffer,
 )
 from .state import StreamState

@@ -45,6 +45,7 @@
     "AudioFrame",
     "VideoOutput",
     "AudioOutput",
+    "TextOutput",
     "TricklePublisher",
     "TrickleSubscriber",
     "BaseStreamManager",

0 commit comments
