
Commit 581747d

feat: hardware denormalization for hardware listing (#1614)
* feat(hardware): add denormalized hardware status schema and models
  - Add HardwareStatus model to store aggregated test/build/boot status by platform
  - Add LatestCheckout model to track latest checkout per origin/tree/repository
  - Add PendingTest model to queue tests for aggregation processing
  - Add ProcessedHardwareStatus model to track processed entities
  - Add SimplifiedStatusChoices enum for aggregated status values
  - Create database migration for new tables with appropriate indexes

* feat(hardware): implement real-time hardware status aggregation
  - Add aggregation_helpers module with checkout and test aggregation logic
  - Integrate aggregation into kcidbng_ingester db_worker
  - Populate LatestCheckout table with checkout tracking
  - Populate PendingTest table with test records for processing
  - Run aggregations before buffer flush to ensure data consistency

* feat(hardware): add backlog processing and query support
  - Add process_pending_aggregations management command for backlog processing
  - Implement batch processing with configurable batch size and loop mode
  - Add hardware status aggregation from PendingTest queue
  - Add get_hardware_listing_data_from_status_table query function
  - Query denormalized hardware_status table for improved performance

* test: fix db_worker tests mocking aggregation helper

* feat: add cronjob to delete unused hardware status rows every week

* feat: add command to backfill data into PendingTests and LatestCheckout
1 parent 1b6daa3 commit 581747d
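
Taken together, the changes describe a pipeline: LatestCheckout and PendingTest are filled either by the ingester or by the new backfill command, process_pending_aggregations folds the pending queue into the denormalized hardware_status table, and the hardware listing queries that table. A rough sketch of running the two batch steps by hand (the backfill command's module name is not shown in this excerpt, so the name below is assumed; its flags match the add_arguments definitions further down):

from django.core.management import call_command

# 1) Backfill LatestCheckout and PendingTest for the last 30 days.
#    NOTE: "backfill_hardware_aggregations" is an assumed command name;
#    only the --days/--truncate/--batch-size flags appear in this diff.
call_command("backfill_hardware_aggregations", "--days", "30", "--batch-size", "2000")

# 2) Process the PendingTest backlog into hardware_status
#    (command named in the commit message; its own flags are not shown here).
call_command("process_pending_aggregations")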

File tree

15 files changed: +1406 / -3 lines changed

backend/kernelCI/settings.py

Lines changed: 7 additions & 0 deletions
@@ -179,6 +179,13 @@ def get_json_env_var(name, default):
             "--yes",
         ],
     ),
+    (
+        "0 0 * * 1",
+        "django.core.management.call_command",
+        [
+            "delete_unused_hardware_status",
+        ],
+    ),
 ]

 # Email settings for SMTP backend
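
The added entry schedules the delete_unused_hardware_status command at "0 0 * * 1", i.e. every Monday at 00:00, via django.core.management.call_command. For reference, the same cleanup can be triggered by hand; a minimal sketch:

from django.core.management import call_command

# Preview what the weekly job would remove, then run the deletion for real.
call_command("delete_unused_hardware_status", "--dry-run")
call_command("delete_unused_hardware_status")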
Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
import time
from datetime import timedelta
from typing import Any, Callable, Sequence

from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models.query import QuerySet
from django.utils import timezone

from kernelCI_app.management.commands.helpers.aggregation_helpers import (
    aggregate_checkouts,
    aggregate_tests,
)
from kernelCI_app.models import (
    Checkouts,
    LatestCheckout,
    PendingTest,
    Tests,
)
from kernelCI_app.helpers.logger import out


class Command(BaseCommand):
    help = """
    Backfill hardware aggregations (LatestCheckout and PendingTest)
    which can later be collected and processed by the process_pending_aggregations command.
    """

    def add_arguments(self, parser: Any) -> None:
        parser.add_argument(
            "--days",
            type=int,
            default=30,
            help="Number of days to look back (default: 30)",
        )
        parser.add_argument(
            "--truncate",
            action="store_true",
            help="""Truncate destination tables before backfilling.
            This is a destructive operation and should be used with caution.""",
        )
        parser.add_argument(
            "--batch-size",
            type=int,
            default=2000,
            help="Batch size for processing (default: 2000)",
        )

    def handle(self, *args: Any, **options: Any) -> None:
        days = options["days"]
        truncate = options["truncate"]
        batch_size = options["batch_size"]

        cutoff_date = timezone.now() - timedelta(days=days)
        out(f"Backfilling hardware aggregations since {cutoff_date}...")

        if truncate:
            out("Truncating LatestCheckout and PendingTest tables...")
            LatestCheckout.objects.all().delete()
            PendingTest.objects.all().delete()
            out("Truncation complete.")

        out("Backfilling LatestCheckout...")
        checkouts_qs = Checkouts.objects.filter(start_time__gte=cutoff_date).order_by(
            "-start_time"
        )

        # Process checkouts to insert into latest_checkout table
        self.process_queryset(
            checkouts_qs, aggregate_checkouts, batch_size, "Checkouts"
        )

        out("Backfilling PendingTest...")
        tests_qs = (
            Tests.objects.filter(
                start_time__gte=cutoff_date,
                environment_misc__platform__isnull=False,
                build__checkout__id__in=LatestCheckout.objects.values_list(
                    "checkout_id", flat=True
                ),
            )
            .select_related("build")
            .order_by("start_time")
        )

        # Process tests to insert into pending_tests table
        self.process_queryset(tests_qs, aggregate_tests, batch_size, "Tests")

        out("Backfill complete.")

    def process_queryset(
        self,
        queryset: QuerySet[Any],
        aggregator_func: Callable[[Sequence[Any]], None],
        batch_size: int,
        label: str,
    ) -> None:
        """
        Process a queryset in batches using the provided aggregator function

        Iterates through the queryset in chunks, collects items into batches,
        and processes each batch using the aggregator function within a transaction
        """
        count = 0
        total_processed = 0
        batch = []

        t0 = time.time()
        for item in queryset.iterator(chunk_size=batch_size):
            try:
                batch.append(item)
                count += 1
                if count >= batch_size:
                    with transaction.atomic():
                        aggregator_func(batch)
                    total_processed += count
                    out(
                        f"Processed {total_processed} {label} (elapsed time: {time.time() - t0:.2f}s)"
                    )
                    batch = []
                    count = 0
            except Exception as e:
                out(f"Error processing {label}: {e}")
                batch = []
                count = 0
                continue

        if batch:
            aggregator_func(batch)
            total_processed += count
            out(
                f"Processed {total_processed} {label} (elapsed time: {time.time() - t0:.2f}s)"
            )
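
process_queryset is generic over the queryset and aggregator it receives; a hedged sketch of reusing it outside handle() (assumes this Command class and the helpers imported above; illustrative only, not part of the commit):

from datetime import timedelta

from django.utils import timezone

from kernelCI_app.management.commands.helpers.aggregation_helpers import aggregate_checkouts
from kernelCI_app.models import Checkouts

command = Command()  # the Command class defined in this file
recent_checkouts = Checkouts.objects.filter(
    start_time__gte=timezone.now() - timedelta(days=7)
).order_by("-start_time")

# Each batch of 1000 checkouts is upserted into latest_checkout inside its own transaction.
command.process_queryset(recent_checkouts, aggregate_checkouts, 1000, "Checkouts")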
Lines changed: 122 additions & 0 deletions
@@ -0,0 +1,122 @@
"""
Management command to delete unused entries from hardware_status table.

Removes HardwareStatus entries that have no corresponding checkout_id in the LatestCheckout table.
"""

import logging

from django.core.management.base import BaseCommand
from django.db import transaction

from kernelCI_app.models import HardwareStatus, LatestCheckout, ProcessedHardwareStatus

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = (
        "Delete HardwareStatus entries with no corresponding checkout_id "
        "in the LatestCheckout table"
    )

    def add_arguments(self, parser):
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Show what would be deleted without actually deleting",
        )
        parser.add_argument(
            "--batch-size",
            type=int,
            default=10000,
            help="Number of records to delete per batch (default: 10000)",
        )

    def handle(self, *args, **options):
        dry_run = options["dry_run"]
        batch_size = options["batch_size"]

        with transaction.atomic():
            valid_checkout_ids = set(
                LatestCheckout.objects.values_list("checkout_id", flat=True)
            )

            orphaned_hardware_entries = HardwareStatus.objects.exclude(
                checkout_id__in=valid_checkout_ids
            ).values_list("checkout_id", flat=True)
            orphaned_hardware_count = orphaned_hardware_entries.count()

            orphaned_processed_hardware_entries = (
                ProcessedHardwareStatus.objects.exclude(
                    checkout_id__in=valid_checkout_ids
                )
            ).values_list("hardware_key", flat=True)

            orphaned_processed_hardware_count = (
                orphaned_processed_hardware_entries.count()
            )

            if orphaned_hardware_count == 0 and orphaned_processed_hardware_count == 0:
                self.stdout.write(
                    self.style.SUCCESS(
                        "No orphaned HardwareStatus/ProcessedHardwareStatus entries found."
                    )
                )
                return

            if dry_run:
                self.stdout.write(
                    self.style.WARNING(
                        f"[DRY RUN] Would delete {orphaned_hardware_count} HardwareStatus entries and "
                        f"{orphaned_processed_hardware_count} ProcessedHardwareStatus entries. "
                        "Run without --dry-run to execute deletion."
                    )
                )
                return

            self.stdout.write(
                f"Found {orphaned_hardware_count} HardwareStatus entries "
                f"and {orphaned_processed_hardware_count} ProcessedHardwareStatus entries "
                "with no corresponding LatestCheckout."
            )

            total_hardware_deleted = 0
            total_processed_hardware_deleted = 0
            while True:
                hardware_batch_ids = list(orphaned_hardware_entries[:batch_size])
                processed_hardware_batch_ids = list(
                    orphaned_processed_hardware_entries[:batch_size]
                )

                if not hardware_batch_ids and not processed_hardware_batch_ids:
                    break

                if hardware_batch_ids:
                    hardware_delete_count = HardwareStatus.objects.filter(
                        checkout_id__in=hardware_batch_ids
                    ).delete()[0]
                    total_hardware_deleted += hardware_delete_count
                    self.stdout.write(
                        f"Deleted hardware_status(n={hardware_delete_count}) entries "
                        f"(total: {total_hardware_deleted}/{orphaned_hardware_count})"
                    )

                if processed_hardware_batch_ids:
                    processed_hardware_delete_count = (
                        ProcessedHardwareStatus.objects.filter(
                            hardware_key__in=processed_hardware_batch_ids
                        ).delete()[0]
                    )
                    total_processed_hardware_deleted += processed_hardware_delete_count
                    self.stdout.write(
                        f"Deleted processed_hardware_status(n={processed_hardware_delete_count}) entries "
                        f"(total: {total_processed_hardware_deleted}/{orphaned_processed_hardware_count})"
                    )

            self.stdout.write(
                self.style.SUCCESS(
                    f"Successfully deleted hardware_status(n={total_hardware_deleted}) "
                    f"and processed_hardware_status(n={total_processed_hardware_deleted})."
                )
            )
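
Before letting the weekly job delete anything, the orphan counts can be inspected from a Django shell with the same anti-join the command builds; a small sketch:

from kernelCI_app.models import HardwareStatus, LatestCheckout, ProcessedHardwareStatus

# checkout_ids still referenced by a latest checkout
valid_ids = LatestCheckout.objects.values_list("checkout_id", flat=True)

print("orphaned hardware_status:",
      HardwareStatus.objects.exclude(checkout_id__in=valid_ids).count())
print("orphaned processed_hardware_status:",
      ProcessedHardwareStatus.objects.exclude(checkout_id__in=valid_ids).count())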
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
import time
from typing import Optional, Sequence

from django.db import connection

from kernelCI_app.helpers.logger import out
from kernelCI_app.models import (
    Checkouts,
    PendingTest,
    StatusChoices,
    SimplifiedStatusChoices,
    Tests,
)
from kernelCI_app.utils import is_boot


def simplify_status(status: Optional[StatusChoices]) -> SimplifiedStatusChoices:
    if status == StatusChoices.PASS:
        return SimplifiedStatusChoices.PASS
    elif status == StatusChoices.FAIL:
        return SimplifiedStatusChoices.FAIL
    else:
        return SimplifiedStatusChoices.INCONCLUSIVE


def convert_test(t: Tests) -> PendingTest:
    return PendingTest(
        test_id=t.id,
        origin=t.origin,
        platform=t.environment_misc.get("platform"),
        compatible=t.environment_compatible,
        build_id=t.build_id,
        status=simplify_status(t.status),
        is_boot=is_boot(t.path) if t.path else False,
    )


def aggregate_checkouts(checkouts_instances: Sequence[Checkouts]) -> None:
    """
    Insert checkouts into the latest_checkout table,
    maintaining only the latest one for each
    (origin, tree_name, git_repository_url, git_repository_branch) combination.
    """
    t0 = time.time()
    values = [
        (
            checkout.id,
            checkout.origin,
            checkout.tree_name,
            checkout.git_repository_url,
            checkout.git_repository_branch,
            checkout.start_time,
        )
        for checkout in checkouts_instances
    ]

    with connection.cursor() as cursor:
        cursor.executemany(
            """
            INSERT INTO latest_checkout (
                checkout_id, origin, tree_name,
                git_repository_url, git_repository_branch, start_time
            )
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (origin, tree_name, git_repository_url, git_repository_branch)
            DO UPDATE SET
                start_time = EXCLUDED.start_time,
                checkout_id = EXCLUDED.checkout_id
            WHERE latest_checkout.start_time < EXCLUDED.start_time
            """,
            values,
        )
    out(f"inserted {len(checkouts_instances)} checkouts in {time.time() - t0:.3f}s")


def aggregate_tests(
    tests_instances: Sequence[Tests],
) -> None:
    """Insert test data into the pending_tests table to be processed later."""
    t0 = time.time()
    pending_tests = [
        convert_test(test)
        for test in tests_instances
        if test.environment_misc and test.environment_misc.get("platform") is not None
    ]

    if pending_tests:
        pending_tests_inserted = PendingTest.objects.bulk_create(
            pending_tests,
            ignore_conflicts=True,
        )
        out(
            f"bulk_create pending_tests: n={len(pending_tests_inserted)} in {time.time() - t0:.3f}s"
        )


def aggregate_checkouts_and_tests(
    checkouts_instances: Sequence[Checkouts],
    tests_instances: Sequence[Tests],
) -> None:
    aggregate_checkouts(checkouts_instances)
    aggregate_tests(tests_instances)
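
Per the commit message, these helpers are also called from the kcidbng_ingester db_worker before its buffers are flushed. That wiring is not part of this excerpt; a hypothetical sketch of the call pattern (function and buffer names are invented here for illustration):

from kernelCI_app.management.commands.helpers.aggregation_helpers import (
    aggregate_checkouts_and_tests,
)

def flush_buffers(checkout_buffer, test_buffer, flush_to_db):
    # Hypothetical db_worker hook: aggregate before the flush so the denormalized
    # latest_checkout / pending_tests tables stay in step with the main tables.
    aggregate_checkouts_and_tests(checkout_buffer, test_buffer)
    flush_to_db(checkout_buffer, test_buffer)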
