Skip to content

Commit 98bdfad

Browse files
authored
feat: Improve request observability (#4400)
This change improves the observability into requests we do receive from clients: - Uses full logfmt fields instead of embedding custom format into fields - Log request body timings and sizes (also propagate them to the tracing span) - Expose request headers (was not mapped to CLI flags, so we only saw them when a status code 500 was thrown) - Rename OrgID/user to tenant
1 parent 11065c2 commit 98bdfad

File tree

4 files changed

+312
-93
lines changed

4 files changed

+312
-93
lines changed

pkg/ingester/pyroscope/ingest_handler.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,6 @@ func readInputRawDataFromRequest(ctx context.Context, r *http.Request, input *in
171171
if sp != nil {
172172
sp.SetTag("format", format)
173173
sp.SetTag("content_type", contentType)
174-
sp.LogFields(
175-
otlog.String("msg", "reading body from request"),
176-
)
177174
}
178175

179176
buf := bytes.NewBuffer(make([]byte, 0, 64<<10))
@@ -184,9 +181,6 @@ func readInputRawDataFromRequest(ctx context.Context, r *http.Request, input *in
184181

185182
if sp != nil {
186183
sp.SetTag("content_length", n)
187-
sp.LogFields(
188-
otlog.String("msg", "read body from request"),
189-
)
190184
}
191185
b := buf.Bytes()
192186

pkg/pyroscope/modules.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http"
88
"os"
99
"slices"
10+
"strings"
1011
"time"
1112

1213
"connectrpc.com/connect"
@@ -513,9 +514,11 @@ func (f *Pyroscope) initServer() (services.Service, error) {
513514
middleware.RouteInjector{
514515
RouteMatcher: f.Server.HTTP,
515516
},
516-
util.Log{
517-
Log: f.Server.Log,
518-
LogRequestAtInfoLevel: f.Cfg.Server.LogRequestAtInfoLevel,
517+
&util.Log{
518+
Log: f.Server.Log,
519+
LogRequestAtInfoLevel: f.Cfg.Server.LogRequestAtInfoLevel,
520+
LogRequestHeaders: f.Cfg.Server.LogRequestHeaders,
521+
LogRequestExcludeHeaders: strings.Split(f.Cfg.Server.LogRequestExcludeHeadersList, ","),
519522
},
520523
httpMetric,
521524
objstoreTracerMiddleware,

pkg/util/http.go

Lines changed: 128 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
"io"
1111
"net"
1212
"net/http"
13+
"net/textproto"
14+
"slices"
1315
"strings"
16+
"sync"
1417
"time"
1518

1619
"github.com/grafana/dskit/instrument"
@@ -21,12 +24,12 @@ import (
2124
"github.com/go-kit/log"
2225
"github.com/go-kit/log/level"
2326
"github.com/gorilla/mux"
24-
dslog "github.com/grafana/dskit/log"
2527
"github.com/grafana/dskit/middleware"
2628
"github.com/grafana/dskit/multierror"
2729
"github.com/grafana/dskit/tracing"
2830
"github.com/grafana/dskit/user"
2931
"github.com/opentracing/opentracing-go"
32+
otlog "github.com/opentracing/opentracing-go/log"
3033
"github.com/prometheus/client_golang/prometheus"
3134
"golang.org/x/net/http2"
3235
"gopkg.in/yaml.v3"
@@ -46,6 +49,8 @@ var defaultTransport http.RoundTripper = &http2.Transport{
4649
},
4750
}
4851

52+
var timeNow = time.Now
53+
4954
type RoundTripperFunc func(req *http.Request) (*http.Response, error)
5055

5156
func (f RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
@@ -130,14 +135,57 @@ const (
130135

131136
// Log middleware logs http requests
132137
type Log struct {
133-
Log log.Logger
134-
LogRequestHeaders bool // LogRequestHeaders true -> dump http headers at debug log level
135-
LogRequestAtInfoLevel bool // LogRequestAtInfoLevel true -> log requests at info log level
136-
SourceIPs *middleware.SourceIPExtractor
138+
Log log.Logger
139+
LogRequestHeaders bool
140+
LogRequestExcludeHeaders []string
141+
LogRequestAtInfoLevel bool // LogRequestAtInfoLevel true -> log requests at info log level
142+
SourceIPs *middleware.SourceIPExtractor
143+
144+
filterHeaderMap map[string]struct{}
145+
filterHeaderOnce sync.Once
146+
}
147+
148+
func (l *Log) filterHeader(key string) bool {
149+
// ensure map is populated once
150+
l.filterHeaderOnce.Do(func() {
151+
l.filterHeaderMap = make(map[string]struct{})
152+
for _, k := range l.LogRequestExcludeHeaders {
153+
l.filterHeaderMap[textproto.CanonicalMIMEHeaderKey(k)] = struct{}{}
154+
}
155+
for k := range middleware.AlwaysExcludedHeaders {
156+
l.filterHeaderMap[textproto.CanonicalMIMEHeaderKey(k)] = struct{}{}
157+
}
158+
})
159+
_, filter := l.filterHeaderMap[key]
160+
return filter
161+
}
162+
163+
func (l *Log) extractHeaders(req *http.Request) []any {
164+
// Populate header list first and sort it
165+
logKeys := make([]string, 0, len(req.Header))
166+
for k := range req.Header {
167+
if l.filterHeader(k) {
168+
continue
169+
}
170+
logKeys = append(logKeys, k)
171+
}
172+
slices.SortFunc(logKeys, strings.Compare)
173+
174+
// build the log fields
175+
logFields := make([]any, 0, len(logKeys)*2)
176+
for _, k := range logKeys {
177+
logFields = append(
178+
logFields,
179+
"request_header_"+k,
180+
req.Header.Get(k),
181+
)
182+
}
183+
184+
return logFields
137185
}
138186

139187
// logWithRequest information from the request and context as fields.
140-
func (l Log) logWithRequest(r *http.Request) log.Logger {
188+
func (l *Log) logWithRequest(r *http.Request) log.Logger {
141189
localLog := l.Log
142190
traceID, ok := tracing.ExtractTraceID(r.Context())
143191
if ok {
@@ -150,26 +198,52 @@ func (l Log) logWithRequest(r *http.Request) log.Logger {
150198
localLog = log.With(localLog, "sourceIPs", ips)
151199
}
152200
}
153-
orgID := r.Header.Get(user.OrgIDHeaderName)
154-
if orgID == "" {
155-
localLog = user.LogWith(r.Context(), localLog)
156-
} else {
157-
localLog = log.With(localLog, "orgID", orgID)
201+
202+
tenantID := r.Header.Get(user.OrgIDHeaderName)
203+
if tenantID == "" {
204+
id, err := user.ExtractOrgID(r.Context())
205+
if err == nil {
206+
tenantID = id
207+
}
208+
}
209+
if tenantID != "" {
210+
localLog = log.With(localLog, "tenant", tenantID)
158211
}
212+
159213
return localLog
160214
}
161215

162216
// measure request body size
163217
type reqBody struct {
164218
b io.ReadCloser
165219
read byteSize
220+
221+
start time.Time
222+
duration time.Duration
223+
224+
sp opentracing.Span
166225
}
167226

168227
func (w *reqBody) Read(p []byte) (int, error) {
228+
if w.start.IsZero() {
229+
w.start = timeNow()
230+
if w.sp != nil {
231+
w.sp.LogFields(otlog.String("msg", "start reading body from request"))
232+
}
233+
}
169234
n, err := w.b.Read(p)
170235
if n > 0 {
171236
w.read += byteSize(n)
172237
}
238+
if err == io.EOF {
239+
w.duration = timeNow().Sub(w.start)
240+
if w.sp != nil {
241+
w.sp.LogFields(otlog.String("msg", "read body from request"))
242+
if w.read > 0 {
243+
w.sp.SetTag("request_body_size", w.read)
244+
}
245+
}
246+
}
173247
return n, err
174248
}
175249

@@ -184,17 +258,13 @@ func (bs byteSize) String() string {
184258
}
185259

186260
// Wrap implements Middleware
187-
func (l Log) Wrap(next http.Handler) http.Handler {
261+
func (l *Log) Wrap(next http.Handler) http.Handler {
188262
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
189-
begin := time.Now()
190-
uri := r.RequestURI // capture the URI before running next, as it may get rewritten
263+
begin := timeNow()
264+
uri := r.RequestURI // Capture the URI before running next, as it may get rewritten
191265
requestLog := l.logWithRequest(r)
192266
// Log headers before running 'next' in case other interceptors change the data.
193-
headers, err := dumpRequest(r)
194-
if err != nil {
195-
headers = nil
196-
level.Error(requestLog).Log("msg", "Could not dump request headers", "err", err)
197-
}
267+
198268
var (
199269
httpErr multierror.MultiError
200270
httpCode = http.StatusOK
@@ -203,6 +273,8 @@ func (l Log) Wrap(next http.Handler) http.Handler {
203273
bodyLeft = maxResponseBodyInLogs
204274
)
205275

276+
headerFields := l.extractHeaders(r)
277+
206278
wrapped := httpsnoop.Wrap(w, httpsnoop.Hooks{
207279
WriteHeader: func(next httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
208280
return func(code int) {
@@ -242,49 +314,57 @@ func (l Log) Wrap(next http.Handler) http.Handler {
242314
r.Body = origBody
243315
}()
244316

245-
rBody := &reqBody{b: origBody}
317+
rBody := &reqBody{
318+
b: origBody,
319+
sp: opentracing.SpanFromContext(r.Context()),
320+
}
246321
r.Body = rBody
247322

248323
next.ServeHTTP(wrapped, r)
249324

250325
statusCode, writeErr := httpCode, httpErr.Err()
251326

252-
requestLogD := log.With(requestLog, "method", r.Method, "uri", uri, "status", statusCode, "duration", time.Since(begin))
327+
requestLog = log.With(requestLog, "method", r.Method, "uri", uri, "status", statusCode, "duration", time.Since(begin))
328+
329+
if l.LogRequestHeaders {
330+
requestLog = log.With(requestLog, headerFields...)
331+
}
253332
if rBody.read > 0 {
254-
requestLogD = log.With(requestLogD, "request_body_size", rBody.read)
333+
requestLog = log.With(requestLog, "request_body_size", rBody.read)
334+
if rBody.duration > 0 {
335+
requestLog = log.With(requestLog, "request_body_read_duration", rBody.duration)
336+
}
255337
}
256338

257-
if writeErr != nil {
258-
if errors.Is(writeErr, context.Canceled) {
259-
if l.LogRequestAtInfoLevel {
260-
level.Info(requestLogD).Log("msg", dslog.LazySprintf("request cancelled: %s ws: %v; %s", writeErr, IsWSHandshakeRequest(r), headers))
261-
} else {
262-
level.Debug(requestLogD).Log("msg", dslog.LazySprintf("request cancelled: %s ws: %v; %s", writeErr, IsWSHandshakeRequest(r), headers))
263-
}
264-
} else {
265-
level.Warn(requestLogD).Log("msg", dslog.LazySprintf("error: %s ws: %v; %s", writeErr, IsWSHandshakeRequest(r), headers))
266-
}
339+
requestLvl := level.Debug
340+
if l.LogRequestAtInfoLevel {
341+
requestLvl = level.Info
342+
}
267343

344+
// log successful requests
345+
if writeErr == nil && (100 <= statusCode && statusCode < 400) {
346+
requestLvl(requestLog).Log("msg", "http request processed")
268347
return
269348
}
270349

271-
if 100 <= statusCode && statusCode < 400 {
272-
if l.LogRequestAtInfoLevel {
273-
level.Info(requestLogD).Log("msg", "http request processed")
274-
} else {
275-
level.Debug(requestLogD).Log("msg", "http request processed")
276-
}
277-
if l.LogRequestHeaders && headers != nil {
278-
if l.LogRequestAtInfoLevel {
279-
level.Info(requestLog).Log("msg", dslog.LazySprintf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers)))
280-
} else {
281-
level.Debug(requestLog).Log("msg", dslog.LazySprintf("ws: %v; %s", IsWSHandshakeRequest(r), string(headers)))
282-
}
283-
}
284-
} else {
285-
level.Warn(requestLog).Log("msg", dslog.LazySprintf("%s %s (%d) %s Response: %q ws: %v; %s",
286-
r.Method, uri, statusCode, time.Since(begin), buf.Bytes(), IsWSHandshakeRequest(r), headers))
350+
// context cancelled is not considered a failure
351+
if writeErr != nil && errors.Is(writeErr, context.Canceled) {
352+
requestLvl(requestLog).Log("msg", "request cancelled")
353+
return
354+
}
355+
356+
// add request headers if not anyhow added
357+
if !l.LogRequestHeaders {
358+
requestLog = log.With(requestLog, headerFields...)
359+
}
360+
361+
// writeError shouldn't log the body
362+
if writeErr != nil {
363+
level.Warn(requestLog).Log("msg", "http request failed", "err", writeErr)
364+
return
287365
}
366+
367+
level.Warn(requestLog).Log("msg", "http request failed", "response_body", buf.Bytes())
288368
})
289369
}
290370

@@ -302,37 +382,6 @@ func captureResponseBody(data []byte, bodyBytesLeft int, buf *bytes.Buffer) int
302382
}
303383
}
304384

305-
func dumpRequest(req *http.Request) ([]byte, error) {
306-
var b bytes.Buffer
307-
308-
// Exclude some headers for security, or just that we don't need them when debugging
309-
err := req.Header.WriteSubset(&b, map[string]bool{
310-
"Cookie": true,
311-
"X-Csrf-Token": true,
312-
"Authorization": true,
313-
})
314-
if err != nil {
315-
return nil, err
316-
}
317-
318-
ret := bytes.ReplaceAll(b.Bytes(), []byte("\r\n"), []byte("; "))
319-
return ret, nil
320-
}
321-
322-
// IsWSHandshakeRequest returns true if the given request is a websocket handshake request.
323-
func IsWSHandshakeRequest(req *http.Request) bool {
324-
if strings.ToLower(req.Header.Get("Upgrade")) == "websocket" {
325-
// Connection header values can be of form "foo, bar, ..."
326-
parts := strings.Split(strings.ToLower(req.Header.Get("Connection")), ",")
327-
for _, part := range parts {
328-
if strings.TrimSpace(part) == "upgrade" {
329-
return true
330-
}
331-
}
332-
}
333-
return false
334-
}
335-
336385
// NewHTTPMetricMiddleware creates a new middleware that automatically instruments HTTP requests from the given router.
337386
func NewHTTPMetricMiddleware(mux *mux.Router, namespace string, reg prometheus.Registerer) (middleware.Interface, error) {
338387
// Prometheus histograms for requests.

0 commit comments

Comments
 (0)