Skip to content

Commit 8c15328

Browse files
committed
feat: change ingester data insertion policy
Now allows for overwrites of null data Closes #1552
1 parent da32f90 commit 8c15328

File tree

4 files changed

+210
-16
lines changed

4 files changed

+210
-16
lines changed

backend/kernelCI_app/constants/ingester.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,9 @@
4848
except (ValueError, TypeError):
4949
logger.warning("Invalid INGEST_QUEUE_MAXSIZE, using default 5000")
5050
INGEST_QUEUE_MAXSIZE = 5000
51+
52+
53+
PRIO_DB = is_boolean_or_string_true(os.environ.get("PRIO_DB", "True"))
54+
"""Toggles the priority when updating database data.\n
55+
If True, this will prioritize what is already present in the database, never updating fields != null.
56+
If False, it will prioritize the incoming data, allowing it to overwrite existing data."""

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
INGEST_FLUSH_TIMEOUT_SEC,
1111
INGEST_QUEUE_MAXSIZE,
1212
VERBOSE,
13+
PRIO_DB,
1314
)
1415
import threading
1516
import time
@@ -21,7 +22,7 @@
2122
extract_log_excerpt,
2223
)
2324
import kcidb_io
24-
from django.db import transaction
25+
from django.db import connections, transaction
2526
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents
2627

2728
from kernelCI_app.management.commands.helpers.process_submissions import (
@@ -111,23 +112,100 @@ def prepare_file_data(
111112
}
112113

113114

114-
def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
115+
def _generate_model_insert_query(
116+
table_name: TableNames, model: TableModels
117+
) -> tuple[list[str], str]:
118+
"""
119+
Dynamically generates the insert query for any model.
120+
This function should only be used inside a transaction context.
121+
122+
The generated query's update policy on conflict depends on the `PRIO_DB` environment variable:
123+
- If `PRIO_DB` is True, the query never updates non-null values in the database (prefers existing values).
124+
- If `PRIO_DB` is False, the query prefers new values, updating database values with new non-null values.
125+
126+
Returns a list of which model properties can be updated and the insert query.
127+
"""
128+
updateable_model_fields: list[str] = []
129+
updateable_db_fields: list[str] = []
130+
query_params_properties: list[tuple[str, str]] = []
131+
132+
for field in model._meta.fields:
133+
if field.generated:
134+
continue
135+
136+
field_name = (
137+
field.name + "_id"
138+
if field.get_internal_type() == "ForeignKey"
139+
else field.name
140+
)
141+
real_name = field.db_column or field_name
142+
operation = "GREATEST" if real_name == "_timestamp" else "COALESCE"
143+
144+
query_params_properties.append((real_name, operation))
145+
updateable_model_fields.append(field_name)
146+
updateable_db_fields.append(real_name)
147+
148+
conflict_clauses = []
149+
for field, op in query_params_properties:
150+
if PRIO_DB:
151+
conflict_clauses.append(
152+
f"""
153+
{field} = {op}({table_name}.{field}, EXCLUDED.{field})"""
154+
)
155+
else:
156+
conflict_clauses.append(
157+
f"""
158+
{field} = {op}(EXCLUDED.{field}, {table_name}.{field})"""
159+
)
160+
161+
query = f"""
162+
INSERT INTO {table_name} (
163+
{', '.join(updateable_db_fields)}
164+
)
165+
VALUES (
166+
{', '.join(['%s'] * len(updateable_db_fields))}
167+
)
168+
ON CONFLICT (id)
169+
DO UPDATE SET {', '.join(conflict_clauses)};
170+
"""
171+
172+
return updateable_model_fields, query
173+
174+
175+
def consume_buffer(buffer: list[TableModels], table_name: TableNames) -> None:
115176
"""
116177
Consume a buffer of items and insert them into the database.
117178
This function is called by the db_worker thread.
118179
"""
119180
if not buffer:
120181
return
121182

122-
model = MODEL_MAP[item_type]
183+
try:
184+
model = MODEL_MAP[table_name]
185+
except KeyError:
186+
out(
187+
"Unknown table '%s' passed to consume_buffer. Valid tables: %s"
188+
% (table_name, str(", ".join(MODEL_MAP.keys())))
189+
)
190+
raise
191+
192+
updateable_model_fields, query = _generate_model_insert_query(table_name, model)
193+
194+
params = []
195+
for obj in buffer:
196+
obj_values = []
197+
for field in updateable_model_fields:
198+
value = getattr(obj, field)
199+
if isinstance(value, (dict, list)):
200+
value = json.dumps(value)
201+
obj_values.append(value)
202+
params.append(tuple(obj_values))
123203

124204
t0 = time.time()
125-
model.objects.bulk_create(
126-
buffer,
127-
batch_size=INGEST_BATCH_SIZE,
128-
ignore_conflicts=True,
129-
)
130-
out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))
205+
with connections["default"].cursor() as cursor:
206+
cursor.executemany(query, params)
207+
208+
out("bulk_create %s: n=%d in %.3fs" % (table_name, len(buffer), time.time() - t0))
131209

132210

133211
def flush_buffers(

backend/kernelCI_app/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
"""Defines the models used in the main database.
2+
All models should have explicit id column for the ingester to work properly."""
3+
14
from django.db import models
25
from django.contrib.postgres.fields import ArrayField
36
from django.contrib.postgres.indexes import GinIndex

backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py

Lines changed: 114 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,86 @@ def test_prepare_file_data_file_error(self, mock_file_open, mock_logger):
206206
mock_file_open.assert_called_once()
207207

208208

209+
class TestGenerateInsertQuery:
210+
mock_field_timestamp = MagicMock()
211+
mock_field_timestamp.name = "field_timestamp"
212+
mock_field_timestamp.db_column = "_timestamp"
213+
mock_field_timestamp.generated = False
214+
mock_field_timestamp.get_internal_type.return_value = "DateTimeField"
215+
216+
mock_field_comment = MagicMock()
217+
mock_field_comment.name = "comment"
218+
mock_field_comment.db_column = None
219+
mock_field_comment.generated = False
220+
mock_field_comment.get_internal_type.return_value = "CharField"
221+
222+
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.PRIO_DB", True)
223+
def test_generate_model_insert_query_prio_db_true(self):
224+
"""Test _generate_model_insert_query with PRIO_DB=True (prefers existing DB values)."""
225+
from kernelCI_app.management.commands.helpers.kcidbng_ingester import (
226+
_generate_model_insert_query,
227+
)
228+
229+
mock_model = MagicMock()
230+
231+
mock_field3 = MagicMock()
232+
mock_field3.name = "checkout"
233+
mock_field3.db_column = None
234+
mock_field3.generated = False
235+
mock_field3.get_internal_type.return_value = "ForeignKey"
236+
237+
mock_field4 = MagicMock()
238+
mock_field4.name = "series"
239+
mock_field4.generated = True
240+
241+
mock_model._meta.fields = [
242+
self.mock_field_timestamp,
243+
self.mock_field_comment,
244+
mock_field3,
245+
mock_field4,
246+
]
247+
248+
with patch(
249+
"kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP",
250+
{"issues": mock_model},
251+
):
252+
updateable_fields, query = _generate_model_insert_query(
253+
"issues", mock_model
254+
)
255+
256+
assert updateable_fields == ["field_timestamp", "comment", "checkout_id"]
257+
assert "INSERT INTO issues" in query
258+
assert "_timestamp, comment, checkout_id" in query
259+
# PRIO_DB=True means database values come first in COALESCE/GREATEST
260+
assert "GREATEST(issues._timestamp, EXCLUDED._timestamp)" in query
261+
assert "COALESCE(issues.comment, EXCLUDED.comment)" in query
262+
assert "COALESCE(issues.checkout_id, EXCLUDED.checkout_id)" in query
263+
assert "series" not in query
264+
265+
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.PRIO_DB", False)
266+
def test_generate_model_insert_query_prio_db_false(self):
267+
"""Test _generate_model_insert_query with PRIO_DB=False (prefers new values)."""
268+
from kernelCI_app.management.commands.helpers.kcidbng_ingester import (
269+
_generate_model_insert_query,
270+
)
271+
272+
mock_model = MagicMock()
273+
mock_model._meta.fields = [self.mock_field_timestamp, self.mock_field_comment]
274+
275+
with patch(
276+
"kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP",
277+
{"tests": mock_model},
278+
):
279+
updateable_fields, query = _generate_model_insert_query("tests", mock_model)
280+
281+
assert updateable_fields == ["field_timestamp", "comment"]
282+
assert "INSERT INTO tests" in query
283+
assert "_timestamp, comment" in query
284+
# PRIO_DB=False means new values (EXCLUDED) come first in COALESCE/GREATEST
285+
assert "GREATEST(EXCLUDED._timestamp, tests._timestamp)" in query
286+
assert "COALESCE(EXCLUDED.comment, tests.comment)" in query
287+
288+
209289
class TestConsumeBuffer:
210290
"""Test cases for consume_buffer function."""
211291

@@ -218,24 +298,45 @@ class TestConsumeBuffer:
218298
INGEST_BATCH_SIZE_MOCK,
219299
)
220300
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")
301+
@patch(
302+
"kernelCI_app.management.commands.helpers.kcidbng_ingester._generate_model_insert_query"
303+
)
304+
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.connections")
221305
@patch("time.time", side_effect=TIME_MOCK)
222-
def test_consume_buffer_with_items(self, mock_time, mock_out):
306+
def test_consume_buffer_with_items(
307+
self, mock_time, mock_connections, mock_generate_query, mock_out
308+
):
223309
"""Test consume_buffer with items in buffer."""
310+
table_name = "issues"
224311
mock_model = MagicMock()
225312
mock_buffer = [MagicMock(), MagicMock()]
313+
mock_generate_query.return_value = (
314+
["_timestamp", "other_field"],
315+
"""
316+
INSERT INTO issues (
317+
_timestamp, other_field
318+
)
319+
VALUES (
320+
%s, %s
321+
)
322+
ON CONFLICT (id)
323+
DO UPDATE SET
324+
GREATEST(issues._timestamp, EXCLUDED._timestamp),
325+
COALESCE(issues.other_field, EXCLUDED.other_field);""",
326+
)
327+
mock_cursor = MagicMock()
328+
mock_connections["default"].cursor.return_value.__enter__.return_value = (
329+
mock_cursor
330+
)
226331

227332
with patch(
228333
"kernelCI_app.management.commands.helpers.kcidbng_ingester.MODEL_MAP",
229334
{"issues": mock_model},
230335
):
231-
consume_buffer(mock_buffer, "issues")
336+
consume_buffer(mock_buffer, table_name)
232337

233338
assert mock_time.call_count == 2
234-
mock_model.objects.bulk_create.assert_called_once_with(
235-
mock_buffer,
236-
batch_size=INGEST_BATCH_SIZE_MOCK,
237-
ignore_conflicts=True,
238-
)
339+
mock_cursor.executemany.assert_called_once()
239340
mock_out.assert_called_once()
240341

241342
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")
@@ -254,6 +355,12 @@ def test_consume_buffer_empty_buffer(self, mock_time, mock_out):
254355
mock_time.assert_not_called()
255356
mock_out.assert_not_called()
256357

358+
def test_consume_buffer_wrong_table(self):
359+
"""Test consume_buffer with invalid table name raises KeyError."""
360+
with pytest.raises(KeyError):
361+
mock_model = MagicMock()
362+
consume_buffer([mock_model], "another")
363+
257364

258365
class TestFlushBuffers:
259366
"""Test cases for flush_buffers function."""

0 commit comments

Comments
 (0)