Skip to content

Commit 2a57435

Browse files
feat(v2): cancel artificial delay on async ingest (#4261)
* feat(v2): add request-level write path overrides # Conflicts: # pkg/distributor/distributor.go # pkg/distributor/writepath/router.go # pkg/distributor/writepath/write_path_test.go * feat(v2): cancellable artificial delay * feat(v2): add async ingest tests * feat(v2): add example use case for request-level overrides * Revert "feat(v2): add example use case for request-level overrides" This reverts commit afce63c. * cancel artificial delay on async ingest * Remove import * Fix bug from rebase * Bring back removed trace span --------- Co-authored-by: Aleksandar Petrov <[email protected]>
1 parent d22a386 commit 2a57435

File tree

8 files changed

+290
-71
lines changed

8 files changed

+290
-71
lines changed

pkg/distributor/distributor.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ type Limits interface {
144144
IngestionRelabelingRules(tenantID string) []*relabel.Config
145145
SampleTypeRelabelingRules(tenantID string) []*relabel.Config
146146
DistributorUsageGroups(tenantID string) *validation.UsageGroupConfig
147+
WritePathOverrides(tenantID string) writepath.Config
147148
validation.ProfileValidationLimits
148149
aggregator.Limits
149-
writepath.Overrides
150150
}
151151

152152
func New(
@@ -188,11 +188,7 @@ func New(
188188

189189
ingesterRoute := writepath.IngesterFunc(d.sendRequestsToIngester)
190190
segmentWriterRoute := writepath.IngesterFunc(d.sendRequestsToSegmentWriter)
191-
d.router = writepath.NewRouter(
192-
logger, reg, limits,
193-
ingesterRoute,
194-
segmentWriterRoute,
195-
)
191+
d.router = writepath.NewRouter(logger, reg, ingesterRoute, segmentWriterRoute)
196192

197193
var err error
198194
subservices := []services.Service(nil)
@@ -560,7 +556,8 @@ func (d *Distributor) pushSeries(ctx context.Context, req *distributormodel.Prof
560556
// functions to send the request to the appropriate service; these are
561557
// called independently, and may be called concurrently: the request is
562558
// cloned in this case – the callee may modify the request safely.
563-
return d.router.Send(ctx, req)
559+
config := d.limits.WritePathOverrides(req.TenantID)
560+
return d.router.Send(ctx, req, config)
564561
}
565562

566563
func noNewProfilesReceivedError() *connect.Error {
@@ -635,7 +632,8 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.Profi
635632
Profile: pprof.RawFromProto(p.Profile()),
636633
Annotations: annotations,
637634
}
638-
return d.router.Send(localCtx, aggregated)
635+
config := d.limits.WritePathOverrides(req.TenantID)
636+
return d.router.Send(localCtx, aggregated, config)
639637
})()
640638
if sendErr != nil {
641639
_ = level.Error(d.logger).Log("msg", "failed to handle aggregation", "tenant", req.TenantID, "err", err)

pkg/distributor/writepath/router.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/grafana/pyroscope/pkg/tenant"
2222
"github.com/grafana/pyroscope/pkg/util"
2323
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
24+
"github.com/grafana/pyroscope/pkg/util/delayhandler"
2425
httputil "github.com/grafana/pyroscope/pkg/util/http"
2526
)
2627

@@ -44,17 +45,12 @@ func (f IngesterFunc) Push(
4445
return f(ctx, req)
4546
}
4647

47-
type Overrides interface {
48-
WritePathOverrides(tenantID string) Config
49-
}
50-
5148
type Router struct {
5249
service services.Service
5350
inflight sync.WaitGroup
5451

55-
logger log.Logger
56-
overrides Overrides
57-
metrics *metrics
52+
logger log.Logger
53+
metrics *metrics
5854

5955
ingester IngesterClient
6056
segwriter IngesterClient
@@ -63,13 +59,11 @@ type Router struct {
6359
func NewRouter(
6460
logger log.Logger,
6561
registerer prometheus.Registerer,
66-
overrides Overrides,
6762
ingester IngesterClient,
6863
segwriter IngesterClient,
6964
) *Router {
7065
r := &Router{
7166
logger: logger,
72-
overrides: overrides,
7367
metrics: newMetrics(registerer),
7468
ingester: ingester,
7569
segwriter: segwriter,
@@ -93,17 +87,19 @@ func (m *Router) running(ctx context.Context) error {
9387
return nil
9488
}
9589

96-
func (m *Router) Send(ctx context.Context, req *distributormodel.ProfileSeries) error {
90+
func (m *Router) Send(ctx context.Context, req *distributormodel.ProfileSeries, config Config) error {
9791
sp, ctx := opentracing.StartSpanFromContext(ctx, "Router.Send")
9892
defer sp.Finish()
99-
config := m.overrides.WritePathOverrides(req.TenantID)
93+
if config.AsyncIngest {
94+
delayhandler.CancelDelay(ctx)
95+
}
10096
switch config.WritePath {
10197
case SegmentWriterPath:
102-
return m.send(m.segwriterRoute(true))(ctx, req)
98+
return m.sendToSegmentWriterOnly(ctx, req, &config)
10399
case CombinedPath:
104100
return m.sendToBoth(ctx, req, &config)
105101
default:
106-
return m.send(m.ingesterRoute())(ctx, req)
102+
return m.sendToIngesterOnly(ctx, req)
107103
}
108104
}
109105

@@ -163,7 +159,7 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.ProfileSe
163159
// The request is to be sent to both asynchronously, therefore we're
164160
// cloning it. We do not wait for the secondary request to complete.
165161
// On shutdown, however, we will wait for all inflight requests.
166-
segwriter.client = m.sendClone(ctx, req.Clone(), segwriter.client, config)
162+
segwriter.client = m.detachedClient(ctx, req.Clone(), segwriter.client, config)
167163
}
168164

169165
if segwriter != nil {
@@ -182,6 +178,22 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.ProfileSe
182178
return nil
183179
}
184180

181+
func (m *Router) sendToSegmentWriterOnly(ctx context.Context, req *distributormodel.ProfileSeries, config *Config) error {
182+
r := m.segwriterRoute(true)
183+
if !config.AsyncIngest {
184+
return m.send(r)(ctx, req)
185+
}
186+
r.client = m.detachedClient(ctx, req, r.client, config)
187+
m.sendAsync(ctx, req, r)
188+
return nil
189+
}
190+
191+
func (m *Router) sendToIngesterOnly(ctx context.Context, req *distributormodel.ProfileSeries) error {
192+
// NOTE(kolesnikovae): If we also want to support async requests to ingesters,
193+
// we should implement it here and in sendToBoth.
194+
return m.send(m.ingesterRoute())(ctx, req)
195+
}
196+
185197
type sendFunc func(context.Context, *distributormodel.ProfileSeries) error
186198

187199
type route struct {
@@ -190,7 +202,9 @@ type route struct {
190202
primary bool
191203
}
192204

193-
func (m *Router) sendClone(ctx context.Context, req *distributormodel.ProfileSeries, client IngesterClient, config *Config) IngesterFunc {
205+
// detachedClient creates a new IngesterFunc that wraps the call with a local context
206+
// that has a timeout and tenant ID injected so it can be used for asynchronous requests.
207+
func (m *Router) detachedClient(ctx context.Context, req *distributormodel.ProfileSeries, client IngesterClient, config *Config) IngesterFunc {
194208
return func(context.Context, *distributormodel.ProfileSeries) (*connect.Response[pushv1.PushResponse], error) {
195209
localCtx, cancel := context.WithTimeout(context.Background(), config.SegmentWriterTimeout)
196210
localCtx = tenant.InjectTenantID(localCtx, req.TenantID)

0 commit comments

Comments
 (0)