Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (a *Appender) updateCheckpoint(ctx context.Context, size uint64, root []byt

// objStore describes a type which can store and retrieve objects.
type objStore interface {
getObject(ctx context.Context, obj string) ([]byte, int64, error)
getObject(ctx context.Context, obj string) ([]byte, *gcs.ReaderObjectAttrs, error)
setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error
deleteObjectsWithPrefix(ctx context.Context, prefix string) error
}
Expand All @@ -488,7 +488,12 @@ func (lrs *logResourceStore) setCheckpoint(ctx context.Context, cpRaw []byte) er
}

func (lrs *logResourceStore) getCheckpoint(ctx context.Context) ([]byte, error) {
r, _, err := lrs.objStore.getObject(ctx, layout.CheckpointPath)
r, attr, err := lrs.objStore.getObject(ctx, layout.CheckpointPath)
if err != nil {
return nil, err
}

checkpointAgeHistogram.Record(ctx, time.Since(attr.LastModified).Milliseconds())
return r, err
}

Expand Down Expand Up @@ -1260,8 +1265,8 @@ type gcsStorage struct {
}

// getObject returns the data and generation of the specified object, or an error.
func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64, error) {
return otel.Trace2(ctx, "tessera.storage.gcp.getObject", tracer, func(ctx context.Context, span trace.Span) ([]byte, int64, error) {
func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, *gcs.ReaderObjectAttrs, error) {
return otel.Trace2(ctx, "tessera.storage.gcp.getObject", tracer, func(ctx context.Context, span trace.Span) ([]byte, *gcs.ReaderObjectAttrs, error) {
if s.bucketPrefix != "" {
obj = filepath.Join(s.bucketPrefix, obj)
}
Expand All @@ -1270,14 +1275,14 @@ func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64,

r, err := s.gcsClient.Bucket(s.bucket).Object(obj).NewReader(ctx)
if err != nil {
return nil, -1, fmt.Errorf("getObject: failed to create reader for object %q in bucket %q: %w", obj, s.bucket, err)
return nil, nil, fmt.Errorf("getObject: failed to create reader for object %q in bucket %q: %w", obj, s.bucket, err)
}

d, err := io.ReadAll(r)
if err != nil {
return nil, -1, fmt.Errorf("failed to read %q: %v", obj, err)
return nil, nil, fmt.Errorf("failed to read %q: %v", obj, err)
}
return d, r.Attrs.Generation, r.Close()
return d, &r.Attrs, r.Close()
})
}

Expand Down Expand Up @@ -1329,9 +1334,9 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte,
preconditionFailed = true
}
if preconditionFailed {
existing, existingGen, err := s.getObject(ctx, objName)
existing, existingAttr, err := s.getObject(ctx, objName)
if err != nil {
return fmt.Errorf("failed to fetch existing content for %q (@%d): %v", objName, existingGen, err)
return fmt.Errorf("failed to fetch existing content for %q (@%d): %v", objName, existingAttr.Generation, err)
}
if !bytes.Equal(existing, data) {
span.AddEvent("Non-idempotent write")
Expand Down
6 changes: 3 additions & 3 deletions storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,15 +792,15 @@ func newMemObjStore() *memObjStore {
}
}

func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, int64, error) {
func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, *gcs.ReaderObjectAttrs, error) {
m.RLock()
defer m.RUnlock()

d, ok := m.mem[obj]
if !ok {
return nil, -1, fmt.Errorf("obj %q not found: %w", obj, gcs.ErrObjectNotExist)
return nil, nil, fmt.Errorf("obj %q not found: %w", obj, gcs.ErrObjectNotExist)
}
return d, 1, nil
return d, &gcs.ReaderObjectAttrs{Generation: 1}, nil
}

// TODO(phboneff): add content type tests
Expand Down
14 changes: 12 additions & 2 deletions storage/gcp/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ var (
objectPathKey = attribute.Key("tessera.objectPath")
opNameKey = attribute.Key("op_name")

publishCount metric.Int64Counter
opsHistogram metric.Int64Histogram
publishCount metric.Int64Counter
opsHistogram metric.Int64Histogram
checkpointAgeHistogram metric.Int64Histogram

// Custom histogram buckets as we're interested in low-millis upto low-seconds.
histogramBuckets = []float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000}
Expand All @@ -55,6 +56,15 @@ func init() {
klog.Exitf("Failed to create opsHistogram metric: %v", err)
}

checkpointAgeHistogram, err = meter.Int64Histogram(
"tessera.reader.checkpoint.age",
metric.WithDescription("Age of checkpoints at the point of reading from GCP"),
metric.WithUnit("ms"),
metric.WithExplicitBucketBoundaries(histogramBuckets...))
if err != nil {
klog.Exitf("Failed to create checkpointAgeHistogram metric: %v", err)
}

publishCount, err = meter.Int64Counter(
"tessera.appender.checkpoint.publication.counter",
metric.WithDescription("Number of checkpoint publication attempts by result"),
Expand Down
Loading