diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index cc1405e8..fdf90ad7 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -472,7 +472,7 @@ func (a *Appender) updateCheckpoint(ctx context.Context, size uint64, root []byt // objStore describes a type which can store and retrieve objects. type objStore interface { - getObject(ctx context.Context, obj string) ([]byte, int64, error) + getObject(ctx context.Context, obj string) ([]byte, *gcs.ReaderObjectAttrs, error) setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error deleteObjectsWithPrefix(ctx context.Context, prefix string) error } @@ -488,7 +488,12 @@ func (lrs *logResourceStore) setCheckpoint(ctx context.Context, cpRaw []byte) er } func (lrs *logResourceStore) getCheckpoint(ctx context.Context) ([]byte, error) { - r, _, err := lrs.objStore.getObject(ctx, layout.CheckpointPath) + r, attr, err := lrs.objStore.getObject(ctx, layout.CheckpointPath) + if err != nil { + return nil, err + } + + checkpointAgeHistogram.Record(ctx, time.Since(attr.LastModified).Milliseconds()) return r, err } @@ -1260,8 +1265,8 @@ type gcsStorage struct { } -// getObject returns the data and generation of the specified object, or an error. +// getObject returns the data and reader attributes of the specified object, or an error. 
-func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64, error) { - return otel.Trace2(ctx, "tessera.storage.gcp.getObject", tracer, func(ctx context.Context, span trace.Span) ([]byte, int64, error) { +func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, *gcs.ReaderObjectAttrs, error) { + return otel.Trace2(ctx, "tessera.storage.gcp.getObject", tracer, func(ctx context.Context, span trace.Span) ([]byte, *gcs.ReaderObjectAttrs, error) { if s.bucketPrefix != "" { obj = filepath.Join(s.bucketPrefix, obj) } @@ -1270,14 +1275,14 @@ func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64, r, err := s.gcsClient.Bucket(s.bucket).Object(obj).NewReader(ctx) if err != nil { - return nil, -1, fmt.Errorf("getObject: failed to create reader for object %q in bucket %q: %w", obj, s.bucket, err) + return nil, nil, fmt.Errorf("getObject: failed to create reader for object %q in bucket %q: %w", obj, s.bucket, err) } d, err := io.ReadAll(r) if err != nil { - return nil, -1, fmt.Errorf("failed to read %q: %v", obj, err) + return nil, nil, fmt.Errorf("failed to read %q: %v", obj, err) } - return d, r.Attrs.Generation, r.Close() + return d, &r.Attrs, r.Close() }) } @@ -1329,9 +1334,9 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, preconditionFailed = true } if preconditionFailed { - existing, existingGen, err := s.getObject(ctx, objName) + existing, _, err := s.getObject(ctx, objName) if err != nil { - return fmt.Errorf("failed to fetch existing content for %q (@%d): %v", objName, existingGen, err) + return fmt.Errorf("failed to fetch existing content for %q: %v", objName, err) } if !bytes.Equal(existing, data) { span.AddEvent("Non-idempotent write") diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index a3b15fd2..b0e0aaf7 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -792,15 +792,15 @@ func 
newMemObjStore() *memObjStore { } } -func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, int64, error) { +func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, *gcs.ReaderObjectAttrs, error) { m.RLock() defer m.RUnlock() d, ok := m.mem[obj] if !ok { - return nil, -1, fmt.Errorf("obj %q not found: %w", obj, gcs.ErrObjectNotExist) + return nil, nil, fmt.Errorf("obj %q not found: %w", obj, gcs.ErrObjectNotExist) } - return d, 1, nil + return d, &gcs.ReaderObjectAttrs{Generation: 1}, nil } // TODO(phboneff): add content type tests diff --git a/storage/gcp/otel.go b/storage/gcp/otel.go index 6759bcbe..8902df7e 100644 --- a/storage/gcp/otel.go +++ b/storage/gcp/otel.go @@ -36,8 +36,9 @@ var ( objectPathKey = attribute.Key("tessera.objectPath") opNameKey = attribute.Key("op_name") - publishCount metric.Int64Counter - opsHistogram metric.Int64Histogram + publishCount metric.Int64Counter + opsHistogram metric.Int64Histogram + checkpointAgeHistogram metric.Int64Histogram // Custom histogram buckets as we're interested in low-millis upto low-seconds. histogramBuckets = []float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000} @@ -55,6 +56,15 @@ func init() { klog.Exitf("Failed to create opsHistogram metric: %v", err) } + checkpointAgeHistogram, err = meter.Int64Histogram( + "tessera.reader.checkpoint.age", + metric.WithDescription("Age of checkpoints at the point of reading from GCP"), + metric.WithUnit("ms"), + metric.WithExplicitBucketBoundaries(histogramBuckets...)) + if err != nil { + klog.Exitf("Failed to create checkpointAgeHistogram metric: %v", err) + } + publishCount, err = meter.Int64Counter( "tessera.appender.checkpoint.publication.counter", metric.WithDescription("Number of checkpoint publication attempts by result"),