24 changes: 22 additions & 2 deletions _kafka/src/core.py
@@ -19,7 +19,8 @@ def create_kafka_producer(kafka_broker: str | list):
return producer


def insert_data(producer: KafkaProducer):
def insert_player(producer: KafkaProducer):
players = []
player_generator = kafka_data.create_player()
for player in player_generator:
print(player.name)
@@ -31,7 +32,14 @@ def insert_data(producer: KafkaProducer):
topic="players.to_scrape",
value=player_to_scrape.model_dump(mode="json"),
)
players.append(player)
return players


def insert_scraped_data(
producer: KafkaProducer, players: list[kafka_data.PlayerStruct]
):
for player in players:
scrape_gen = kafka_data.create_scraped_data(player, n_records=30)
scraped_data = list()
scraped_data = [d.model_copy(deep=True) for d in scrape_gen]
@@ -45,6 +53,16 @@ def insert_data(producer: KafkaProducer):
print("\t", scrape_data.player_data.updated_at)


def insert_report(producer: KafkaProducer):
report_generator = kafka_data.create_report()
for report in report_generator:
report_to_insert = kafka_data.ReportsToInsertStruct(**report)
producer.send(
topic="reports.to_insert",
value=report_to_insert.model_dump(mode="json"),
)


def main():
random.seed(43)

@@ -53,7 +71,9 @@ def main():
kafka_topics.create_topics(kafka_broker=kafka_broker)

producer = create_kafka_producer(kafka_broker=kafka_broker)
insert_data(producer=producer)
players = insert_player(producer=producer)
insert_scraped_data(producer=producer, players=players)
insert_report(producer=producer)


if __name__ == "__main__":
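The refactor above splits the old `insert_data` into `insert_player`, `insert_scraped_data`, and `insert_report`, all of which call `producer.send(topic=..., value=model.model_dump(mode="json"))` with plain dicts. That only works if the producer JSON-serializes message values; the body of `create_kafka_producer` is collapsed in this diff, so the sketch below is an assumption of a compatible producer setup, not the PR's implementation.

```python
# Hedged sketch only: the PR's create_kafka_producer body is collapsed above.
# This shows one producer configuration compatible with sending
# model_dump(mode="json") dicts as message values.
import json

from kafka import KafkaProducer


def create_json_producer(kafka_broker: str | list) -> KafkaProducer:
    return KafkaProducer(
        bootstrap_servers=kafka_broker,
        # encode the plain dicts returned by Pydantic's model_dump(mode="json")
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
```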
85 changes: 80 additions & 5 deletions _kafka/src/kafka_data.py
@@ -1,8 +1,56 @@
import random
import time
from datetime import date, datetime, timedelta
from typing import Generator, Optional

from pydantic import BaseModel
from pydantic.fields import Field


class MetaData(BaseModel):
version: int
source: str


class Equipment(BaseModel):
equip_head_id: Optional[int] = Field(None, ge=0)
equip_amulet_id: Optional[int] = Field(None, ge=0)
equip_torso_id: Optional[int] = Field(None, ge=0)
equip_legs_id: Optional[int] = Field(None, ge=0)
equip_boots_id: Optional[int] = Field(None, ge=0)
equip_cape_id: Optional[int] = Field(None, ge=0)
equip_hands_id: Optional[int] = Field(None, ge=0)
equip_weapon_id: Optional[int] = Field(None, ge=0)
equip_shield_id: Optional[int] = Field(None, ge=0)


class BaseDetection(BaseModel):
region_id: int = Field(0, ge=0, le=100_000)
x_coord: int = Field(0, ge=0)
y_coord: int = Field(0, ge=0)
z_coord: int = Field(0, ge=0)
ts: int = Field(int(time.time()), ge=0)
manual_detect: int = Field(0, ge=0, le=1)
on_members_world: int = Field(0, ge=0, le=1)
on_pvp_world: int = Field(0, ge=0, le=1)
world_number: int = Field(0, ge=300, le=1_000)
equipment: Equipment
equip_ge_value: int = Field(0, ge=0)


class Detection(BaseDetection):
reporter: str = Field(..., min_length=1, max_length=13)
reported: str = Field(..., min_length=1, max_length=12)


class ParsedDetection(BaseDetection):
reporter_id: int = Field(..., ge=0)
reported_id: int = Field(..., ge=0)


class ReportsToInsertStruct(BaseModel):
metadata: MetaData
report: ParsedDetection


class PlayerStruct(BaseModel):
@@ -25,11 +73,6 @@ class ScraperHiscoreData(BaseModel):
activities: Optional[dict[str, int]] = None


class MetaData(BaseModel):
version: int
source: str


class ScrapedStruct(BaseModel):
metadata: MetaData
player_data: PlayerStruct
@@ -144,6 +187,38 @@ def create_scraped_data(
}


def create_report() -> Generator[dict, None, None]:
"""Generates report IDs for demonstration purposes."""
yield {
"metadata": {"version": 1, "source": "api_public"},
"report": {
"region_id": 12598,
"x_coord": 3167,
"y_coord": 3490,
"z_coord": 0,
"ts": 1763909384,
"manual_detect": 0,
"on_members_world": 1,
"on_pvp_world": 0,
"world_number": 490,
"equipment": {
"equip_head_id": None,
"equip_amulet_id": None,
"equip_torso_id": None,
"equip_legs_id": None,
"equip_boots_id": None,
"equip_cape_id": None,
"equip_hands_id": None,
"equip_weapon_id": None,
"equip_shield_id": None,
},
"equip_ge_value": 0,
"reporter_id": 398265,
"reported_id": 233134407,
},
}


# Example Usage
if __name__ == "__main__":
random.seed(43)
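Since `create_report()` yields dicts shaped exactly like `ReportsToInsertStruct`, a quick round-trip check makes that relationship explicit. The bare `import kafka_data` below is an assumption about how the module is imported in this repo layout.

```python
# Illustrative round-trip: validate the sample payload from create_report()
# against the new ReportsToInsertStruct model (import path is assumed).
import kafka_data

report_dict = next(kafka_data.create_report())
report = kafka_data.ReportsToInsertStruct(**report_dict)

assert report.metadata.version == 1
assert report.report.equipment.equip_head_id is None
print(report.model_dump(mode="json"))
```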
13 changes: 12 additions & 1 deletion bases/bot_detector/worker_report/main.py
@@ -43,6 +43,7 @@ async def insert_batch(
await report_repo.insert(async_session=session, reports=batch)
await session.commit()
except OperationalError as e:
logger.error(f"OperationalError during batch insert: {str(e)}")
return None, str(e)
logger.info(f"inserted: {len(batch)}")
return None, None
@@ -105,7 +106,17 @@ async def consume_many_task(
await asyncio.sleep(15)
await report_consumer.commit()
except Exception as e:
logger.error(f"Error consuming reports: {e}")
tb = e.__traceback__
if tb is not None:
logger.error(
{
"result_msg": "Error consuming reports",
"message": str(e),
"filename": tb.tb_frame.f_code.co_filename,
"line": tb.tb_lineno,
"name": tb.tb_frame.f_code.co_name,
}
)
logger.debug(f"Traceback: \n{traceback.format_exc()}")
await asyncio.sleep(5)

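One note on the structured log above: `e.__traceback__` points at the frame that caught the exception inside `consume_many_task`, while the actual raise site is at the end of the `tb_next` chain. If the intent is to log where the error originated, a small helper like the sketch below (the name is illustrative, not part of this PR) walks to the innermost frame.

```python
# Sketch of an alternative: report the innermost traceback frame, i.e. the
# file/line/function where the exception was actually raised.
from types import TracebackType


def innermost_frame_info(tb: TracebackType) -> dict:
    while tb.tb_next is not None:
        tb = tb.tb_next
    code = tb.tb_frame.f_code
    return {
        "filename": code.co_filename,
        "line": tb.tb_lineno,
        "name": code.co_name,
    }
```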
6 changes: 5 additions & 1 deletion components/bot_detector/database/report/repository.py
@@ -2,11 +2,12 @@
from datetime import datetime

import sqlalchemy as sqla
from .interface import ReportInterface
from bot_detector.structs import ParsedDetection
from sqlalchemy import TextClause
from sqlalchemy.ext.asyncio import AsyncSession

from .interface import ReportInterface

logger = logging.getLogger(__name__)


@@ -28,8 +29,11 @@ def _parse_reports(self, reports: list[ParsedDetection]) -> list[dict]:
report_dict = report.model_dump()
# flatten nested equipment
equipment: dict = report_dict.pop("equipment", {})
## correct for None values
equipment = {k: v for k, v in equipment.items() if v is not None}
## correct for item bug
equipment = {k: 0 if v > 32767 else v for k, v in equipment.items()}

report_dict.update(equipment)

# epoch timestamp to datetime value
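For context on the equipment handling above: dropping `None` slots lets absent equipment fall back to the column default, and zeroing IDs above 32767 avoids overflowing what looks like a SMALLINT column (an assumption; the schema is not shown in this diff). A standalone walk-through with made-up values:

```python
# Made-up sample; mirrors the two comprehensions added in _parse_reports.
equipment = {"equip_head_id": None, "equip_weapon_id": 4151, "equip_cape_id": 50_000}

# drop None slots
equipment = {k: v for k, v in equipment.items() if v is not None}
# zero out IDs that would overflow the assumed SMALLINT column
equipment = {k: 0 if v > 32767 else v for k, v in equipment.items()}

assert equipment == {"equip_weapon_id": 4151, "equip_cape_id": 0}
```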
11 changes: 11 additions & 0 deletions components/bot_detector/kafka/repositories/reports_to_insert.py
@@ -39,7 +39,18 @@ async def stop(self):
async def get_consumer(self):
return self.consumer

def set_consumer(self, consumer):
"""
Set a custom consumer instance for testing purposes.
"""
self.consumer = consumer

def _validate_value(self, value) -> tuple[dict | None, str | None]:
if isinstance(value, bytes):
try:
value = orjson.loads(value)
except orjson.JSONDecodeError as e:
return None, f"Failed to decode JSON: {str(e)}"
if not isinstance(value, dict):
return None, "Message value is not a dict"
if "metadata" not in value:
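The new `set_consumer` hook and the first checks in `_validate_value` are easy to exercise in isolation. The class name below is a guess (its definition and constructor are outside this hunk), and only the two early-return paths visible above are covered:

```python
# Hypothetical usage; "ReportsToInsertRepository" and its no-arg constructor
# are assumptions, since the class definition is not shown in this hunk.
repo = ReportsToInsertRepository()

# inject a stand-in consumer for tests instead of a real Kafka consumer
repo.set_consumer(object())

value, err = repo._validate_value(b"not json")
assert value is None and err.startswith("Failed to decode JSON")

value, err = repo._validate_value(b"[1, 2, 3]")
assert value is None and err == "Message value is not a dict"
```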
1 change: 1 addition & 0 deletions pyproject.toml
@@ -38,6 +38,7 @@ dev = [
"ruff>=0.8.4",
"pytest-asyncio>=0.25.0",
"pytest-cov>=6.1.1",
"mockafka-py>=0.2.3",
]


130 changes: 130 additions & 0 deletions test/bases/bot_detector/worker_report/test_core.py
@@ -0,0 +1,130 @@
import logging

import pytest
from bot_detector.structs._metadata import MetaData
from bot_detector.structs.kafka.reports_to_insert import ReportsToInsertStruct
from bot_detector.structs.reports import Equipment, ParsedDetection

from bases.bot_detector.worker_report.main import parse_detections

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_parse_detections_valid_report():
# Arrange
valid_report_v1 = ReportsToInsertStruct(
metadata=MetaData(version=1, source="test_source"),
report=ParsedDetection(
region_id=1,
x_coord=2,
y_coord=3,
z_coord=4,
ts=1234567890,
manual_detect=0,
on_members_world=1,
on_pvp_world=0,
world_number=301,
equipment=Equipment(
equip_head_id=0,
equip_amulet_id=0,
equip_torso_id=0,
equip_legs_id=0,
equip_boots_id=0,
equip_cape_id=0,
equip_hands_id=0,
equip_weapon_id=0,
equip_shield_id=0,
),
equip_ge_value=1000,
reporter_id=42,
reported_id=84,
),
)

# Act
parsed_detections = await parse_detections([valid_report_v1])

# Assert
assert len(parsed_detections) == 1
assert parsed_detections[0] == valid_report_v1.report


@pytest.mark.asyncio
async def test_parse_detections_invalid_and_unsupported_reports():
unsupported_version_report = ReportsToInsertStruct(
metadata=MetaData(version=2, source="test_source"),
report=ParsedDetection(
region_id=1,
x_coord=2,
y_coord=3,
z_coord=4,
ts=1234567890,
manual_detect=0,
on_members_world=1,
on_pvp_world=0,
world_number=301,
equipment=Equipment(
equip_head_id=0,
equip_amulet_id=0,
equip_torso_id=0,
equip_legs_id=0,
equip_boots_id=0,
equip_cape_id=0,
equip_hands_id=0,
equip_weapon_id=0,
equip_shield_id=0,
),
equip_ge_value=1000,
reporter_id=42,
reported_id=84,
),
)

reports = [unsupported_version_report]

parsed_detections = await parse_detections(reports)

assert len(parsed_detections) == 0


@pytest.mark.asyncio
async def test_parse_detections_valid_null_report():
mock_report = ReportsToInsertStruct(
metadata=MetaData(version=1, source="api_public"),
report=ParsedDetection(
region_id=12598,
x_coord=3167,
y_coord=3490,
z_coord=0,
ts=1763909384,
manual_detect=0,
on_members_world=1,
on_pvp_world=0,
world_number=490,
equipment=Equipment(
equip_head_id=None,
equip_amulet_id=None,
equip_torso_id=None,
equip_legs_id=None,
equip_boots_id=None,
equip_cape_id=None,
equip_hands_id=None,
equip_weapon_id=None,
equip_shield_id=None,
),
equip_ge_value=0,
reporter_id=398265,
reported_id=233134407,
),
)

# Act
parsed_detections = await parse_detections([mock_report])

# Assert
assert len(parsed_detections) == 1
parsed_report = parsed_detections[0]
assert parsed_report.equipment.equip_head_id is None
assert parsed_report.equipment.equip_amulet_id is None
logger.info(f"Parsed detections: {parsed_detections}")