diff --git a/ingest/checkpoint_change_reader.go b/ingest/checkpoint_change_reader.go index a9aaf1737d..3402b0af7d 100644 --- a/ingest/checkpoint_change_reader.go +++ b/ingest/checkpoint_change_reader.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "iter" "sync" "time" @@ -14,25 +15,29 @@ import ( "github.com/stellar/go/xdr" ) -// readResult is the result of reading a bucket value -type readResult struct { - entryChange xdr.LedgerEntryChange - e error -} - // CheckpointChangeReader is a ChangeReader which returns Changes from a history archive // snapshot. The Changes produced by a CheckpointChangeReader reflect the state of the Stellar // network at a particular checkpoint ledger sequence. type CheckpointChangeReader struct { - ctx context.Context has *historyarchive.HistoryArchiveState archive historyarchive.ArchiveInterface visitedLedgerKeys set.Set[string] sequence uint32 - readChan chan readResult - streamOnce sync.Once - closeOnce sync.Once - done chan bool + // readChan is used to buffer ledger entries while streaming + // from the history archives. + readChan chan xdr.LedgerEntry + // closeChanOnce is used to ensure readChan is only closed once + closeChanOnce sync.Once + // ctx is used to terminate early in case of errors while streaming + // or if the reader is closed. + // To avoid goroutine leaks and deadlocks, every time readChan is + // read from or written to we should also include ctx.Done() in the + // select statement so we eliminate the possibility of blocking + // indefinitely. + ctx context.Context + streamOnce sync.Once + streamWaitGroup sync.WaitGroup + cancel context.CancelCauseFunc readBytesMutex sync.RWMutex totalRead int64 @@ -40,6 +45,11 @@ type CheckpointChangeReader struct { encodingBuffer *xdr.EncodingBuffer + bucketListType xdr.BucketListType + + ledgerEntryFilter func(xdr.LedgerEntry) bool + ledgerKeyFilter func(key xdr.LedgerKey) bool + // This should be set to true in tests only disableBucketListHashValidation bool sleep func(time.Duration) @@ -55,7 +65,33 @@ const ( msrBufferSize = 50000 ) -// NewCheckpointChangeReader constructs a new CheckpointChangeReader instance. +// CheckpointReaderOption configures a CheckpointChangeReader's behavior. +// Multiple options can be provided; when conflicting options are given, the last one wins. +type CheckpointReaderOption func(*CheckpointChangeReader) + +// DisableBucketListValidation disables validation of the bucket list +// while streaming from the history archives. +var DisableBucketListValidation = func(r *CheckpointChangeReader) { + r.disableBucketListHashValidation = true +} + +// WithFilter configures a filter on the CheckpointChangeReader so irrelevant +// ledger entries / ledger keys can be omitted when streaming from the +// history archives. +// nil values are equivalent to functions which return true for all ledger +// entries. +func WithFilter( + ledgerEntryFilter func(xdr.LedgerEntry) bool, + ledgerKeyFilter func(key xdr.LedgerKey) bool, +) CheckpointReaderOption { + return func(r *CheckpointChangeReader) { + r.ledgerEntryFilter = ledgerEntryFilter + r.ledgerKeyFilter = ledgerKeyFilter + } +} + +// NewCheckpointChangeReader constructs a new CheckpointChangeReader instance +// which enumerates ledger entries from the live bucket list. // // The ledger sequence must be a checkpoint ledger. 
By default (see
 // `historyarchive.ConnectOptions.CheckpointFrequency` for configuring this),
@@ -65,6 +101,75 @@ func NewCheckpointChangeReader(
 	ctx context.Context,
 	archive historyarchive.ArchiveInterface,
 	sequence uint32,
+	opts ...CheckpointReaderOption,
+) (*CheckpointChangeReader, error) {
+	return newCheckpointChangeReaderWithBucketList(
+		ctx,
+		archive,
+		sequence,
+		xdr.BucketListTypeLive,
+		opts...,
+	)
+}
+
+// NewHotArchiveIterator constructs an iterator which enumerates
+// ledger entries from the hot archive bucket list.
+//
+// The ledger sequence must be a checkpoint ledger. By default (see
+// `historyarchive.ConnectOptions.CheckpointFrequency` for configuring this),
+// its next sequence number would have to be a multiple of 64, e.g.
+// sequence=100031 is a checkpoint ledger, since: (100031+1) mod 64 == 0
+func NewHotArchiveIterator(
+	ctx context.Context,
+	archive historyarchive.ArchiveInterface,
+	sequence uint32,
+	opts ...CheckpointReaderOption,
+) iter.Seq2[xdr.LedgerEntry, error] {
+	return func(yield func(xdr.LedgerEntry, error) bool) {
+		r, err := newCheckpointChangeReaderWithBucketList(
+			ctx,
+			archive,
+			sequence,
+			xdr.BucketListTypeHotArchive,
+			opts...,
+		)
+		if err != nil {
+			yield(xdr.LedgerEntry{}, err)
+			return
+		}
+		r.streamWaitGroup.Add(1)
+		go r.streamBucketList()
+		defer func() {
+			// the streamBucketList goroutine writes to readChan, so it is
+			// only safe to close the channel once that goroutine terminates
+			r.streamWaitGroup.Wait()
+			r.closeReadChan()
+		}()
+
+		for {
+			select {
+			case <-r.ctx.Done():
+				yield(xdr.LedgerEntry{}, context.Cause(r.ctx))
+				return
+			case entry, ok := <-r.readChan:
+				if !ok {
+					return
+				}
+				if !yield(entry, nil) {
+					return
+				}
+			}
+		}
+	}
+}
+
+func newCheckpointChangeReaderWithBucketList(
+	ctx context.Context,
+	archive historyarchive.ArchiveInterface,
+	sequence uint32,
+	bucketListType xdr.BucketListType,
+	opts ...CheckpointReaderOption,
 ) (*CheckpointChangeReader, error) {
 	manager := archive.GetCheckpointManager()
 
@@ -84,19 +189,26 @@ func NewCheckpointChangeReader(
 		return nil, errors.Wrapf(err, "unable to get checkpoint HAS at ledger sequence %d", sequence)
 	}
 
-	return &CheckpointChangeReader{
+	var cancel context.CancelCauseFunc
+	ctx, cancel = context.WithCancelCause(ctx)
+	r := &CheckpointChangeReader{
 		ctx:               ctx,
 		has:               &has,
 		archive:           archive,
 		visitedLedgerKeys: set.Set[string]{},
 		sequence:          sequence,
-		readChan:          make(chan readResult, msrBufferSize),
-		streamOnce:        sync.Once{},
-		closeOnce:         sync.Once{},
-		done:              make(chan bool),
+		readChan:          make(chan xdr.LedgerEntry, msrBufferSize),
+		cancel:            cancel,
 		encodingBuffer:    xdr.NewEncodingBuffer(),
+		bucketListType:    bucketListType,
 		sleep:             time.Sleep,
-	}, nil
+	}
+
+	for _, opt := range opts {
+		opt(r)
+	}
+
+	return r, nil
 }
 
 // VerifyBucketList verifies that the bucket list hash computed from the history archive snapshot
@@ -126,7 +238,7 @@ func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, e
 	return r.archive.BucketExists(hash)
 }
 
-// streamBuckets is internal method that streams buckets from the given HAS.
+// streamBucketList is an internal method that streams buckets from the given HAS.
 //
 // Buckets should be processed from oldest to newest, `snap` and then `curr` at
 // each level. The correct value of ledger entry is the latest seen
@@ -149,21 +261,30 @@ func (r *CheckpointChangeReader) bucketExists(hash historyarchive.Hash) (bool, e
 // In such algorithm we just need to store a set of keys that require much less space.
 // The memory requirements will be lowered when CAP-0020 is live and older buckets are
 // rewritten. Then, we will only need to keep track of `DEADENTRY`.
-func (r *CheckpointChangeReader) streamBuckets() {
+func (r *CheckpointChangeReader) streamBucketList() {
 	defer func() {
 		r.visitedLedgerKeys = nil
-
-		r.closeOnce.Do(r.close)
-		close(r.readChan)
+		r.streamWaitGroup.Done()
 	}()
 
 	var buckets []historyarchive.Hash
-	for i := 0; i < len(r.has.CurrentBuckets); i++ {
-		b := r.has.CurrentBuckets[i]
+	// Select the bucket list based on configuration
+	var list historyarchive.BucketList
+	switch r.bucketListType {
+	case xdr.BucketListTypeLive:
+		list = r.has.CurrentBuckets
+	case xdr.BucketListTypeHotArchive:
+		list = r.has.HotArchiveBuckets
+	default:
+		r.cancel(errors.Errorf("Unsupported bucket list type: %d", r.bucketListType))
+		return
+	}
+	for i := 0; i < len(list); i++ {
+		b := list[i]
 		for _, hashString := range []string{b.Curr, b.Snap} {
 			hash, err := historyarchive.DecodeHash(hashString)
 			if err != nil {
-				r.readChan <- r.error(errors.Wrap(err, "Error decoding bucket hash"))
+				r.cancel(errors.Wrap(err, "Error decoding bucket hash"))
 				return
 			}
 
@@ -178,24 +299,18 @@ func (r *CheckpointChangeReader) streamBuckets() {
 	for _, hash := range buckets {
 		exists, err := r.bucketExists(hash)
 		if err != nil {
-			r.readChan <- r.error(
-				errors.Wrapf(err, "error checking if bucket exists: %s", hash),
-			)
+			r.cancel(errors.Wrapf(err, "error checking if bucket exists: %s", hash))
 			return
 		}
 
 		if !exists {
-			r.readChan <- r.error(
-				errors.Errorf("bucket hash does not exist: %s", hash),
-			)
+			r.cancel(errors.Errorf("bucket hash does not exist: %s", hash))
 			return
 		}
 
 		size, err := r.archive.BucketSize(hash)
 		if err != nil {
-			r.readChan <- r.error(
-				errors.Wrapf(err, "error checking bucket size: %s", hash),
-			)
+			r.cancel(errors.Wrapf(err, "error checking bucket size: %s", hash))
 			return
 		}
 
@@ -206,32 +321,44 @@ func (r *CheckpointChangeReader) streamBuckets() {
 	for i, hash := range buckets {
 		oldestBucket := i == len(buckets)-1
-		if shouldContinue := r.streamBucketContents(hash, oldestBucket); !shouldContinue {
-			break
+		for ledgerEntry, err := range r.streamBucket(hash, oldestBucket) {
+			if err != nil {
+				r.cancel(err)
+				return
+			}
+
+			select {
+			case r.readChan <- ledgerEntry:
+			case <-r.ctx.Done():
+				return
+			}
 		}
 	}
+
+	r.closeReadChan()
 }
 
-// readBucketEntry will attempt to read a bucket entry from `stream`.
-// If any errors are encountered while reading from `stream`, readBucketEntry will
-// retry the operation using a new *historyarchive.XdrStream.
-// The total number of retries will not exceed `maxStreamRetries`.
-func (r *CheckpointChangeReader) readBucketEntry(stream *xdr.Stream, hash historyarchive.Hash) (
-	xdr.BucketEntry,
-	error,
-) {
-	var entry xdr.BucketEntry
+func (r *CheckpointChangeReader) closeReadChan() {
+	r.closeChanOnce.Do(func() {
+		close(r.readChan)
+	})
+}
+
+// readBucketRecord attempts to read a single XDR record from `stream` into `entry`.
+// If any errors are encountered while reading from `stream`, it retries the operation
+// using a new *xdr.Stream. The total number of retries will not exceed
+// `maxStreamRetries`.
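+//
+// A minimal usage sketch (hypothetical caller; `stream` and `hash` as returned
+// by newXDRStream):
+//
+//	var entry xdr.BucketEntry
+//	for {
+//		if err := r.readBucketRecord(stream, hash, &entry); err == io.EOF {
+//			break // end of bucket reached
+//		} else if err != nil {
+//			return err
+//		}
+//		// process entry here
+//	}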
+func (r *CheckpointChangeReader) readBucketRecord(stream *xdr.Stream, hash historyarchive.Hash, entry xdr.DecoderFrom) error { var err error currentPosition := stream.BytesRead() gzipCurrentPosition := stream.CompressedBytesRead() for attempts := 0; ; attempts++ { if r.ctx.Err() != nil { - err = r.ctx.Err() - break + return r.ctx.Err() } if err == nil { - err = stream.ReadOne(&entry) + err = stream.ReadOne(entry) if err == nil || err == io.EOF { r.readBytesMutex.Lock() r.totalRead += stream.CompressedBytesRead() - gzipCurrentPosition @@ -261,7 +388,7 @@ func (r *CheckpointChangeReader) readBucketEntry(stream *xdr.Stream, hash histor } } - return entry, err + return err } func (r *CheckpointChangeReader) newXDRStream(hash historyarchive.Hash) ( @@ -278,201 +405,294 @@ func (r *CheckpointChangeReader) newXDRStream(hash historyarchive.Hash) ( return rdr, e } -// streamBucketContents pushes value onto the read channel, returning false when the channel needs to be closed otherwise true -func (r *CheckpointChangeReader) streamBucketContents(hash historyarchive.Hash, oldestBucket bool) bool { - rdr, e := r.newXDRStream(hash) - if e != nil { - r.readChan <- r.error( - errors.Wrapf(e, "cannot get xdr stream for hash '%s'", hash.String()), +func validateHotArchiveMetaEntry(index int, meta xdr.BucketMetadata, hash historyarchive.Hash) error { + if index != 0 { + return errors.Errorf( + "METAENTRY not the first entry (n=%d) in the bucket hash '%s'", + index, hash.String(), ) - return false } + if bucketListType, ok := meta.Ext.GetBucketListType(); !ok { + return errors.Errorf("METAENTRY missing bucket list type in the bucket hash '%s'", hash.String()) + } else if bucketListType != xdr.BucketListTypeHotArchive { + return errors.Errorf( + "expected bucket list type to be hot-archive (instead got %s) in the bucket hash '%s'", + bucketListType.String(), hash.String(), + ) + } + return nil +} - defer func() { - err := rdr.Close() - if err != nil { - r.readChan <- r.error(errors.Wrap(err, "Error closing xdr stream")) - // Stop streaming from the rest of the files. - r.Close() - } - }() - - // bucketProtocolVersion is a protocol version read from METAENTRY or 0 when no METAENTRY. - // No METAENTRY means that bucket originates from before protocol version 11. 
-	bucketProtocolVersion := uint32(0)
-
-	for n := 0; ; n++ {
-		var entry xdr.BucketEntry
-		entry, e = r.readBucketEntry(rdr, hash)
-		if e != nil {
-			if e == io.EOF {
-				// No entries loaded for this batch, nothing more to process
-				return true
+func (r *CheckpointChangeReader) streamHotArchiveBucket(rdr *xdr.Stream, hash historyarchive.Hash, oldestBucket bool) iter.Seq2[xdr.LedgerEntry, error] {
+	return func(yield func(xdr.LedgerEntry, error) bool) {
+		for n := 0; ; n++ {
+			var entry xdr.HotArchiveBucketEntry
+			if err := r.readBucketRecord(rdr, hash, &entry); err != nil {
+				if err != io.EOF {
+					yield(xdr.LedgerEntry{}, errors.Wrapf(err, "Error on XDR record %d of hash '%s'", n, hash.String()))
+				}
+				return
 			}
-			r.readChan <- r.error(
-				errors.Wrapf(e, "Error on XDR record %d of hash '%s'", n, hash.String()),
-			)
-			return false
-		}
 
-		if entry.Type == xdr.BucketEntryTypeMetaentry {
-			if n != 0 {
-				r.readChan <- r.error(
-					errors.Errorf(
-						"METAENTRY not the first entry (n=%d) in the bucket hash '%s'",
-						n, hash.String(),
-					),
-				)
-				return false
-			}
-			metaEntry := entry.MustMetaEntry()
-			bucketProtocolVersion = uint32(metaEntry.LedgerVersion)
-			bucketListType, ok := metaEntry.Ext.GetBucketListType()
-			if ok && bucketListType != xdr.BucketListTypeLive {
-				r.readChan <- r.error(
+			if entry.Type == xdr.HotArchiveBucketEntryTypeHotArchiveMetaentry {
+				if err := validateHotArchiveMetaEntry(n, entry.MustMetaEntry(), hash); err != nil {
+					yield(xdr.LedgerEntry{}, err)
+					return
+				}
+				continue
+			} else if n == 0 {
+				yield(xdr.LedgerEntry{},
 					errors.Errorf(
-						"expected bucket list type to be live (instead got %s) in the bucket hash '%s'",
-						bucketListType.String(), hash.String(),
+						"METAENTRY not the first entry in the bucket hash '%s'",
+						hash.String(),
 					),
 				)
-				return false
+				return
 			}
-			continue
-		}
 
-		var key xdr.LedgerKey
-		var err error
+			var key xdr.LedgerKey
+			switch entry.Type {
+			case xdr.HotArchiveBucketEntryTypeHotArchiveArchived:
+				ledgerEntry := entry.MustArchivedEntry()
+				if r.ledgerEntryFilter != nil && !r.ledgerEntryFilter(ledgerEntry) {
+					continue
+				}
+				k, err := ledgerEntry.LedgerKey()
+				if err != nil {
+					yield(xdr.LedgerEntry{}, errors.Wrapf(err, "Error generating ledger key for XDR record %d of hash '%s'", n, hash.String()))
+					return
+				}
+				key = k
+			case xdr.HotArchiveBucketEntryTypeHotArchiveLive:
+				key = entry.MustKey()
+				if r.ledgerKeyFilter != nil && !r.ledgerKeyFilter(key) {
+					continue
+				}
+			default:
+				yield(xdr.LedgerEntry{}, errors.Errorf("Unknown HotArchiveBucketEntryType=%d: %d@%s", entry.Type, n, hash.String()))
+				return
+			}
 
-		switch entry.Type {
-		case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
-			liveEntry := entry.MustLiveEntry()
-			key, err = liveEntry.LedgerKey()
+			// We're using compressed keys here
+			// Safe, since we are converting to string right away
+			keyBytes, err := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
 			if err != nil {
-				r.readChan <- r.error(
-					errors.Wrapf(err, "Error generating ledger key for XDR record %d of hash '%s'", n, hash.String()),
-				)
-				return false
+				yield(xdr.LedgerEntry{}, errors.Wrapf(err, "Error marshaling XDR record %d of hash '%s'", n, hash.String()))
+				return
+			}
+			h := string(keyBytes)
+			if !r.visitedLedgerKeys.Contains(h) {
+				// We skip adding entries from the last bucket to visitedLedgerKeys because:
+				// 1. Ledger keys are unique within a single bucket.
+				// 2. This is the last bucket we process so there's no need to track
+				// entries seen in this bucket.
+ if !oldestBucket { + r.visitedLedgerKeys.Add(h) + } + if entry.Type == xdr.HotArchiveBucketEntryTypeHotArchiveArchived { + if !yield(entry.MustArchivedEntry(), nil) { + return + } + } } - case xdr.BucketEntryTypeDeadentry: - key = entry.MustDeadEntry() - default: - r.readChan <- r.error( - errors.Errorf("Unknown BucketEntryType=%d: %d@%s", entry.Type, n, hash.String()), - ) - return false } + } +} - // We're using compressed keys here - // Safe, since we are converting to string right away - keyBytes, e := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key) - if e != nil { - r.readChan <- r.error( - errors.Wrapf( - e, "Error marshaling XDR record %d of hash '%s'", n, hash.String(), - ), - ) - return false - } +func validateLiveMetaEntry(index int, meta xdr.BucketMetadata, hash historyarchive.Hash) error { + if index != 0 { + return errors.Errorf( + "METAENTRY not the first entry (n=%d) in the bucket hash '%s'", + index, hash.String(), + ) + } + if bucketListType, ok := meta.Ext.GetBucketListType(); ok && bucketListType != xdr.BucketListTypeLive { + return errors.Errorf( + "expected bucket list type to be live (instead got %s) in the bucket hash '%s'", + bucketListType.String(), hash.String(), + ) + } + return nil +} - h := string(keyBytes) - // claimable balances and offers have unique ids - // once a claimable balance or offer is created we can assume that - // the id can never be recreated again, unlike, for example, trustlines - // which can be deleted and then recreated - unique := key.Type == xdr.LedgerEntryTypeClaimableBalance || - key.Type == xdr.LedgerEntryTypeOffer - - switch entry.Type { - case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry: - if entry.Type == xdr.BucketEntryTypeInitentry && bucketProtocolVersion < 11 { - r.readChan <- r.error( - errors.Errorf("Read INITENTRY from version <11 bucket: %d@%s", n, hash.String()), - ) - return false +func (r *CheckpointChangeReader) streamLiveBucket(rdr *xdr.Stream, hash historyarchive.Hash, oldestBucket bool) iter.Seq2[xdr.LedgerEntry, error] { + // bucketProtocolVersion is a protocol version read from METAENTRY or 0 when no METAENTRY. + // No METAENTRY means that bucket originates from before protocol version 11. + bucketProtocolVersion := uint32(0) + return func(yield func(xdr.LedgerEntry, error) bool) { + for n := 0; ; n++ { + var entry xdr.BucketEntry + if err := r.readBucketRecord(rdr, hash, &entry); err != nil { + if err != io.EOF { + yield(xdr.LedgerEntry{}, errors.Wrapf(err, "Error on XDR record %d of hash '%s'", n, hash.String())) + } + return } - if !r.visitedLedgerKeys.Contains(h) { - // Return LEDGER_ENTRY_STATE changes only now. 
+			if entry.Type == xdr.BucketEntryTypeMetaentry {
+				metaEntry := entry.MustMetaEntry()
+				bucketProtocolVersion = uint32(metaEntry.LedgerVersion)
+				if err := validateLiveMetaEntry(n, metaEntry, hash); err != nil {
+					yield(xdr.LedgerEntry{}, err)
+					return
+				}
+				continue
+			}
+
+			var key xdr.LedgerKey
+			switch entry.Type {
+			case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
 				liveEntry := entry.MustLiveEntry()
-				entryChange := xdr.LedgerEntryChange{
-					Type:  xdr.LedgerEntryChangeTypeLedgerEntryState,
-					State: &liveEntry,
+				if r.ledgerEntryFilter != nil && !r.ledgerEntryFilter(liveEntry) {
+					continue
+				}
+				k, err := liveEntry.LedgerKey()
+				if err != nil {
+					yield(xdr.LedgerEntry{}, errors.Wrapf(err, "Error generating ledger key for XDR record %d of hash '%s'", n, hash.String()))
+					return
 				}
-				r.readChan <- readResult{entryChange, nil}
-
-				// We don't update `visitedLedgerKeys` for INITENTRY because CAP-20 says:
-				// > a bucket entry marked INITENTRY implies that either no entry
-				// > with the same ledger key exists in an older bucket, or else
-				// > that the (chronologically) preceding entry with the same ledger
-				// > key was DEADENTRY.
-				if entry.Type == xdr.BucketEntryTypeLiveentry {
-					// We skip adding entries from the last bucket to visitedLedgerKeys because:
-					// 1. Ledger keys are unique within a single bucket.
-					// 2. This is the last bucket we process so there's no need to track
-					// seen last entries in this bucket.
-					if oldestBucket {
-						continue
+				key = k
+			case xdr.BucketEntryTypeDeadentry:
+				key = entry.MustDeadEntry()
+				if r.ledgerKeyFilter != nil && !r.ledgerKeyFilter(key) {
+					continue
+				}
+			default:
+				yield(xdr.LedgerEntry{}, errors.Errorf("Unknown BucketEntryType=%d: %d@%s", entry.Type, n, hash.String()))
+				return
+			}
+
+			// We're using compressed keys here
+			// Safe, since we are converting to string right away
+			keyBytes, err := r.encodingBuffer.LedgerKeyUnsafeMarshalBinaryCompress(key)
+			if err != nil {
+				yield(xdr.LedgerEntry{}, errors.Wrapf(
+					err, "Error marshaling XDR record %d of hash '%s'", n, hash.String(),
+				))
+				return
+			}
+			h := string(keyBytes)
+			// claimable balances and offers have unique ids
+			// once a claimable balance or offer is created we can assume that
+			// the id can never be recreated again, unlike, for example, trustlines
+			// which can be deleted and then recreated
+			unique := key.Type == xdr.LedgerEntryTypeClaimableBalance ||
+				key.Type == xdr.LedgerEntryTypeOffer
+
+			switch entry.Type {
+			case xdr.BucketEntryTypeLiveentry, xdr.BucketEntryTypeInitentry:
+				if entry.Type == xdr.BucketEntryTypeInitentry && bucketProtocolVersion < 11 {
+					yield(xdr.LedgerEntry{}, errors.Errorf("Read INITENTRY from version <11 bucket: %d@%s", n, hash.String()))
+					return
+				}
+				if !r.visitedLedgerKeys.Contains(h) {
+					liveEntry := entry.MustLiveEntry()
+					if !yield(liveEntry, nil) {
+						return
 					}
-					r.visitedLedgerKeys.Add(h)
+					// We don't update `visitedLedgerKeys` for INITENTRY because CAP-20 says:
+					// > a bucket entry marked INITENTRY implies that either no entry
+					// > with the same ledger key exists in an older bucket, or else
+					// > that the (chronologically) preceding entry with the same ledger
+					// > key was DEADENTRY.
+					if entry.Type == xdr.BucketEntryTypeLiveentry {
+						// We skip adding entries from the last bucket to visitedLedgerKeys because:
+						// 1. Ledger keys are unique within a single bucket.
+						// 2. This is the last bucket we process so there's no need to track
+						// entries seen in this bucket.
+ if !oldestBucket { + r.visitedLedgerKeys.Add(h) + } + } + } else if entry.Type == xdr.BucketEntryTypeInitentry && unique { + // we can remove the ledger key because we know that it's unique in the ledger + // and cannot be recreated + r.visitedLedgerKeys.Remove(h) } - } else if entry.Type == xdr.BucketEntryTypeInitentry && unique { - // we can remove the ledger key because we know that it's unique in the ledger - // and cannot be recreated - r.visitedLedgerKeys.Remove(h) + case xdr.BucketEntryTypeDeadentry: + r.visitedLedgerKeys.Add(h) + default: + yield(xdr.LedgerEntry{}, errors.Errorf("Unexpected entry type %d: %d@%s", entry.Type, n, hash.String())) + return } - case xdr.BucketEntryTypeDeadentry: - r.visitedLedgerKeys.Add(h) - default: - r.readChan <- r.error( - errors.Errorf("Unexpected entry type %d: %d@%s", entry.Type, n, hash.String()), - ) - return false } + } +} - select { - case <-r.done: - // Close() called: stop processing buckets. - return false +// streamBucket returns an iterator over ledger entries for the given bucket hash. +// Any errors encountered during setup or iteration will be yielded via the iterator +// as an error value alongside a zero xdr.LedgerEntry. +func (r *CheckpointChangeReader) streamBucket(hash historyarchive.Hash, oldestBucket bool) iter.Seq2[xdr.LedgerEntry, error] { + return func(yield func(xdr.LedgerEntry, error) bool) { + rdr, e := r.newXDRStream(hash) + if e != nil { + yield(xdr.LedgerEntry{}, errors.Wrapf(e, "cannot get xdr stream for hash '%s'", hash.String())) + return + } + var closed bool + closeRdr := func() error { + if !closed { + closed = true + return rdr.Close() + } + return nil + } + defer closeRdr() + + var iterator iter.Seq2[xdr.LedgerEntry, error] + switch r.bucketListType { + case xdr.BucketListTypeLive: + iterator = r.streamLiveBucket(rdr, hash, oldestBucket) + case xdr.BucketListTypeHotArchive: + iterator = r.streamHotArchiveBucket(rdr, hash, oldestBucket) default: - continue + yield(xdr.LedgerEntry{}, errors.Errorf("Unsupported bucket list type: %d", r.bucketListType)) + return + } + + // Enumerate inner iterator and forward values. + for ledgerEntry, err := range iterator { + if !yield(ledgerEntry, err) { + return + } + if err != nil { + return + } } - } - panic("Shouldn't happen") + // After iteration completes, close the stream and propagate close error if any. + if err := closeRdr(); err != nil { + _ = yield(xdr.LedgerEntry{}, errors.Wrap(err, "Error closing xdr stream")) + } + } } // Read returns a new ledger entry change on each call, returning io.EOF when the stream ends. func (r *CheckpointChangeReader) Read() (Change, error) { r.streamOnce.Do(func() { - go r.streamBuckets() + r.streamWaitGroup.Add(1) + go r.streamBucketList() }) - // blocking call. 
anytime we consume from this channel, the background goroutine will stream in the next value
-	result, ok := <-r.readChan
-	if !ok {
-		// when channel is closed then return io.EOF
-		return Change{}, io.EOF
-	}
-
-	if result.e != nil {
-		return Change{}, errors.Wrap(result.e, "Error while reading from buckets")
-	}
-	entryType, err := result.entryChange.EntryType()
-	if err != nil {
-		return Change{}, errors.Wrap(err, "Error getting entry type")
+	select {
+	case <-r.ctx.Done():
+		// the streamBucketList goroutine writes to readChan, so it is
+		// only safe to close the channel once that goroutine terminates
+		r.streamWaitGroup.Wait()
+		r.closeReadChan()
+		return Change{}, context.Cause(r.ctx)
+	case entry, ok := <-r.readChan:
+		if !ok {
+			// when the channel is closed, return io.EOF
+			return Change{}, io.EOF
+		}
+		return Change{
+			Type:       entry.Data.Type,
+			ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryCreated,
+			Post:       &entry,
+		}, nil
 	}
-
-	return Change{
-		Type: entryType,
-		Post: result.entryChange.State,
-	}, nil
-}
-
-func (r *CheckpointChangeReader) error(err error) readResult {
-	return readResult{xdr.LedgerEntryChange{}, err}
-}
-
-func (r *CheckpointChangeReader) close() {
-	close(r.done)
 }
 
 // Progress returns progress reading all buckets in percents.
@@ -484,6 +704,6 @@ func (r *CheckpointChangeReader) Progress() float64 {
 
 // Close should be called when reading is finished.
 func (r *CheckpointChangeReader) Close() error {
-	r.closeOnce.Do(r.close)
+	r.cancel(errors.New("reader is closed"))
 	return nil
 }
diff --git a/ingest/checkpoint_change_reader_test.go b/ingest/checkpoint_change_reader_test.go
index 13b1b3910e..4639e81be5 100644
--- a/ingest/checkpoint_change_reader_test.go
+++ b/ingest/checkpoint_change_reader_test.go
@@ -27,6 +27,7 @@ type CheckpointChangeReaderTestSuite struct {
 	mockArchive          *historyarchive.MockArchive
 	reader               *CheckpointChangeReader
 	has                  historyarchive.HistoryArchiveState
+	parentCtxCancel      context.CancelFunc
 	mockBucketExistsCall *mock.Call
 	mockBucketSizeCall   *mock.Call
 }
@@ -58,17 +59,17 @@ func (s *CheckpointChangeReaderTestSuite) SetupTest() {
 		Return(historyarchive.NewCheckpointManager(
 			historyarchive.DefaultCheckpointFrequency))
 
+	var ctx context.Context
+	ctx, s.parentCtxCancel = context.WithCancel(context.Background())
 	s.reader, err = NewCheckpointChangeReader(
-		context.Background(),
+		ctx,
 		s.mockArchive,
 		ledgerSeq,
+		DisableBucketListValidation,
 	)
 	s.Require().NotNil(s.reader)
 	s.Require().NoError(err)
 	s.Assert().Equal(ledgerSeq, s.reader.sequence)
-
-	// Disable hash validation. We trust historyarchive.Stream tests here.
-	s.reader.disableBucketListHashValidation = true
 }
 
 func (s *CheckpointChangeReaderTestSuite) TearDownTest() {
@@ -78,17 +79,17 @@ func (s *CheckpointChangeReaderTestSuite) TearDownTest() {
 // TestSimple test reading buckets with a single live entry.
 func (s *CheckpointChangeReaderTestSuite) TestSimple() {
 	meta := metaEntry(23)
-	liveArchiveType := xdr.BucketListTypeLive
+	liveType := xdr.BucketListTypeLive
 	meta.MetaEntry.Ext = xdr.BucketMetadataExt{
 		V:              1,
-		BucketListType: &liveArchiveType,
+		BucketListType: &liveType,
 	}
 	curr1 := createXdrStream(
 		meta,
 		entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1),
 	)
 
-	nextBucket := s.getNextBucketChannel()
+	nextBucket := createBucketChannel(s.has.CurrentBuckets)
 
 	// Return curr1 stream for the first bucket...
 	s.mockArchive.
@@ -114,6 +115,110 @@ func (s *CheckpointChangeReaderTestSuite) TestSimple() {
 	s.Require().Equal(err, io.EOF)
 }
 
+func (s *CheckpointChangeReaderTestSuite) TestReadAfterClose() {
+	meta := metaEntry(23)
+	liveType := xdr.BucketListTypeLive
+	meta.MetaEntry.Ext = xdr.BucketMetadataExt{
+		V:              1,
+		BucketListType: &liveType,
+	}
+	curr1 := createXdrStream(
+		meta,
+		entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1),
+		entryAccount(xdr.BucketEntryTypeLiveentry, "GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF", 10),
+	)
+
+	nextBucket := createBucketChannel(s.has.CurrentBuckets)
+
+	// Return curr1 stream for the first bucket...
+	s.mockArchive.
+		On("GetXdrStreamForHash", <-nextBucket).
+		Return(curr1, nil).Once()
+
+	// ...and empty streams for the rest of the buckets.
+	for hash := range nextBucket {
+		s.mockArchive.
+			On("GetXdrStreamForHash", hash).
+			Return(createXdrStream(), nil).Maybe()
+	}
+
+	var e Change
+	var err error
+	e, err = s.reader.Read()
+	s.Require().NoError(err)
+
+	id := e.Post.Data.MustAccount().AccountId
+	s.Assert().Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", id.Address())
+
+	s.Require().NoError(s.reader.Close())
+
+	// There is a race condition because we select on either the read
+	// channel or the context's done channel, so we may receive the 2nd
+	// ledger entry before the "reader is closed" error.
+	for i := 0; i < 2; i++ {
+		e, err = s.reader.Read()
+		if err == nil {
+			id = e.Post.Data.MustAccount().AccountId
+			s.Assert().Equal("GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF", id.Address())
+			continue
+		}
+		break
+	}
+	s.Require().ErrorContains(err, "reader is closed")
+}
+
+func (s *CheckpointChangeReaderTestSuite) TestContextCanceled() {
+	meta := metaEntry(23)
+	liveType := xdr.BucketListTypeLive
+	meta.MetaEntry.Ext = xdr.BucketMetadataExt{
+		V:              1,
+		BucketListType: &liveType,
+	}
+	curr1 := createXdrStream(
+		meta,
+		entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1),
+		entryAccount(xdr.BucketEntryTypeLiveentry, "GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF", 10),
+	)
+
+	nextBucket := createBucketChannel(s.has.CurrentBuckets)
+
+	// Return curr1 stream for the first bucket...
+	s.mockArchive.
+		On("GetXdrStreamForHash", <-nextBucket).
+		Return(curr1, nil).Once()
+
+	// ...and empty streams for the rest of the buckets.
+	for hash := range nextBucket {
+		s.mockArchive.
+			On("GetXdrStreamForHash", hash).
+			Return(createXdrStream(), nil).Maybe()
+	}
+
+	var e Change
+	var err error
+	e, err = s.reader.Read()
+	s.Require().NoError(err)
+
+	id := e.Post.Data.MustAccount().AccountId
+	s.Assert().Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", id.Address())
+
+	s.parentCtxCancel()
+
+	// There is a race condition because we select on either the read
+	// channel or the context's done channel, so we may receive the 2nd
+	// ledger entry before the context cancellation error.
+	for i := 0; i < 2; i++ {
+		e, err = s.reader.Read()
+		if err == nil {
+			id = e.Post.Data.MustAccount().AccountId
+			s.Assert().Equal("GCMNSW2UZMSH3ZFRLWP6TW2TG4UX4HLSYO5HNIKUSFMLN2KFSF26JKWF", id.Address())
+			continue
+		}
+		break
+	}
+	s.Require().ErrorContains(err, "context canceled")
+}
+
 // TestRemoved test reading buckets with a single live entry that was removed.
func (s *CheckpointChangeReaderTestSuite) TestRemoved() { curr1 := createXdrStream( @@ -124,7 +229,7 @@ func (s *CheckpointChangeReaderTestSuite) TestRemoved() { entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 and snap1 stream for the first two bucket... s.mockArchive. @@ -159,7 +264,7 @@ func (s *CheckpointChangeReaderTestSuite) TestConcurrentRead() { entryAccount(xdr.BucketEntryTypeLiveentry, "GCK45YKCFNIOICB4TWPCOPWLQYNUKCJVV7OMMHH55AB3DD67K4E54STO", 1), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 and snap1 stream for the first two bucket... s.mockArchive. @@ -204,7 +309,7 @@ func (s *CheckpointChangeReaderTestSuite) TestEnsureLatestLiveEntry() { entryAccount(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 2), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 stream, rest won't be read due to an error s.mockArchive. @@ -246,7 +351,7 @@ func (s *CheckpointChangeReaderTestSuite) TestUniqueInitEntryOptimization() { entryAccount(xdr.BucketEntryTypeInitentry, "GAIH3ULLFQ4DGSECF2AR555KZ4KNDGEKN4AFI4SU2M7B43MGK3QJZNSR", 1), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 and snap1 stream for the first two bucket... s.mockArchive. @@ -266,7 +371,7 @@ func (s *CheckpointChangeReaderTestSuite) TestUniqueInitEntryOptimization() { // replace readChan with an unbuffered channel so we can test behavior of when items are added / removed // from visitedLedgerKeys - s.reader.readChan = make(chan readResult, 0) + s.reader.readChan = make(chan xdr.LedgerEntry, 0) change, err := s.reader.Read() s.Require().NoError(err) @@ -369,7 +474,7 @@ func (s *CheckpointChangeReaderTestSuite) TestMalformedProtocol11Bucket() { metaEntry(11), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 stream, rest won't be read due to an error s.mockArchive. @@ -383,7 +488,7 @@ func (s *CheckpointChangeReaderTestSuite) TestMalformedProtocol11Bucket() { // Meta entry _, err = s.reader.Read() s.Require().NotNil(err) - s.Assert().Equal("Error while reading from buckets: METAENTRY not the first entry (n=1) in the bucket hash '517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b'", err.Error()) + s.Assert().Equal("METAENTRY not the first entry (n=1) in the bucket hash '517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b'", err.Error()) } // TestMalformedProtocol11BucketNoMeta tests a buggy protocol 11 bucket (no meta entry) @@ -392,7 +497,7 @@ func (s *CheckpointChangeReaderTestSuite) TestMalformedProtocol11BucketNoMeta() entryAccount(xdr.BucketEntryTypeInitentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 stream, rest won't be read due to an error s.mockArchive. 
@@ -402,7 +507,7 @@ func (s *CheckpointChangeReaderTestSuite) TestMalformedProtocol11BucketNoMeta() // Init entry without meta _, err := s.reader.Read() s.Require().NotNil(err) - s.Assert().Equal("Error while reading from buckets: Read INITENTRY from version <11 bucket: 0@517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b", err.Error()) + s.Assert().Equal("Read INITENTRY from version <11 bucket: 0@517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b", err.Error()) } // TestMalformedBucketListType ensures the checkpoint change reader asserts its reading from the live bucketlist @@ -418,7 +523,7 @@ func (s *CheckpointChangeReaderTestSuite) TestMalformedBucketListType() { entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), ) - nextBucket := s.getNextBucketChannel() + nextBucket := createBucketChannel(s.has.CurrentBuckets) // Return curr1 stream, rest won't be read due to an error s.mockArchive. @@ -428,7 +533,74 @@ func (s *CheckpointChangeReaderTestSuite) TestMalformedBucketListType() { // Meta entry _, err := s.reader.Read() s.Require().NotNil(err) - s.Assert().EqualError(err, "Error while reading from buckets: expected bucket list type to be live (instead got BucketListTypeHotArchive) in the bucket hash '517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b'") + s.Assert().EqualError(err, "expected bucket list type to be live (instead got BucketListTypeHotArchive) in the bucket hash '517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b'") +} + +// TestFilter exercises the WithFilter functionality by ignoring a DEADENTRY +// for a specific account in a newer bucket so that an older LIVEENTRY for +// that account is yielded. +func (s *CheckpointChangeReaderTestSuite) TestFilter() { + // Prepare streams: newer bucket has a DEADENTRY for A; older bucket has LIVEENTRY for A. + curr1 := createXdrStream( + entryAccount(xdr.BucketEntryTypeDeadentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + entryCB(xdr.BucketEntryTypeInitentry, xdr.Hash{1, 2, 3}, 100), + ) + snap1 := createXdrStream( + entryAccount(xdr.BucketEntryTypeLiveentry, "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + ) + + nextBucket := createBucketChannel(s.has.CurrentBuckets) + + // Mock to return curr1 then snap1 for the first two buckets... + s.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + + s.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(snap1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + s.mockArchive. + On("GetXdrStreamForHash", hash). + Return(createXdrStream(), nil).Once() + } + + // Recreate reader with a filter that ignores the DEADENTRY for account A. + ctx := context.Background() + var err error + s.reader, err = NewCheckpointChangeReader( + ctx, + s.mockArchive, + s.reader.sequence, + DisableBucketListValidation, + WithFilter( + // accept all live entries + func(le xdr.LedgerEntry) bool { + return le.Data.Type != xdr.LedgerEntryTypeClaimableBalance + }, + // ignore DEADENTRY for our target account + func(key xdr.LedgerKey) bool { + if key.Type != xdr.LedgerEntryTypeAccount { + return true + } + return key.Account.AccountId.Address() != "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" + }, + ), + ) + s.Require().NoError(err) + + // We should now receive the older LIVEENTRY for the account because the + // newer DEADENTRY was filtered out. 
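+	// (The claimable balance INITENTRY in curr1 is likewise dropped, since the
+	// ledger entry filter above rejects claimable balance entries.)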
+ change, err := s.reader.Read() + s.Require().NoError(err) + id := change.Post.Data.MustAccount().AccountId + s.Assert().Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", id.Address()) + + // Then EOF + _, err = s.reader.Read() + s.Require().Equal(err, io.EOF) } func TestBucketExistsTestSuite(t *testing.T) { @@ -561,15 +733,16 @@ func (s *ReadBucketEntryTestSuite) TestReadAllEntries() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, firstEntry) - entry, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, secondEntry) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(io.EOF, err) } @@ -585,7 +758,8 @@ func (s *ReadBucketEntryTestSuite) TestFirstReadFailsWithContextError() { s.Require().NoError(err) s.cancel() - _, err = s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(context.Canceled, err) } @@ -600,12 +774,13 @@ func (s *ReadBucketEntryTestSuite) TestSecondReadFailsWithContextError() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, firstEntry) s.cancel() - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(context.Canceled, err) } @@ -621,7 +796,8 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryAllRetriesFail() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - _, err = s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().EqualError(err, "Read wrong number of bytes from XDR") } @@ -643,7 +819,8 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryIgnoresProtocolCloseError() stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, expectedEntry) @@ -651,7 +828,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryIgnoresProtocolCloseError() s.Require().Equal(historyarchive.Hash(hash), emptyHash) s.Require().True(ok) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(err, io.EOF) } @@ -670,7 +847,8 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryFailsToCreateNewStream() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - _, err = s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().EqualError(err, "Error creating new xdr stream: cannot create new stream") } @@ -695,11 +873,12 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsAfterFailsToCreateN stream, err := s.reader.newXDRStream(emptyHash) 
s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, firstEntry) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(io.EOF, err) } @@ -722,7 +901,8 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceeds() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, expectedEntry) @@ -730,7 +910,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceeds() { s.Require().Equal(historyarchive.Hash(hash), emptyHash) s.Require().True(ok) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(err, io.EOF) } @@ -755,11 +935,12 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsWithDiscard() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, firstEntry) - entry, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, secondEntry) @@ -767,7 +948,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsWithDiscard() { s.Require().Equal(historyarchive.Hash(hash), emptyHash) s.Require().True(ok) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(err, io.EOF) } @@ -790,11 +971,12 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryFailsWithDiscardError() { stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, firstEntry) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().EqualError(err, "Error discarding from xdr stream: EOF") } @@ -826,15 +1008,16 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsAfterDiscardError() stream, err := s.reader.newXDRStream(emptyHash) s.Require().NoError(err) - entry, err := s.reader.readBucketEntry(stream, emptyHash) + var entry xdr.BucketEntry + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, firstEntry) - entry, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().NoError(err) s.Require().Equal(entry, secondEntry) - _, err = s.reader.readBucketEntry(stream, emptyHash) + err = s.reader.readBucketRecord(stream, emptyHash, &entry) s.Require().Equal(io.EOF, err) } @@ -997,7 +1180,7 @@ func writeInvalidFrame(b *bytes.Buffer) { b.Truncate(bufferSize + frameSize/2) } -func createXdrStream(entries ...xdr.BucketEntry) *xdr.Stream { +func createXdrStream(entries ...interface{}) *xdr.Stream { b := &bytes.Buffer{} for _, e := range entries { err := xdr.MarshalFramed(b, 
e) @@ -1013,15 +1196,15 @@ func xdrStreamFromBuffer(b *bytes.Buffer) *xdr.Stream { return xdr.NewStream(ioutil.NopCloser(b)) } -// getNextBucket is a helper that returns next bucket hash in the order of processing. +// createBucketChannel is a helper that returns next bucket hash in the order of processing. // This allows to write simpler test code that ensures that mocked calls are in a // correct order. -func (s *CheckpointChangeReaderTestSuite) getNextBucketChannel() <-chan (historyarchive.Hash) { +func createBucketChannel(buckets historyarchive.BucketList) <-chan (historyarchive.Hash) { // 11 levels with 2 buckets each = buffer of 22 c := make(chan (historyarchive.Hash), 22) - for i := 0; i < len(s.has.CurrentBuckets); i++ { - b := s.has.CurrentBuckets[i] + for i := 0; i < len(buckets); i++ { + b := buckets[i] curr := historyarchive.MustDecodeHash(b.Curr) if !curr.IsZero() { diff --git a/ingest/hot_archive_iterator_test.go b/ingest/hot_archive_iterator_test.go new file mode 100644 index 0000000000..e01ede60dc --- /dev/null +++ b/ingest/hot_archive_iterator_test.go @@ -0,0 +1,426 @@ +package ingest + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/xdr" +) + +var hasWithHotArchiveExample = `{ + "version": 1, + "server": "v11.1.0", + "currentLedger": 24123007, + "hotArchiveBuckets": [ + { + "curr": "517bea4c6627a688a8ce501febd8c562e737e3d86b29689d9956217640f3c74b", + "next": { + "state": 0 + }, + "snap": "75c8c5540a825da61e05ae23d0b0be9d29f2bdb8fdfa550a3f3496f030f62ffd" + }, + { + "curr": "5bca6165dbf6832ff4550e67d0e564eca56494acfc9b7fd46c740f4d66c74609", + "next": { + "state": 1, + "output": "75c8c5540a825da61e05ae23d0b0be9d29f2bdb8fdfa550a3f3496f030f62ffd" + }, + "snap": "b6bad6183a3394087aae3d05ed393c5dcb80e35ed557e2c8935cba855f20dfa5" + }, + { + "curr": "56b70bb56fcb27dd05759b00b07bc3c9dc7cc6dbfc9d409cfec2a41d9fd4a1e8", + "next": { + "state": 1, + "output": "cfa973ce4ba1fbdf3b5767e398a5b7b86e30461855d24b7b50dc499eb313b4d0" + }, + "snap": "974a089a6980bf25d8ad1a6a71370bac2663e9bb14702ba90b9db657464c0b3a" + }, + { + "curr": "16742c8e61a4dde3b35179bedbdd7c56e67d03a5faf8973a6094c57e430322df", + "next": { + "state": 1, + "output": "ef39804657a928139750e801c63d1d911532d7d126c80f151ba362f49147972e" + }, + "snap": "b415a283c5e33d8c425cbb003a86c780f73e8d2016fb5dcc6ba1477e551a2c66" + }, + { + "curr": "b081e1c075c9114a6c74cf87a0767ee877f02e88e18a8bf97b8f268ff120ad0d", + "next": { + "state": 1, + "output": "162b859558c7c51c6416f659dbd8d70236c75540196e5d7a5dee2a66744ebbf5" + }, + "snap": "66f8fb3f36bbe328bbbe14151951891d455ad0fba1d19d05531226c0909a84c7" + }, + { + "curr": "822b766e755e83d4ad08a38e86466f47452a2d7c4702295ebd3235332db76a05", + "next": { + "state": 1, + "output": "1c04dc66c3410efc535044f4250c02490627b549f99a8873e4857b2cec4d51c8" + }, + "snap": "163a49fa560761217710f6bbbf85179514aa7714d373337dde7f200f8d6c623a" + }, + { + "curr": "75b77814875529876258760ed6b6f37d81b5a39183812c684b9e3014bb6b8cf6", + "next": { + "state": 1, + "output": "444088f447eb7ea3d397e7098d57c4f63b66912d24c4a26a29bf1dde7a4fdecc" + }, + "snap": "35472156c463eaf62867c9b229b92e8192e2fe40cf86e269cab65fd0045c996f" + }, + { + "curr": "b331675d693bdb4456f409083a1b8cbadbcef977df765ba7d4ddd787800bdc84", + "next": { + "state": 1, + "output": "3d9627fa5ef81486688dc584f52445560a55496d3b961a7664b0e536655179bb" + }, + "snap": 
"5a7996730755a90ea5cbd2d726a982f3f3703c3db8bc2a2217bd496b9c0cf3d1" + }, + { + "curr": "11f8c2f8e1cb0d47576f74d9e2fa838f5f3a37180907a24a85d0ad8b647862e4", + "next": { + "state": 1, + "output": "6c0133dfd0411f9975c74d792911bb80fc1555830a943249cea6c2a80e5064d1" + }, + "snap": "48f435285dd96511d0822f7ae1a20e28c6c28019e385313713655fc76fe3bc03" + }, + { + "curr": "5f351041761b45f3e725f98bb8b6713873e30ab6c8aee56ba0823d357c7ebd0d", + "next": { + "state": 1, + "output": "264d3a93bc5fff47a968cc53f0f2f50297e5f9015300bbc357cfb8dec30899c6" + }, + "snap": "4100ad3b1085bd14d1c808ece3b38db97171532d0d11ed5edd57aff0e416e06a" + }, + { + "curr": "a4811c9ba9505e421f0015e5fcfd9f5d204ae85b584766759e844ef85db10d47", + "next": { + "state": 1, + "output": "be4ecc289998a40319be24662c88f161f5e78d4be846b083923614573aa17336" + }, + "snap": "0000000000000000000000000000000000000000000000000000000000000000" + } + ] +}` + +func hotArchiveMetaEntry(version uint32) xdr.HotArchiveBucketEntry { + listType := xdr.BucketListTypeHotArchive + return xdr.HotArchiveBucketEntry{ + Type: xdr.HotArchiveBucketEntryTypeHotArchiveMetaentry, + MetaEntry: &xdr.BucketMetadata{ + LedgerVersion: xdr.Uint32(version), + Ext: xdr.BucketMetadataExt{ + V: 1, + BucketListType: &listType, + }, + }, + } +} + +func archivedBucketEntry(id string, balance uint32) xdr.HotArchiveBucketEntry { + return xdr.HotArchiveBucketEntry{ + Type: xdr.HotArchiveBucketEntryTypeHotArchiveArchived, + ArchivedEntry: &xdr.LedgerEntry{ + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.AccountEntry{ + AccountId: xdr.MustAddress(id), + Balance: xdr.Int64(balance), + }, + }, + }, + } +} + +func archivedLiveEntry(id string) xdr.HotArchiveBucketEntry { + return xdr.HotArchiveBucketEntry{ + Type: xdr.HotArchiveBucketEntryTypeHotArchiveLive, + Key: &xdr.LedgerKey{ + Type: xdr.LedgerEntryTypeAccount, + Account: &xdr.LedgerKeyAccount{ + AccountId: xdr.MustAddress(id), + }, + }, + } +} + +func TestHotArchiveIteratorTestSuite(t *testing.T) { + suite.Run(t, new(HotArchiveIteratorTestSuite)) +} + +type HotArchiveIteratorTestSuite struct { + suite.Suite + mockArchive *historyarchive.MockArchive + has historyarchive.HistoryArchiveState + ledgerSeq uint32 +} + +func (h *HotArchiveIteratorTestSuite) SetupTest() { + require.NoError(h.T(), json.Unmarshal([]byte(hasWithHotArchiveExample), &h.has)) + + h.mockArchive = &historyarchive.MockArchive{} + h.ledgerSeq = 24123007 + + h.mockArchive. + On("GetCheckpointHAS", h.ledgerSeq). + Return(h.has, nil) + + // BucketExists should be called 21 times (11 levels, last without `snap`) + h.mockArchive. + On("BucketExists", mock.AnythingOfType("historyarchive.Hash")). + Return(true, nil).Times(21) + + // BucketSize should be called 21 times (11 levels, last without `snap`) + h.mockArchive. + On("BucketSize", mock.AnythingOfType("historyarchive.Hash")). + Return(int64(100), nil).Times(21) + + h.mockArchive. + On("GetCheckpointManager"). 
+ Return(historyarchive.NewCheckpointManager( + historyarchive.DefaultCheckpointFrequency)) +} + +func (h *HotArchiveIteratorTestSuite) TearDownTest() { + h.mockArchive.AssertExpectations(h.T()) +} + +func (h *HotArchiveIteratorTestSuite) TestIteration() { + curr1 := createXdrStream( + hotArchiveMetaEntry(24), + archivedBucketEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + archivedBucketEntry("GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", 100), + archivedLiveEntry("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"), + ) + + snap1 := createXdrStream( + hotArchiveMetaEntry(24), + archivedBucketEntry("GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", 50), + archivedLiveEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + archivedBucketEntry("GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", 600), + ) + + nextBucket := createBucketChannel(h.has.HotArchiveBuckets) + + // Return curr1 and snap1 stream for the first two bucket... + h.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + + h.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(snap1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + h.mockArchive. + On("GetXdrStreamForHash", hash). + Return(createXdrStream(), nil).Once() + } + + expectedAccounts := []string{ + "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", + "GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", + } + expectedBalances := []uint32{1, 100} + + i := 0 + for ledgerEntry, err := range NewHotArchiveIterator( + context.Background(), + h.mockArchive, + h.ledgerSeq, + DisableBucketListValidation, + ) { + h.Require().NoError(err) + h.Require().Less(i, len(expectedAccounts)) + h.Require().Equal(expectedAccounts[i], ledgerEntry.Data.Account.AccountId.Address()) + h.Require().Equal(expectedBalances[i], uint32(ledgerEntry.Data.Account.Balance)) + i++ + } +} + +// TestFilter exercises WithFilter for the hot archive iterator by ignoring a +// HotArchiveLive key in a newer bucket so that an older ArchivedEntry is yielded. +func (h *HotArchiveIteratorTestSuite) TestFilter() { + curr1 := createXdrStream( + hotArchiveMetaEntry(24), + // Newest bucket contains only a live key for A + archivedLiveEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"), + archivedBucketEntry("GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB", 100), + ) + + snap1 := createXdrStream( + hotArchiveMetaEntry(24), + // Older bucket contains the archived entry we expect to receive + archivedBucketEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 50), + ) + + nextBucket := createBucketChannel(h.has.HotArchiveBuckets) + + // Return curr1 and snap1 for the first two buckets... + h.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + h.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(snap1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + h.mockArchive. + On("GetXdrStreamForHash", hash). 
+ Return(createXdrStream(), nil).Once() + } + + i := 0 + for ledgerEntry, err := range NewHotArchiveIterator( + context.Background(), + h.mockArchive, + h.ledgerSeq, + DisableBucketListValidation, + WithFilter( + // accept all archived entries + func(le xdr.LedgerEntry) bool { + if le.Data.Type != xdr.LedgerEntryTypeAccount { + return true + } + return le.Data.Account.AccountId.Address() != "GALPCCZN4YXA3YMJHKL6CVIECKPLJJCTVMSNYWBTKJW4K5HQLYLDMZTB" + }, + // ignore the live key for our target account so the older archived entry is not suppressed + func(key xdr.LedgerKey) bool { + if key.Type != xdr.LedgerEntryTypeAccount { + return true + } + return key.Account.AccountId.Address() != "GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML" + }, + ), + ) { + h.Require().NoError(err) + h.Require().Zero(i) + h.Require().Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", ledgerEntry.Data.Account.AccountId.Address()) + h.Require().Equal(uint32(50), uint32(ledgerEntry.Data.Account.Balance)) + i++ + } +} + +func (h *HotArchiveIteratorTestSuite) TestMetaEntryNotFirst() { + curr1 := createXdrStream( + archivedBucketEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + hotArchiveMetaEntry(24), + ) + + nextBucket := createBucketChannel(h.has.HotArchiveBuckets) + + // Return curr1 stream for the first bucket... + h.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + h.mockArchive. + On("GetXdrStreamForHash", hash). + Return(createXdrStream(), nil).Maybe() + } + + i := 0 + for _, err := range NewHotArchiveIterator( + context.Background(), + h.mockArchive, + h.ledgerSeq, + DisableBucketListValidation, + ) { + if i == 0 { + h.Require().ErrorContains(err, "METAENTRY not the first entry in the bucket hash") + } else { + h.Require().FailNow("expected at most 1 element") + } + i++ + } +} + +func (h *HotArchiveIteratorTestSuite) TestMissingBucketListType() { + curr1 := createXdrStream( + xdr.HotArchiveBucketEntry{ + Type: xdr.HotArchiveBucketEntryTypeHotArchiveMetaentry, + MetaEntry: &xdr.BucketMetadata{ + LedgerVersion: xdr.Uint32(24), + }, + }, + archivedBucketEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + ) + + nextBucket := createBucketChannel(h.has.HotArchiveBuckets) + + // Return curr1 stream for the first bucket... + h.mockArchive. + On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + h.mockArchive. + On("GetXdrStreamForHash", hash). + Return(createXdrStream(), nil).Maybe() + } + + i := 0 + for _, err := range NewHotArchiveIterator( + context.Background(), + h.mockArchive, + h.ledgerSeq, + DisableBucketListValidation, + ) { + h.Require().Zero(i) + h.Require().ErrorContains(err, "METAENTRY missing bucket list type") + i++ + } +} + +func (h *HotArchiveIteratorTestSuite) TestInvalidBucketListType() { + listType := xdr.BucketListTypeLive + curr1 := createXdrStream( + xdr.HotArchiveBucketEntry{ + Type: xdr.HotArchiveBucketEntryTypeHotArchiveMetaentry, + MetaEntry: &xdr.BucketMetadata{ + LedgerVersion: xdr.Uint32(24), + Ext: xdr.BucketMetadataExt{ + V: 1, + BucketListType: &listType, + }, + }, + }, + archivedBucketEntry("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", 1), + ) + + nextBucket := createBucketChannel(h.has.HotArchiveBuckets) + + // Return curr1 stream for the first bucket... + h.mockArchive. 
+ On("GetXdrStreamForHash", <-nextBucket). + Return(curr1, nil).Once() + + // ...and empty streams for the rest of the buckets. + for hash := range nextBucket { + h.mockArchive. + On("GetXdrStreamForHash", hash). + Return(createXdrStream(), nil).Maybe() + } + + i := 0 + for _, err := range NewHotArchiveIterator( + context.Background(), + h.mockArchive, + h.ledgerSeq, + DisableBucketListValidation, + ) { + h.Require().Zero(i) + h.Require().ErrorContains(err, "expected bucket list type to be hot-") + i++ + } +} diff --git a/ingest/sac/contract_data.go b/ingest/sac/contract_data.go index d354fa4bc4..23a7e1f146 100644 --- a/ingest/sac/contract_data.go +++ b/ingest/sac/contract_data.go @@ -178,6 +178,21 @@ func AssetFromContractData(ledgerEntry xdr.LedgerEntry, passphrase string) (xdr. return asset, true } +// ValidAssetEntryLedgerKey checks if a given ledger key could correspond +// to a valid asset entry for a contract. +func ValidAssetEntryLedgerKey(ledgerKey xdr.LedgerKey) bool { + if ledgerKey.Type != xdr.LedgerEntryTypeContractData { + return false + } + if ledgerKey.ContractData.Key.Type != xdr.ScValTypeScvLedgerKeyContractInstance { + return false + } + if ledgerKey.ContractData.Durability != xdr.ContractDataDurabilityPersistent { + return false + } + return true +} + // ContractBalanceFromContractData takes a ledger entry and verifies that the // ledger entry corresponds to the balance entry written to contract storage by // the Stellar Asset Contract. @@ -259,6 +274,33 @@ func ContractBalanceFromContractData(ledgerEntry xdr.LedgerEntry, passphrase str return holder, amt, true } +// ValidContractBalanceLedgerKey verifies if the provided ledgerKey could represent +// a valid contract balance ledger key. Specifically, a valid contract balance ledger key: +// - Is of type ContractData with Persistent durability. +// - Has a key that is a 2-element vector, where the first element is the balance metadata symbol ("Balance"). 
+func ValidContractBalanceLedgerKey(ledgerKey xdr.LedgerKey) bool {
+	if ledgerKey.Type != xdr.LedgerEntryTypeContractData {
+		return false
+	}
+	if ledgerKey.ContractData.Durability != xdr.ContractDataDurabilityPersistent {
+		return false
+	}
+	keyEnumVecPtr, ok := ledgerKey.ContractData.Key.GetVec()
+	if !ok || keyEnumVecPtr == nil {
+		return false
+	}
+	keyEnumVec := *keyEnumVecPtr
+	if len(keyEnumVec) != 2 || !keyEnumVec[0].Equals(
+		xdr.ScVal{
+			Type: xdr.ScValTypeScvSymbol,
+			Sym:  &balanceMetadataSym,
+		},
+	) {
+		return false
+	}
+	return true
+}
+
 func metadataObjFromAsset(isNative bool, code, issuer string) (*xdr.ScMap, error) {
 	assetInfoVecKey := &xdr.ScVec{
 		xdr.ScVal{
diff --git a/ingest/tutorial/expiring-sac-balances/main.go b/ingest/tutorial/expiring-sac-balances/main.go
new file mode 100644
index 0000000000..06415d0d05
--- /dev/null
+++ b/ingest/tutorial/expiring-sac-balances/main.go
@@ -0,0 +1,220 @@
+package main
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/csv"
+	"io"
+	"log"
+	"os"
+	"strconv"
+
+	"github.com/stellar/go/historyarchive"
+	"github.com/stellar/go/ingest"
+	"github.com/stellar/go/ingest/sac"
+	"github.com/stellar/go/network"
+	"github.com/stellar/go/strkey"
+	"github.com/stellar/go/xdr"
+)
+
+var (
+	archiveURLs = []string{
+		"https://history.stellar.org/prd/core-live/core_live_001",
+		"https://history.stellar.org/prd/core-live/core_live_002",
+		"https://history.stellar.org/prd/core-live/core_live_003",
+	}
+	// targetAssets maps SAC contract addresses to asset codes.
+	targetAssets = map[string]string{
+		"CDTKPWPLOURQA2SGTKTUQOWRCBZEORB4BWBOMJ3D3ZTQQSGE5F6JBQLV": "EURC",
+		"CCW67TSZV3SSS2HXMBQ5JFGCKJNXKZM7UQUWUZPUTHXSTZLEO7SJMI75": "USDC",
+	}
+	// balances whose TTL is at or below cutoffLedger are reported as expiring.
+	cutoffLedger uint32 = 60739011
+)
+
+type balance struct {
+	ledgerKey     string
+	asset         string
+	holderAddress string
+	amount        string
+	ttl           string
+}
+
+// findEvictedBalances scans the hot archive bucket list at the given checkpoint
+// and returns the SAC balance entries of the target assets which have been
+// evicted from the live state.
+func findEvictedBalances(ctx context.Context, arch historyarchive.ArchiveInterface, checkpointLedger uint32, targetAssets map[string]string) []balance {
+	var balances []balance
+	for ledgerEntry, err := range ingest.NewHotArchiveIterator(ctx, arch, checkpointLedger) {
+		if err != nil {
+			log.Fatalf("error while reading hot archive ledger entries: %v", err)
+		}
+		if ledgerEntry.Data.Type != xdr.LedgerEntryTypeContractData {
+			continue
+		}
+
+		contractID, ok := ledgerEntry.Data.MustContractData().Contract.GetContractId()
+		if !ok {
+			continue
+		}
+		sacAddress := strkey.MustEncode(strkey.VersionByteContract, contractID[:])
+		asset, ok := targetAssets[sacAddress]
+		if !ok {
+			continue
+		}
+
+		holder, amt, ok := sac.ContractBalanceFromContractData(ledgerEntry, network.PublicNetworkPassphrase)
+		if !ok {
+			continue
+		}
+
+		ledgerKey, err := ledgerEntry.LedgerKey()
+		if err != nil {
+			log.Fatalf("error while extracting ledger key: %v", err)
+		}
+		ledgerKeyBase64, err := ledgerKey.MarshalBinaryBase64()
+		if err != nil {
+			log.Fatalf("error while marshaling ledger key: %v", err)
+		}
+
+		balances = append(balances, balance{
+			ledgerKey:     ledgerKeyBase64,
+			asset:         asset,
+			holderAddress: strkey.MustEncode(strkey.VersionByteContract, holder[:]),
+			amount:        amt.String(),
+			ttl:           "evicted",
+		})
+	}
+	return balances
+}
+
+// findExpiringBalances scans the live bucket list at the given checkpoint and
+// returns the SAC balance entries of the target assets whose TTL expires at or
+// before expirationLedger.
+func findExpiringBalances(ctx context.Context, arch historyarchive.ArchiveInterface, checkpointLedger, expirationLedger uint32, targetAssets map[string]string) []balance {
+	var balances []balance
+	byKeyHash := map[xdr.Hash]balance{}
+	ttls := map[xdr.Hash]uint32{}
+
+	reader, err := ingest.NewCheckpointChangeReader(ctx, arch, checkpointLedger)
+	if err != nil {
reader: %v", err) + } + defer reader.Close() + + for { + change, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + log.Fatalf("error while reading checkpoint changes: %v", err) + } + + if change.Type == xdr.LedgerEntryTypeTtl { + ttlEntry := change.Post.Data.MustTtl() + ttls[ttlEntry.KeyHash] = uint32(ttlEntry.LiveUntilLedgerSeq) + } + + if change.Type != xdr.LedgerEntryTypeContractData || change.Post == nil { + continue + } + + le := *change.Post + contractID, ok := le.Data.MustContractData().Contract.GetContractId() + if !ok { + continue + } + sacAddress := strkey.MustEncode(strkey.VersionByteContract, contractID[:]) + asset, ok := targetAssets[sacAddress] + if !ok { + continue + } + + holder, amt, ok := sac.ContractBalanceFromContractData(le, network.PublicNetworkPassphrase) + if !ok { + continue + } + + ledgerKey, err := le.LedgerKey() + if err != nil { + log.Fatalf("error while extracting ledger key: %v", err) + } + bin, err := ledgerKey.MarshalBinary() + if err != nil { + log.Fatalf("error while marshaling ledger key: %v", err) + } + keyHash := xdr.Hash(sha256.Sum256(bin)) + if _, ok := byKeyHash[keyHash]; ok { + log.Fatalf("duplicate key hash: %v", keyHash) + } + + ledgerKeyBase64, err := ledgerKey.MarshalBinaryBase64() + if err != nil { + log.Fatalf("error while marshaling ledger key: %v", err) + } + + byKeyHash[keyHash] = balance{ + ledgerKey: ledgerKeyBase64, + asset: asset, + holderAddress: strkey.MustEncode(strkey.VersionByteContract, holder[:]), + amount: amt.String(), + } + } + + for keyHash, b := range byKeyHash { + ttl, ok := ttls[keyHash] + if !ok { + log.Fatalf("missing ttl for key hash: %v", keyHash) + } + if ttl > expirationLedger { + continue + } + b.ttl = strconv.FormatUint(uint64(ttl), 10) + balances = append(balances, b) + } + return balances +} + +// finds all SAC balances for a given list of assets that are either archived +// or close to being archived +func main() { + arch, err := historyarchive.NewArchivePool(archiveURLs, historyarchive.ArchiveOptions{}) + if err != nil { + log.Fatalf("failed to connect to pubnet archives: %v", err) + } + + // Determine the latest published checkpoint ledger sequence + seq, err := arch.GetLatestLedgerSequence() + if err != nil { + log.Fatalf("failed to get latest checkpoint sequence: %v", err) + } + + // Always write to stdout + csvw := csv.NewWriter(os.Stdout) + // header + if err = csvw.Write([]string{"asset", "ledger_key", "holder_contract_id", "amount", "ttl"}); err != nil { + log.Fatalf("failed writing CSV: %v", err) + } + + for _, b := range findExpiringBalances(context.Background(), arch, seq, cutoffLedger, targetAssets) { + if err = csvw.Write([]string{ + b.asset, + b.ledgerKey, + b.holderAddress, + b.amount, + b.ttl, + }); err != nil { + log.Fatalf("failed writing CSV: %v", err) + } + } + + for _, b := range findEvictedBalances(context.Background(), arch, seq, targetAssets) { + if err = csvw.Write([]string{ + b.asset, + b.ledgerKey, + b.holderAddress, + b.amount, + b.ttl, + }); err != nil { + log.Fatalf("failed writing CSV: %v", err) + } + } + + csvw.Flush() + if err := csvw.Error(); err != nil { + log.Fatalf("failed writing CSV: %v", err) + } +}