Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/dataobj/sections/logs/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (b *tableBuffer) Flush() (*table, error) {

// Each metadata column may have a different number of rows compared to
// other columns. Backfill them with NULLs to match the max rows in the buffer.
// It is safe to use streamID column row count since it is a requried column in logs section.
// It is safe to use streamID column row count since it is a required column in logs section.
metadataBuilder.Backfill(streamID.Desc.RowsCount)

metadata, _ := metadataBuilder.Flush()
Expand Down
7 changes: 4 additions & 3 deletions pkg/engine/compat_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
Expand Down Expand Up @@ -97,7 +98,7 @@ func prepareSchema(numLabels int, numMeta int, numParsed int) (*arrow.Schema, []
for i := 0; i < numLabels; i++ {
ident := semconv.NewIdentifier(
fmt.Sprintf("label_%d", i),
types.ColumnTypeLabel,
physicalpb.COLUMN_TYPE_LABEL,
types.Loki.String,
)
labelIdents[i] = ident
Expand All @@ -109,7 +110,7 @@ func prepareSchema(numLabels int, numMeta int, numParsed int) (*arrow.Schema, []
for i := 0; i < numMeta; i++ {
ident := semconv.NewIdentifier(
fmt.Sprintf("meta_%d", i),
types.ColumnTypeMetadata,
physicalpb.COLUMN_TYPE_METADATA,
types.Loki.String,
)
metaIdents[i] = ident
Expand All @@ -121,7 +122,7 @@ func prepareSchema(numLabels int, numMeta int, numParsed int) (*arrow.Schema, []
for i := 0; i < numParsed; i++ {
ident := semconv.NewIdentifier(
fmt.Sprintf("parsed_%d", i),
types.ColumnTypeParsed,
physicalpb.COLUMN_TYPE_PARSED,
types.Loki.String,
)
parsedIdents[i] = ident
Expand Down
25 changes: 13 additions & 12 deletions pkg/engine/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
Expand All @@ -33,7 +34,7 @@ func TestStreamsResultBuilder(t *testing.T) {
t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeMetadata, types.Loki.String)
colEnv := semconv.NewIdentifier("env", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -76,9 +77,9 @@ func TestStreamsResultBuilder(t *testing.T) {
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colNs := semconv.NewIdentifier("namespace", types.ColumnTypeLabel, types.Loki.String)
colTid := semconv.NewIdentifier("traceID", types.ColumnTypeMetadata, types.Loki.String)
colEnv := semconv.NewIdentifier("env", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String)
colNs := semconv.NewIdentifier("namespace", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String)
colTid := semconv.NewIdentifier("traceID", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestStreamsResultBuilder(t *testing.T) {
t.Run("multiple records with different streams are accumulated correctly", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colEnv := semconv.NewIdentifier("env", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -289,7 +290,7 @@ func TestStreamsResultBuilder(t *testing.T) {
t.Run("buffer reuse with varying record sizes", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colEnv := semconv.NewIdentifier("env", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -358,7 +359,7 @@ func TestStreamsResultBuilder(t *testing.T) {
t.Run("empty records mixed with valid records", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colMsg := semconv.ColumnIdentMessage
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String)
colEnv := semconv.NewIdentifier("env", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -432,8 +433,8 @@ func TestVectorResultBuilder(t *testing.T) {
t.Run("successful conversion of vector data", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
colInst := semconv.NewIdentifier("instance", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)
colJob := semconv.NewIdentifier("job", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -496,7 +497,7 @@ func TestVectorResultBuilder(t *testing.T) {
t.Run("rows without timestamp or value are ignored", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colInst := semconv.NewIdentifier("instance", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down Expand Up @@ -534,8 +535,8 @@ func TestMatrixResultBuilder(t *testing.T) {
t.Run("successful conversion of matrix data", func(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp
colVal := semconv.ColumnIdentValue
colInst := semconv.NewIdentifier("instance", types.ColumnTypeMetadata, types.Loki.String)
colJob := semconv.NewIdentifier("job", types.ColumnTypeMetadata, types.Loki.String)
colInst := semconv.NewIdentifier("instance", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)
colJob := semconv.NewIdentifier("job", physicalpb.COLUMN_TYPE_METADATA, types.Loki.String)

schema := arrow.NewSchema(
[]arrow.Field{
Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/logical"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
Expand Down Expand Up @@ -181,7 +182,7 @@ func (e *QueryEngine) Execute(ctx context.Context, params logql.Params) (logqlmo
return logqlmodel.Result{}, err
}

physicalPlan, err := func() (*physical.Plan, error) {
physicalPlan, err := func() (*physicalpb.Plan, error) {
ctx, span := tracer.Start(ctx, "QueryEngine.Execute.physicalPlan")
defer span.End()

Expand Down
13 changes: 4 additions & 9 deletions pkg/engine/internal/executor/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package executor

import (
"fmt"
"maps"
"slices"
"strings"
Expand All @@ -12,7 +11,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/cespare/xxhash/v2"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
Expand All @@ -33,7 +32,7 @@ const (

// aggregator is used to aggregate sample values by a set of grouping keys for each point in time.
type aggregator struct {
groupBy []physical.ColumnExpression // columns to group by
groupBy []*physicalpb.ColumnExpression // columns to group by
points map[time.Time]map[uint64]*groupState // holds the groupState for each point in time series
digest *xxhash.Digest // used to compute key for each group
operation aggregationOperation // aggregation type
Expand All @@ -43,7 +42,7 @@ type aggregator struct {
// empty groupBy indicates no grouping. All values are aggregated into a single group.
// TODO: add without argument to support `without(...)` grouping.
// A special case of `without()` that has empty groupBy is used for Noop grouping which retains the input labels as is.
func newAggregator(groupBy []physical.ColumnExpression, pointsSizeHint int, operation aggregationOperation) *aggregator {
func newAggregator(groupBy []*physicalpb.ColumnExpression, pointsSizeHint int, operation aggregationOperation) *aggregator {
a := aggregator{
groupBy: groupBy,
digest: xxhash.New(),
Expand Down Expand Up @@ -140,11 +139,7 @@ func (a *aggregator) BuildRecord() (arrow.Record, error) {
)

for _, column := range a.groupBy {
colExpr, ok := column.(*physical.ColumnExpr)
if !ok {
panic(fmt.Sprintf("invalid column expression type %T", column))
}
ident := semconv.NewIdentifier(colExpr.Ref.Column, colExpr.Ref.Type, types.Loki.String)
ident := semconv.NewIdentifier(column.Name, column.Type, types.Loki.String)
fields = append(fields, semconv.FieldFromIdent(ident, true))
}

Expand Down
26 changes: 11 additions & 15 deletions pkg/engine/internal/executor/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,30 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)

var (
groupBy = []physical.ColumnExpression{
&physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "env",
Type: types.ColumnTypeLabel,
},
groupBy = []*physicalpb.ColumnExpression{
{
Name: "env",
Type: physicalpb.COLUMN_TYPE_LABEL,
},
&physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "service",
Type: types.ColumnTypeLabel,
},
{
Name: "service",
Type: physicalpb.COLUMN_TYPE_LABEL,
},
}
)

func TestAggregator(t *testing.T) {
colTs := semconv.ColumnIdentTimestamp.FQN()
colVal := semconv.ColumnIdentValue.FQN()
colEnv := semconv.NewIdentifier("env", types.ColumnTypeLabel, types.Loki.String).FQN()
colSvc := semconv.NewIdentifier("service", types.ColumnTypeLabel, types.Loki.String).FQN()
colEnv := semconv.NewIdentifier("env", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String).FQN()
colSvc := semconv.NewIdentifier("service", physicalpb.COLUMN_TYPE_LABEL, types.Loki.String).FQN()

t.Run("basic SUM aggregation with record building", func(t *testing.T) {
agg := newAggregator(groupBy, 10, aggregationOperationSum)
Expand Down Expand Up @@ -210,7 +206,7 @@ func TestAggregator(t *testing.T) {

t.Run("SUM aggregation with empty groupBy", func(t *testing.T) {
// Empty groupBy represents sum by () or sum(...) - all values aggregated into single group
groupBy := []physical.ColumnExpression{}
groupBy := []*physicalpb.ColumnExpression{}

agg := newAggregator(groupBy, 1, aggregationOperationSum)

Expand Down
9 changes: 5 additions & 4 deletions pkg/engine/internal/executor/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/dustin/go-humanize"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)

func castFn(operation types.UnaryOp) UnaryFunction {
func castFn(operation physicalpb.UnaryOp) UnaryFunction {
return UnaryFunc(func(input arrow.Array) (arrow.Array, error) {
sourceCol, ok := input.(*array.String)
if !ok {
Expand All @@ -36,11 +37,11 @@ func castFn(operation types.UnaryOp) UnaryFunction {

type conversionFn func(value string) (float64, error)

func getConversionFunction(operation types.UnaryOp) conversionFn {
func getConversionFunction(operation physicalpb.UnaryOp) conversionFn {
switch operation {
case types.UnaryOpCastBytes:
case physicalpb.UNARY_OP_CAST_BYTES:
return convertBytes
case types.UnaryOpCastDuration:
case physicalpb.UNARY_OP_CAST_DURATION:
return convertDuration
default:
return convertFloat
Expand Down
45 changes: 30 additions & 15 deletions pkg/engine/internal/executor/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,62 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"

"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
)

func NewScalar(value types.Literal, rows int) arrow.Array {
builder := array.NewBuilder(memory.DefaultAllocator, value.Type().ArrowType())
func NewScalar(value physicalpb.LiteralExpression, rows int) arrow.Array {
var tmp arrow.DataType
switch value.Kind.(type) {
case *physicalpb.LiteralExpression_NullLiteral:
tmp = arrow.Null
case *physicalpb.LiteralExpression_BoolLiteral:
tmp = arrow.FixedWidthTypes.Boolean
case *physicalpb.LiteralExpression_StringLiteral:
tmp = arrow.BinaryTypes.String
case *physicalpb.LiteralExpression_IntegerLiteral, *physicalpb.LiteralExpression_DurationLiteral, *physicalpb.LiteralExpression_BytesLiteral:
tmp = arrow.PrimitiveTypes.Int64
case *physicalpb.LiteralExpression_FloatLiteral:
tmp = arrow.PrimitiveTypes.Float64
case *physicalpb.LiteralExpression_TimestampLiteral:
tmp = arrow.FixedWidthTypes.Timestamp_ns
}
builder := array.NewBuilder(memory.DefaultAllocator, tmp)

switch builder := builder.(type) {
case *array.NullBuilder:
for range rows {
builder.AppendNull()
}
case *array.BooleanBuilder:
value := value.Any().(bool)
value := value.GetBoolLiteral().Value
for range rows {
builder.Append(value)
}
case *array.StringBuilder:
value := value.Any().(string)
value := value.GetStringLiteral().Value
for range rows {
builder.Append(value)
}
case *array.Int64Builder:
var v int64
switch value.Type() {
case types.Loki.Integer:
v = value.Any().(int64)
case types.Loki.Duration:
v = int64(value.Any().(types.Duration))
case types.Loki.Bytes:
v = int64(value.Any().(types.Bytes))
switch value.Kind.(type) {
case *physicalpb.LiteralExpression_IntegerLiteral:
v = value.GetIntegerLiteral().Value
case *physicalpb.LiteralExpression_DurationLiteral:
v = value.GetDurationLiteral().Value
case *physicalpb.LiteralExpression_BytesLiteral:
v = value.GetBytesLiteral().Value
}
for range rows {
builder.Append(v)
}
case *array.Float64Builder:
value := value.Any().(float64)
value := value.GetFloatLiteral().Value
for range rows {
builder.Append(value)
}
case *array.TimestampBuilder:
value := value.Any().(types.Timestamp)
value := value.GetTimestampLiteral().Value
for range rows {
builder.Append(arrow.Timestamp(value))
}
Expand All @@ -65,7 +80,7 @@ func NewCoalesce(columns []*columnWithType) arrow.Array {

// Sort columns by precedence
slices.SortFunc(columns, func(a, b *columnWithType) int {
return types.ColumnTypePrecedence(a.ct) - types.ColumnTypePrecedence(b.ct)
return physicalpb.ColumnTypePrecedence(a.ct) - physicalpb.ColumnTypePrecedence(b.ct)
})

// Only string columns are supported
Expand Down
8 changes: 4 additions & 4 deletions pkg/engine/internal/executor/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"

"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical/physicalpb"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
)

func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipeline) Pipeline {
func newColumnCompatibilityPipeline(compat *physicalpb.ColumnCompat, input Pipeline) Pipeline {
const extracted = "_extracted"

return newGenericPipeline(func(ctx context.Context, inputs []Pipeline) (arrow.Record, error) {
Expand Down Expand Up @@ -45,10 +45,10 @@ func newColumnCompatibilityPipeline(compat *physical.ColumnCompat, input Pipelin
return nil, err
}
switch ident.ColumnType() {
case compat.Collision:
case semconv.ColumnTypePhysToLog(compat.Collision):
collisionFieldIndices = append(collisionFieldIndices, idx)
collisionFieldNames = append(collisionFieldNames, ident.ShortName())
case compat.Source:
case semconv.ColumnTypePhysToLog(compat.Source):
sourceFieldIndices = append(sourceFieldIndices, idx)
sourceFieldNames = append(sourceFieldNames, ident.ShortName())
}
Expand Down
Loading