Skip to content
This repository was archived by the owner on Apr 30, 2025. It is now read-only.

Commit e1b3b82

Browse files
Add test and clean up extract code
Signed-off-by: Michel Hollands <[email protected]>
1 parent 902f74e commit e1b3b82

File tree

4 files changed

+27
-12
lines changed

4 files changed

+27
-12
lines changed

pkg/distributor/distributor.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/weaveworks/common/httpgrpc"
1919
"github.com/weaveworks/common/instrument"
2020
"github.com/weaveworks/common/user"
21-
grpc_metadata "google.golang.org/grpc/metadata"
2221

2322
"github.com/cortexproject/cortex/pkg/ingester/client"
2423
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -541,7 +540,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
541540
}
542541

543542
// Get clientIP(s) from Context and add it to localCtx
544-
localCtx = grpc_metadata.AppendToOutgoingContext(localCtx, util.IPAddressesKey, source)
543+
localCtx = util.AddSourceToOutgoingContext(localCtx, source)
545544

546545
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
547546
}, func() { client.ReuseSlice(req.Timeseries) })

pkg/ingester/ingester_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
3131
"github.com/cortexproject/cortex/pkg/ingester/client"
3232
"github.com/cortexproject/cortex/pkg/ring"
33+
"github.com/cortexproject/cortex/pkg/util"
3334
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
3435
"github.com/cortexproject/cortex/pkg/util/services"
3536
"github.com/cortexproject/cortex/pkg/util/test"
@@ -468,8 +469,10 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) {
468469
require.Equal(t, errResp.code, 400)
469470

470471
// Same timestamp as previous sample, but different value.
472+
ctx = util.AddSourceToIncomingContext(ctx, "1.2.3.4")
471473
err = ing.append(ctx, userID, m, 1, 1, client.API, nil)
472474
require.Contains(t, err.Error(), "sample with repeated timestamp but different value")
475+
require.Contains(t, err.Error(), "1.2.3.4")
473476
errResp, ok = err.(*validationError)
474477
require.True(t, ok)
475478
require.Equal(t, errResp.code, 400)

pkg/util/extract_forwarded.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
"google.golang.org/grpc/metadata"
1010
)
1111

12-
// IPAddressesKey is key for the GRPC metadata where the IP addresses are stored
13-
const IPAddressesKey = "ipaddresseskey"
12+
// ipAddressesKey is key for the GRPC metadata where the IP addresses are stored
13+
const ipAddressesKey = "ipaddresseskey"
1414

1515
// extractHost returns the Host IP address without any port information
1616
func extractHost(address string) string {
@@ -53,7 +53,7 @@ func GetSourceFromOutgoingCtx(ctx context.Context) string {
5353
if !ok {
5454
return ""
5555
}
56-
ipAddresses, ok := md[IPAddressesKey]
56+
ipAddresses, ok := md[ipAddressesKey]
5757
if !ok {
5858
return ""
5959
}
@@ -66,9 +66,26 @@ func GetSourceFromIncomingCtx(ctx context.Context) string {
6666
if !ok {
6767
return ""
6868
}
69-
ipAddresses, ok := md[IPAddressesKey]
69+
ipAddresses, ok := md[ipAddressesKey]
7070
if !ok {
7171
return ""
7272
}
7373
return ipAddresses[0]
7474
}
75+
76+
// AddSourceToOutgoingContext adds the given source to the GRPC context
77+
func AddSourceToOutgoingContext(ctx context.Context, source string) context.Context {
78+
if source != "" {
79+
ctx = metadata.AppendToOutgoingContext(ctx, ipAddressesKey, source)
80+
}
81+
return ctx
82+
}
83+
84+
// AddSourceToIncomingContext adds the given source to the GRPC context
85+
func AddSourceToIncomingContext(ctx context.Context, source string) context.Context {
86+
if source != "" {
87+
md := metadata.Pairs(ipAddressesKey, source)
88+
ctx = metadata.NewIncomingContext(ctx, md)
89+
}
90+
return ctx
91+
}

pkg/util/push/push.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"net/http"
66

7-
"google.golang.org/grpc/metadata"
8-
97
"github.com/go-kit/kit/log/level"
108
"github.com/weaveworks/common/httpgrpc"
119

@@ -20,11 +18,9 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq
2018
// Extract X-Forwarder-For header
2119
ctx := r.Context()
2220
source := util.GetSource(r)
23-
logger := util.WithContext(ctx, util.Logger)
24-
if source != "" {
25-
ctx = metadata.AppendToOutgoingContext(ctx, util.IPAddressesKey, source)
26-
}
21+
ctx = util.AddSourceToOutgoingContext(ctx, source)
2722

23+
logger := util.WithContext(ctx, util.Logger)
2824
compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version"))
2925
var req client.PreallocWriteRequest
3026
_, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType)

0 commit comments

Comments
 (0)