diff --git a/VERSION b/VERSION index ee74734a..80895903 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.0 +4.3.0 diff --git a/bot/database/models.py b/bot/database/models.py index 9776d089..63d5e6af 100644 --- a/bot/database/models.py +++ b/bot/database/models.py @@ -67,6 +67,7 @@ class ClipType(Enum): SELECTED = "selected" ADJUSTED = "adjusted" SINGLE = "single" + TIKTAK = "tiktak" @dataclass diff --git a/bot/factory/subscribed_permission_level_factory.py b/bot/factory/subscribed_permission_level_factory.py index 95ce0d8f..d01149b6 100644 --- a/bot/factory/subscribed_permission_level_factory.py +++ b/bot/factory/subscribed_permission_level_factory.py @@ -40,6 +40,7 @@ SendClipHandler, SerialContextHandler, SnapClipHandler, + TikTakHandler, TranscriptionHandler, ) from bot.middlewares import ( @@ -80,6 +81,7 @@ def _create_handler_classes(self) -> List[Type[BotMessageHandler]]: SendClipHandler, SerialContextHandler, SnapClipHandler, + TikTakHandler, TranscriptionHandler, ] diff --git a/bot/handlers/not_sending_videos/semantic_search_handler.py b/bot/handlers/not_sending_videos/semantic_search_handler.py index 633e9fa6..38b98361 100644 --- a/bot/handlers/not_sending_videos/semantic_search_handler.py +++ b/bot/handlers/not_sending_videos/semantic_search_handler.py @@ -23,8 +23,15 @@ get_no_query_provided_message, ) from bot.search.filter_applicator import FilterApplicator -from bot.search.semantic_segments_finder import SemanticSearchMode -from bot.utils.constants import EpisodeMetadataKeys +from bot.search.semantic_segments_finder import ( + SemanticSearchMode, + SemanticSegmentsFinder, +) +from bot.settings import settings +from bot.utils.constants import ( + EpisodeMetadataKeys, + SegmentKeys, +) class SemanticSearchHandler(SemanticBotHandler): @@ -88,12 +95,14 @@ async def _handle_semantic_results( unique = self._deduplicate_semantic_results(results, mode) - if mode != SemanticSearchMode.EPISODE: - await DatabaseManager.insert_last_search( - chat_id=self._message.get_chat_id(), - quote=query, - segments=json.dumps(unique), - ) + if mode == SemanticSearchMode.EPISODE: + unique = await self.__enrich_episode_results_with_clip_times(unique, active_series) + + await DatabaseManager.insert_last_search( + chat_id=self._message.get_chat_id(), + quote=query, + segments=json.dumps(unique), + ) response = self.__format_response(unique, query, mode) @@ -105,6 +114,20 @@ async def _handle_semantic_results( ), ) + async def __enrich_episode_results_with_clip_times( + self, episodes: list, active_series: str, + ) -> list: + episode_ids = [ep.get("episode_id") for ep in episodes if ep.get("episode_id")] + first_times = await SemanticSegmentsFinder.fetch_first_dialogue_times( + episode_ids, active_series, self._logger, + ) + for ep in episodes: + episode_id = ep.get("episode_id", "") + first_time = first_times.get(episode_id, 0.0) + ep[SegmentKeys.START_TIME] = first_time + ep[SegmentKeys.END_TIME] = first_time + settings.SENSODCINEK_PREVIEW_DURATION_SEC + return episodes + @staticmethod def __format_response(unique: list, query: str, mode: SemanticSearchMode) -> str: if mode == SemanticSearchMode.FRAMES: diff --git a/bot/handlers/sending_videos/__init__.py b/bot/handlers/sending_videos/__init__.py index a9970d0c..48c2812a 100644 --- a/bot/handlers/sending_videos/__init__.py +++ b/bot/handlers/sending_videos/__init__.py @@ -10,3 +10,4 @@ from bot.handlers.sending_videos.semantic_clip_handler import SemanticClipHandler from bot.handlers.sending_videos.send_clip_handler import SendClipHandler from bot.handlers.sending_videos.snap_clip_handler import SnapClipHandler +from bot.handlers.sending_videos.tiktak_handler import TikTakHandler diff --git a/bot/handlers/sending_videos/tiktak_handler.py b/bot/handlers/sending_videos/tiktak_handler.py new file mode 100644 index 00000000..9c592c82 --- /dev/null +++ b/bot/handlers/sending_videos/tiktak_handler.py @@ -0,0 +1,119 @@ +import json +import logging +from typing import List + +from bot.database.database_manager import DatabaseManager +from bot.database.models import ( + ClipType, + LastClip, +) +from bot.handlers.bot_message_handler import BotMessageHandler +from bot.responses.sending_videos.tiktak_handler_responses import ( + get_no_last_clip_message, + get_tiktak_compiled_note, + get_tiktak_no_detections_note, + get_tiktak_success_log, +) +from bot.settings import settings +from bot.utils.constants import ( + EpisodeMetadataKeys, + SegmentKeys, +) +from bot.video.tiktak_processor import TikTakProcessor + + +class TikTakHandler(BotMessageHandler): + def get_commands(self) -> List[str]: + return ["tiktak", "tt"] + + async def _do_handle(self) -> None: + msg = self._message + chat_id = msg.get_chat_id() + + last_clip = await DatabaseManager.get_last_clip_by_chat_id(chat_id) + if not last_clip: + return await self._reply_error(get_no_last_clip_message()) + + if last_clip.clip_type == ClipType.COMPILED and last_clip.compiled_clip: + return await self.__handle_compiled(last_clip) + + return await self.__handle_single(last_clip) + + async def __handle_compiled(self, last_clip: LastClip) -> None: + output = await TikTakProcessor.process_compiled( + last_clip.compiled_clip, + self._logger, + ) + await self._responder.send_markdown(get_tiktak_compiled_note()) + await self._responder.send_video(output, duration=None) + await DatabaseManager.insert_last_clip( + chat_id=self._message.get_chat_id(), + segment=json.loads(last_clip.segment) if isinstance(last_clip.segment, str) else last_clip.segment, + compiled_clip=None, + clip_type=ClipType.TIKTAK, + adjusted_start_time=last_clip.adjusted_start_time, + adjusted_end_time=last_clip.adjusted_end_time, + is_adjusted=last_clip.is_adjusted, + ) + return await self._log_system_message( + logging.INFO, + get_tiktak_success_log(self._message.get_username()), + ) + + async def __handle_single(self, last_clip: LastClip) -> None: + segment = json.loads(last_clip.segment) if isinstance(last_clip.segment, str) else last_clip.segment + video_path = segment.get(SegmentKeys.VIDEO_PATH) + start_time = last_clip.adjusted_start_time or float(segment.get(SegmentKeys.START_TIME, 0)) + end_time = last_clip.adjusted_end_time or float(segment.get(SegmentKeys.END_TIME, 0)) + + episode_metadata = segment.get(EpisodeMetadataKeys.EPISODE_METADATA, {}) + season = episode_metadata.get(EpisodeMetadataKeys.SEASON) + episode_number = episode_metadata.get(EpisodeMetadataKeys.EPISODE_NUMBER) + series_name = episode_metadata.get(EpisodeMetadataKeys.SERIES_NAME, "") + + had_detections = self.__has_detections( + series_name, season, episode_number, settings.TIKTAK_DETECTION_DIR, + ) + + output = await TikTakProcessor.process_single( + video_path=video_path, + start_time=start_time, + end_time=end_time, + season=season or 0, + episode_number=episode_number or 0, + series_name=series_name, + detection_dir=settings.TIKTAK_DETECTION_DIR, + logger=self._logger, + ) + + if not had_detections: + await self._responder.send_markdown(get_tiktak_no_detections_note()) + + duration = end_time - start_time + await self._responder.send_video(output, duration=duration) + + await DatabaseManager.insert_last_clip( + chat_id=self._message.get_chat_id(), + segment=segment, + compiled_clip=None, + clip_type=ClipType.TIKTAK, + adjusted_start_time=start_time, + adjusted_end_time=end_time, + is_adjusted=last_clip.is_adjusted, + ) + return await self._log_system_message( + logging.INFO, + get_tiktak_success_log(self._message.get_username()), + ) + + @staticmethod + def __has_detections( + series_name: str, + season: int, + episode_number: int, + detection_dir: str, + ) -> bool: + if season is None or episode_number is None: + return False + det_path = TikTakProcessor._detection_file_path(series_name, season, episode_number, detection_dir) + return det_path is not None and det_path.exists() diff --git a/bot/responses/not_sending_videos/semantic_search_handler_responses.py b/bot/responses/not_sending_videos/semantic_search_handler_responses.py index 9b82183b..e03f5987 100644 --- a/bot/responses/not_sending_videos/semantic_search_handler_responses.py +++ b/bot/responses/not_sending_videos/semantic_search_handler_responses.py @@ -5,7 +5,10 @@ ) from bot.responses.bot_response import BotResponse -from bot.utils.constants import EpisodeMetadataKeys +from bot.utils.constants import ( + EpisodeMetadataKeys, + SegmentKeys, +) from bot.utils.functions import ( convert_number_to_emoji, format_segment, @@ -83,8 +86,16 @@ def format_semantic_episodes_response( else: ep_fmt = f"S{str(season).zfill(2)}E{str(episode_num).zfill(2)}" + start_time = ep.get(SegmentKeys.START_TIME) + if start_time is not None: + minutes = int(start_time) // 60 + seconds = int(start_time) % 60 + clip_info = f" | klip od {minutes}:{seconds:02d}" + else: + clip_info = "" + line = ( - f"{convert_number_to_emoji(i)} | 📺 {ep_fmt}\n" + f"{convert_number_to_emoji(i)} | 📺 {ep_fmt}{clip_info}\n" f" 👉 {title}" ) episode_lines.append(line) diff --git a/bot/responses/sending_videos/tiktak_handler_responses.py b/bot/responses/sending_videos/tiktak_handler_responses.py new file mode 100644 index 00000000..c6c5d726 --- /dev/null +++ b/bot/responses/sending_videos/tiktak_handler_responses.py @@ -0,0 +1,23 @@ +from bot.responses.bot_response import BotResponse + + +def get_no_last_clip_message() -> str: + return BotResponse.error("BRAK KLIPU", "Najpierw wyszukaj klip, a potem użyj /tiktak") + + +def get_tiktak_success_log(username: str) -> str: + return f"TikTak clip generated for user '{username}'" + + +def get_tiktak_compiled_note() -> str: + return BotResponse.warning( + "KOMPILACJA", + "Dla kompilacji zastosowano statyczne kadrowanie centralne (brak danych detekcji).", + ) + + +def get_tiktak_no_detections_note() -> str: + return BotResponse.warning( + "BRAK DETEKCJI", + "Nie znaleziono danych o osobach - zastosowano kadrowanie centralne.", + ) diff --git a/bot/search/semantic_segments_finder.py b/bot/search/semantic_segments_finder.py index d1f23830..4a74b3a0 100644 --- a/bot/search/semantic_segments_finder.py +++ b/bot/search/semantic_segments_finder.py @@ -243,6 +243,49 @@ def deduplicate_segments(segments: List[Dict[str, Any]]) -> List[Dict[str, Any]] unique.append(seg) return unique + @staticmethod + async def fetch_first_dialogue_times( + episode_ids: List[str], + series_name: str, + logger: logging.Logger, + ) -> Dict[str, float]: + if not episode_ids: + return {} + es = await ElasticSearchManager.connect_to_elasticsearch(logger) + index = f"{series_name}{ElasticsearchIndexSuffixes.TEXT_SEGMENTS}" + query = { + "query": {"terms": {"episode_id": episode_ids}}, + "size": 0, + "aggs": { + "per_episode": { + "terms": {"field": "episode_id", "size": len(episode_ids)}, + "aggs": { + "first_segment": { + "top_hits": { + "sort": [{"start_time": {"order": "asc"}}], + "size": 1, + "_source": ["start_time"], + }, + }, + }, + }, + }, + } + try: + response = await es.search(index=index, body=query) + except Exception: # pylint: disable=broad-except + await log_system_message(logging.WARNING, "Failed to fetch first dialogue times.", logger) + return {} + + result: Dict[str, float] = {} + buckets = response.get("aggregations", {}).get("per_episode", {}).get("buckets", []) + for bucket in buckets: + episode_id = bucket.get("key") + hits = bucket.get("first_segment", {}).get("hits", {}).get("hits", []) + if episode_id and hits: + result[episode_id] = hits[0]["_source"].get("start_time", 0.0) + return result + @staticmethod def deduplicate_episodes(episodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: seen = set() diff --git a/bot/settings.py b/bot/settings.py index b678c217..cb46ec09 100644 --- a/bot/settings.py +++ b/bot/settings.py @@ -78,6 +78,9 @@ class Settings(BaseSettings): REST_API_APP_PATH: str = Field("bot.platforms.rest_runner:app") DISABLE_RATE_LIMITING: bool = Field(False) + TIKTAK_DETECTION_DIR: str = Field("") + SENSODCINEK_PREVIEW_DURATION_SEC: int = Field(30) + VLLM_HOST: str = Field("http://localhost:11435") VLLM_EMBEDDINGS_MODEL: str = Field("qwen3vl-embed") VLLM_TIMEOUT_SECONDS: int = Field(30) diff --git a/bot/video/tiktak_processor.py b/bot/video/tiktak_processor.py new file mode 100644 index 00000000..eb80ae59 --- /dev/null +++ b/bot/video/tiktak_processor.py @@ -0,0 +1,325 @@ +import asyncio +import json +import logging +import os +from pathlib import Path +import subprocess +import tempfile +from typing import ( + List, + Optional, + Tuple, +) + +from bot.utils.log import log_system_message +from bot.video.utils import FFMpegException + + +class TikTakProcessor: + _ASPECT_9_16 = 9.0 / 16.0 + _MAX_PAN_SPEED = 120.0 + _PERSON_CLASS = "person" + _CODEC = "libx264" + _PRESET = "fast" + _CRF = "23" + _PROFILE = "high" + _LEVEL = "4.1" + _PIX_FMT = "yuv420p" + + @staticmethod + async def process_single( + video_path: str, + start_time: float, + end_time: float, + season: int, + episode_number: int, + series_name: str, + detection_dir: str, + logger: logging.Logger, + ) -> Path: + width, height, fps = TikTakProcessor._probe_video(video_path) + crop_w = TikTakProcessor._even(int(round(height * TikTakProcessor._ASPECT_9_16))) + max_crop_x = width - crop_w + center_x = max_crop_x // 2 + duration = end_time - start_time + + raw_points = TikTakProcessor._load_person_bboxes( + series_name, season, episode_number, + start_time, end_time, fps, width, crop_w, detection_dir, + ) + keypoints = TikTakProcessor._build_trajectory( + raw_points, duration, float(center_x), + ) + x_expr = TikTakProcessor._piecewise_linear_expr(keypoints, max_crop_x) + + await log_system_message( + logging.INFO, + f"TikTak: {len(raw_points)} detection keypoints, crop {crop_w}x{height} from {width}x{height}", + logger, + ) + return await TikTakProcessor._run_ffmpeg( + video_path, start_time, duration, crop_w, height, x_expr, + ) + + @staticmethod + async def process_compiled( + video_data: bytes, + logger: logging.Logger, + ) -> Path: + fd, tmp_input = tempfile.mkstemp(suffix=".mp4") + try: + os.close(fd) + Path(tmp_input).write_bytes(video_data) + width, height, _ = TikTakProcessor._probe_video(tmp_input) + crop_w = TikTakProcessor._even(int(round(height * TikTakProcessor._ASPECT_9_16))) + center_x = (width - crop_w) // 2 + duration = TikTakProcessor._probe_duration(tmp_input) + await log_system_message( + logging.INFO, + f"TikTak compiled: center crop {crop_w}x{height} from {width}x{height}", + logger, + ) + return await TikTakProcessor._run_ffmpeg( + tmp_input, 0.0, duration, crop_w, height, str(center_x), + ) + finally: + if os.path.exists(tmp_input): + os.remove(tmp_input) + + @staticmethod + def _probe_video(video_path: str) -> Tuple[int, int, float]: + result = subprocess.run( + [ + "ffprobe", "-v", "error", + "-select_streams", "v:0", + "-show_entries", "stream=width,height,r_frame_rate", + "-of", "json", str(video_path), + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True, + ) + data = json.loads(result.stdout) + stream = data["streams"][0] + width = int(stream["width"]) + height = int(stream["height"]) + num, den = stream["r_frame_rate"].split("/") + fps = float(num) / float(den) + return width, height, fps + + @staticmethod + def _probe_duration(video_path: str) -> float: + result = subprocess.run( + [ + "ffprobe", "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + str(video_path), + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True, + ) + return float(result.stdout.strip()) + + @staticmethod + def _even(value: int) -> int: + return value - (value % 2) + + @staticmethod + def _detection_file_path( + series_name: str, + season: int, + episode_number: int, + detection_dir: str, + ) -> Optional[Path]: + if not detection_dir: + return None + ep_code = f"s{season:02d}e{episode_number:02d}" + filename = f"{series_name.lower()}_{ep_code}_object_detections.json" + return ( + Path(detection_dir) + / f"S{season:02d}" + / f"E{episode_number:02d}" + / filename + ) + + @staticmethod + def _extract_frame_point( + frame_data: dict, + fps: float, + start_time: float, + end_time: float, + video_width: int, + crop_width: int, + ) -> Optional[Tuple[float, float]]: + frame_num = TikTakProcessor._parse_frame_number(frame_data.get("frame_name", "")) + if frame_num is None: + return None + timestamp = frame_num / fps + if not start_time - 0.5 <= timestamp <= end_time + 0.5: + return None + person_bboxes = [ + d["bbox"] + for d in frame_data.get("detections", []) + if d.get("class_name") == TikTakProcessor._PERSON_CLASS + ] + if not person_bboxes: + return None + return timestamp - start_time, TikTakProcessor._optimal_crop_x(person_bboxes, video_width, crop_width) + + @staticmethod + def _load_person_bboxes( + series_name: str, + season: int, + episode_number: int, + start_time: float, + end_time: float, + fps: float, + video_width: int, + crop_width: int, + detection_dir: str, + ) -> List[Tuple[float, float]]: + det_path = TikTakProcessor._detection_file_path(series_name, season, episode_number, detection_dir) + if not det_path or not det_path.exists(): + return [] + with open(det_path, encoding="utf-8") as f: + data = json.load(f) + points = [ + pt + for frame_data in data.get("detections", []) + if ( + pt := TikTakProcessor._extract_frame_point( + frame_data, fps, start_time, end_time, video_width, crop_width, + ) + ) is not None + ] + return sorted(points, key=lambda p: p[0]) + + @staticmethod + def _parse_frame_number(frame_name: str) -> Optional[int]: + try: + stem = Path(frame_name).stem + return int(stem.rsplit("_frame_", 1)[1]) + except (IndexError, ValueError): + return None + + @staticmethod + def _optimal_crop_x(bboxes: List[dict], video_width: int, crop_width: int) -> float: + max_crop_x = float(video_width - crop_width) + if max_crop_x <= 0: + return 0.0 + + candidates = {0.0, max_crop_x} + for bbox in bboxes: + candidates.add(max(0.0, min(max_crop_x, bbox["x1"]))) + candidates.add(max(0.0, min(max_crop_x, bbox["x2"] - crop_width))) + + best_x = max_crop_x / 2.0 + best_score = -1.0 + for cx in candidates: + score = sum( + max(0.0, min(b["x2"], cx + crop_width) - max(b["x1"], cx)) + * (b["y2"] - b["y1"]) + for b in bboxes + ) + if score > best_score: + best_score = score + best_x = cx + + return best_x + + @staticmethod + def _build_trajectory( + raw_points: List[Tuple[float, float]], + clip_duration: float, + center_x: float, + ) -> List[Tuple[float, float]]: + if not raw_points: + return [(0.0, center_x), (clip_duration, center_x)] + + points = list(raw_points) + if points[0][0] > 0.0: + points.insert(0, (0.0, points[0][1])) + if points[-1][0] < clip_duration: + points.append((clip_duration, points[-1][1])) + + return TikTakProcessor._limit_pan_speed(points) + + @staticmethod + def _limit_pan_speed(points: List[Tuple[float, float]]) -> List[Tuple[float, float]]: + if len(points) < 2: + return points + result = [points[0]] + for i in range(1, len(points)): + prev_t, prev_x = result[-1] + curr_t, curr_x = points[i] + dt = curr_t - prev_t + if dt <= 0: + result.append((curr_t, prev_x)) + continue + max_dx = TikTakProcessor._MAX_PAN_SPEED * dt + dx = curr_x - prev_x + if abs(dx) > max_dx: + curr_x = prev_x + max_dx * (1.0 if dx > 0 else -1.0) + result.append((curr_t, curr_x)) + return result + + @staticmethod + def _piecewise_linear_expr(keypoints: List[Tuple[float, float]], max_crop_x: int) -> str: + if len(keypoints) == 1: + return str(int(round(keypoints[0][1]))) + + expr = str(int(round(keypoints[-1][1]))) + for i in range(len(keypoints) - 2, -1, -1): + t0, x0 = keypoints[i] + t1, x1 = keypoints[i + 1] + dt = t1 - t0 + if dt <= 0: + continue + lerp = f"({x0:.2f}+({x1:.2f}-{x0:.2f})*(t-{t0:.4f})/{dt:.4f})" + expr = f"if(lt(t,{t1:.4f}),{lerp},{expr})" + return f"max(0,min({max_crop_x},{expr}))" + + @staticmethod + async def _run_ffmpeg( + video_path: str, + start_time: float, + duration: float, + crop_w: int, + height: int, + x_expr: str, + ) -> Path: + fd, tmp_path = tempfile.mkstemp(suffix=".mp4") + os.close(fd) + output = Path(tmp_path) + + filter_str = f"crop={crop_w}:{height}:'{x_expr}':0" + command = [ + "ffmpeg", "-y", + "-ss", str(start_time), + "-i", str(video_path), + "-t", str(duration), + "-vf", filter_str, + "-c:v", TikTakProcessor._CODEC, + "-preset", TikTakProcessor._PRESET, + "-crf", TikTakProcessor._CRF, + "-profile:v", TikTakProcessor._PROFILE, + "-level", TikTakProcessor._LEVEL, + "-pix_fmt", TikTakProcessor._PIX_FMT, + "-c:a", "copy", + "-movflags", "+faststart", + "-loglevel", "error", + str(output), + ] + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + _, stderr = await process.communicate() + if process.returncode != 0: + raise FFMpegException(f"TikTak encoding failed: {stderr.decode()[:300]}") + return output