Skip to content

Commit b5b636a

Browse files
authored
ESQL: Add times to topn status (#131555)
Adds times to the TopNOperator status, specifically the nanoseconds spent receiving the values and the nanoseconds spent emitting the values: ``` { "operator" : "TopNOperator[count=0/1000, elementTypes=[BYTES_REF, DOUBLE], encoders=[UTF8TopNEncoder, DefaultSortable], sortOrders=[SortOrder[channel=1, asc=false, nullsFirst=true]]]", "status" : { "receive_nanos" : 192932, <--- Page add time - expect to scale based on input "receive_time" : "192.9micros", <--- Page add time as human readable string "emit_nanos" : 84383, <--- Result construction time - expect to scale based on window size "emit_time" : "84.3micros", <--- Result construction time as human readable string "occupied_rows" : 0, "ram_bytes_used" : 4296, "ram_used" : "4.1kb", "pages_received" : 1, "pages_emitted" : 1, "rows_received" : 1000, "rows_emitted" : 1000 } ```
1 parent c666679 commit b5b636a

File tree

5 files changed

+88
-52
lines changed

5 files changed

+88
-52
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ static TransportVersion def(int id) {
347347
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
348348
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
349349
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
350+
public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00);
350351

351352
/*
352353
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,9 @@ public String describe() {
308308

309309
private Iterator<Page> output;
310310

311+
private long receiveNanos;
312+
private long emitNanos;
313+
311314
/**
312315
* Count of pages that have been received by this operator.
313316
*/
@@ -387,6 +390,7 @@ public boolean needsInput() {
387390

388391
@Override
389392
public void addInput(Page page) {
393+
long start = System.nanoTime();
390394
/*
391395
* Since row tracks memory we have to be careful to close any unused rows,
392396
* including any rows that fail while constructing because they allocate
@@ -422,13 +426,16 @@ public void addInput(Page page) {
422426
page.releaseBlocks();
423427
pagesReceived++;
424428
rowsReceived += page.getPositionCount();
429+
receiveNanos += System.nanoTime() - start;
425430
}
426431
}
427432

428433
@Override
429434
public void finish() {
430435
if (output == null) {
436+
long start = System.nanoTime();
431437
output = toPages();
438+
emitNanos += System.nanoTime() - start;
432439
}
433440
}
434441

@@ -588,7 +595,16 @@ public long ramBytesUsed() {
588595

589596
@Override
590597
public Status status() {
591-
return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
598+
return new TopNOperatorStatus(
599+
receiveNanos,
600+
emitNanos,
601+
inputQueue.size(),
602+
ramBytesUsed(),
603+
pagesReceived,
604+
pagesEmitted,
605+
rowsReceived,
606+
rowsEmitted
607+
);
592608
}
593609

594610
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.unit.ByteSizeValue;
1616
import org.elasticsearch.compute.operator.Operator;
17+
import org.elasticsearch.core.TimeValue;
1718
import org.elasticsearch.xcontent.XContentBuilder;
1819

1920
import java.io.IOException;
@@ -25,6 +26,8 @@ public class TopNOperatorStatus implements Operator.Status {
2526
"topn",
2627
TopNOperatorStatus::new
2728
);
29+
private final long receiveNanos;
30+
private final long emitNanos;
2831
private final int occupiedRows;
2932
private final long ramBytesUsed;
3033
private final int pagesReceived;
@@ -33,13 +36,17 @@ public class TopNOperatorStatus implements Operator.Status {
3336
private final long rowsEmitted;
3437

3538
public TopNOperatorStatus(
39+
long receiveNanos,
40+
long emitNanos,
3641
int occupiedRows,
3742
long ramBytesUsed,
3843
int pagesReceived,
3944
int pagesEmitted,
4045
long rowsReceived,
4146
long rowsEmitted
4247
) {
48+
this.receiveNanos = receiveNanos;
49+
this.emitNanos = emitNanos;
4350
this.occupiedRows = occupiedRows;
4451
this.ramBytesUsed = ramBytesUsed;
4552
this.pagesReceived = pagesReceived;
@@ -49,6 +56,13 @@ public TopNOperatorStatus(
4956
}
5057

5158
TopNOperatorStatus(StreamInput in) throws IOException {
59+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
60+
this.receiveNanos = in.readVLong();
61+
this.emitNanos = in.readVLong();
62+
} else {
63+
this.receiveNanos = 0;
64+
this.emitNanos = 0;
65+
}
5266
this.occupiedRows = in.readVInt();
5367
this.ramBytesUsed = in.readVLong();
5468

@@ -67,6 +81,11 @@ public TopNOperatorStatus(
6781

6882
@Override
6983
public void writeTo(StreamOutput out) throws IOException {
84+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOPN_TIMINGS)) {
85+
out.writeVLong(receiveNanos);
86+
out.writeVLong(emitNanos);
87+
}
88+
7089
out.writeVInt(occupiedRows);
7190
out.writeVLong(ramBytesUsed);
7291

@@ -83,6 +102,14 @@ public String getWriteableName() {
83102
return ENTRY.name;
84103
}
85104

105+
public long receiveNanos() {
106+
return receiveNanos;
107+
}
108+
109+
public long emitNanos() {
110+
return emitNanos;
111+
}
112+
86113
public int occupiedRows() {
87114
return occupiedRows;
88115
}
@@ -110,6 +137,14 @@ public long rowsEmitted() {
110137
@Override
111138
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
112139
builder.startObject();
140+
builder.field("receive_nanos", receiveNanos);
141+
if (builder.humanReadable()) {
142+
builder.field("receive_time", TimeValue.timeValueNanos(receiveNanos).toString());
143+
}
144+
builder.field("emit_nanos", emitNanos);
145+
if (builder.humanReadable()) {
146+
builder.field("emit_time", TimeValue.timeValueNanos(emitNanos).toString());
147+
}
113148
builder.field("occupied_rows", occupiedRows);
114149
builder.field("ram_bytes_used", ramBytesUsed);
115150
builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed));
@@ -126,7 +161,9 @@ public boolean equals(Object o) {
126161
return false;
127162
}
128163
TopNOperatorStatus that = (TopNOperatorStatus) o;
129-
return occupiedRows == that.occupiedRows
164+
return receiveNanos == that.receiveNanos
165+
&& emitNanos == that.emitNanos
166+
&& occupiedRows == that.occupiedRows
130167
&& ramBytesUsed == that.ramBytesUsed
131168
&& pagesReceived == that.pagesReceived
132169
&& pagesEmitted == that.pagesEmitted
@@ -136,7 +173,7 @@ public boolean equals(Object o) {
136173

137174
@Override
138175
public int hashCode() {
139-
return Objects.hash(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
176+
return Objects.hash(receiveNanos, emitNanos, occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
140177
}
141178

142179
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase<TopNOperatorStatus> {
1818
public static TopNOperatorStatus simple() {
19-
return new TopNOperatorStatus(10, 2000, 123, 123, 111, 222);
19+
return new TopNOperatorStatus(100, 40, 10, 2000, 123, 123, 111, 222);
2020
}
2121

2222
public static String simpleToJson() {
2323
return """
2424
{
25+
"receive_nanos" : 100,
26+
"receive_time" : "100nanos",
27+
"emit_nanos" : 40,
28+
"emit_time" : "40nanos",
2529
"occupied_rows" : 10,
2630
"ram_bytes_used" : 2000,
2731
"ram_used" : "1.9kb",
@@ -44,6 +48,8 @@ protected Writeable.Reader<TopNOperatorStatus> instanceReader() {
4448
@Override
4549
protected TopNOperatorStatus createTestInstance() {
4650
return new TopNOperatorStatus(
51+
randomNonNegativeLong(),
52+
randomNonNegativeLong(),
4753
randomNonNegativeInt(),
4854
randomNonNegativeLong(),
4955
randomNonNegativeInt(),
@@ -55,34 +61,51 @@ protected TopNOperatorStatus createTestInstance() {
5561

5662
@Override
5763
protected TopNOperatorStatus mutateInstance(TopNOperatorStatus instance) {
64+
long receiveNanos = instance.receiveNanos();
65+
long emitNanos = instance.emitNanos();
5866
int occupiedRows = instance.occupiedRows();
5967
long ramBytesUsed = instance.ramBytesUsed();
6068
int pagesReceived = instance.pagesReceived();
6169
int pagesEmitted = instance.pagesEmitted();
6270
long rowsReceived = instance.rowsReceived();
6371
long rowsEmitted = instance.rowsEmitted();
64-
switch (between(0, 5)) {
72+
switch (between(0, 7)) {
6573
case 0:
66-
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
74+
receiveNanos = randomValueOtherThan(receiveNanos, ESTestCase::randomNonNegativeLong);
6775
break;
6876
case 1:
69-
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
77+
emitNanos = randomValueOtherThan(emitNanos, ESTestCase::randomNonNegativeLong);
7078
break;
7179
case 2:
72-
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
80+
occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt);
7381
break;
7482
case 3:
75-
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
83+
ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
7684
break;
7785
case 4:
78-
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
86+
pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
7987
break;
8088
case 5:
89+
pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
90+
break;
91+
case 6:
92+
rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
93+
break;
94+
case 7:
8195
rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
8296
break;
8397
default:
8498
throw new IllegalArgumentException();
8599
}
86-
return new TopNOperatorStatus(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
100+
return new TopNOperatorStatus(
101+
receiveNanos,
102+
emitNanos,
103+
occupiedRows,
104+
ramBytesUsed,
105+
pagesReceived,
106+
pagesEmitted,
107+
rowsReceived,
108+
rowsEmitted
109+
);
87110
}
88111
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/NamedWriteablesTests.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)