Skip to content

Commit 6a101b5

Browse files
committed
chore: optimize checksum calculation
1 parent 093aed2 commit 6a101b5

File tree

17 files changed

+217
-102
lines changed

17 files changed

+217
-102
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
gorm/
22
.idea
3+
.DS_Store

checksum_row_iterator.go

Lines changed: 66 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
package spannerdriver
1616

1717
import (
18-
"bytes"
1918
"context"
2019
"crypto/sha256"
20+
"encoding/binary"
2121
"encoding/gob"
22+
"hash"
23+
"math"
2224
"reflect"
25+
"sort"
2326

2427
"cloud.google.com/go/spanner"
2528
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
@@ -70,8 +73,6 @@ type checksumRowIterator struct {
7073
// seen. It is calculated as a SHA256 checksum over all rows that so far
7174
// have been returned.
7275
checksum *[32]byte
73-
buffer *bytes.Buffer
74-
enc *gob.Encoder
7576

7677
// errIndex and err indicate any error and the index in the result set
7778
// where the error occurred.
@@ -110,7 +111,7 @@ func (it *checksumRowIterator) Next() (row *spanner.Row, err error) {
110111
// checksum of the columns that are included in this result. This is
111112
// also used to detect the possible difference between two empty
112113
// result sets with a different set of columns.
113-
it.checksum, err = createMetadataChecksum(it.enc, it.buffer, it.metadata)
114+
it.checksum, err = createMetadataChecksum(it.metadata)
114115
if err != nil {
115116
return err
116117
}
@@ -119,45 +120,87 @@ func (it *checksumRowIterator) Next() (row *spanner.Row, err error) {
119120
return it.err
120121
}
121122
// Update the current checksum.
122-
it.checksum, err = updateChecksum(it.enc, it.buffer, it.checksum, row)
123+
it.checksum, err = updateChecksum(it.checksum, row)
123124
return err
124125
})
125126
return row, err
126127
}
127128

128129
// updateChecksum calculates the following checksum based on a current checksum
129130
// and a new row.
130-
func updateChecksum(enc *gob.Encoder, buffer *bytes.Buffer, currentChecksum *[32]byte, row *spanner.Row) (*[32]byte, error) {
131-
buffer.Reset()
132-
buffer.Write(currentChecksum[:])
131+
func updateChecksum(currentChecksum *[32]byte, row *spanner.Row) (*[32]byte, error) {
132+
hasher := sha256.New()
133+
hasher.Write(currentChecksum[:])
133134
for i := 0; i < row.Size(); i++ {
134135
var v spanner.GenericColumnValue
135136
err := row.Column(i, &v)
136137
if err != nil {
137138
return nil, err
138139
}
139-
err = enc.Encode(v)
140-
if err != nil {
141-
return nil, err
140+
hashValue(v.Value, hasher)
141+
}
142+
res := hasher.Sum(nil)
143+
return (*[32]byte)(res), nil
144+
}
145+
146+
var int32Buf [4]byte
147+
var float64Buf [8]byte
148+
149+
func hashValue(value *structpb.Value, hasher hash.Hash) {
150+
switch value.GetKind().(type) {
151+
case *structpb.Value_StringValue:
152+
hasher.Write(intToByte(int32Buf, len(value.GetStringValue())))
153+
hasher.Write([]byte(value.GetStringValue()))
154+
case *structpb.Value_NullValue:
155+
hasher.Write([]byte{0})
156+
case *structpb.Value_NumberValue:
157+
hasher.Write(float64ToByte(float64Buf, value.GetNumberValue()))
158+
case *structpb.Value_BoolValue:
159+
if value.GetBoolValue() {
160+
hasher.Write([]byte{1})
161+
} else {
162+
hasher.Write([]byte{0})
163+
}
164+
case *structpb.Value_StructValue:
165+
fields := make([]string, 0, len(value.GetStructValue().Fields))
166+
for field, _ := range value.GetStructValue().Fields {
167+
fields = append(fields, field)
168+
}
169+
sort.Strings(fields)
170+
for _, field := range fields {
171+
hasher.Write(intToByte(int32Buf, len(field)))
172+
hasher.Write([]byte(field))
173+
hashValue(value.GetStructValue().Fields[field], hasher)
174+
}
175+
case *structpb.Value_ListValue:
176+
for _, v := range value.GetListValue().GetValues() {
177+
hashValue(v, hasher)
142178
}
143179
}
144-
res := sha256.Sum256(buffer.Bytes())
145-
return &res, nil
180+
}
181+
182+
func intToByte(buf [4]byte, v int) []byte {
183+
binary.BigEndian.PutUint32(buf[:], uint32(v))
184+
return buf[:]
185+
}
186+
187+
func float64ToByte(buf [8]byte, f float64) []byte {
188+
binary.BigEndian.PutUint64(buf[:], math.Float64bits(f))
189+
return buf[:]
146190
}
147191

148192
// createMetadataChecksum calculates the checksum of the metadata of a result.
149193
// Only the column names and types are included in the checksum. Any transaction
150194
// metadata is not included.
151-
func createMetadataChecksum(enc *gob.Encoder, buffer *bytes.Buffer, metadata *sppb.ResultSetMetadata) (*[32]byte, error) {
152-
buffer.Reset()
195+
func createMetadataChecksum(metadata *sppb.ResultSetMetadata) (*[32]byte, error) {
196+
hasher := sha256.New()
153197
for _, field := range metadata.RowType.Fields {
154-
err := enc.Encode(field)
155-
if err != nil {
156-
return nil, err
157-
}
198+
hasher.Write(intToByte(int32Buf, len(field.Name)))
199+
hasher.Write([]byte(field.Name))
200+
hasher.Write(intToByte(int32Buf, int(field.Type.Code.Number())))
158201
}
159-
res := sha256.Sum256(buffer.Bytes())
160-
return &res, nil
202+
res := hasher.Sum(nil)
203+
return (*[32]byte)(res), nil
161204
}
162205

163206
// retry implements retriableStatement.retry for queries. It will execute the
@@ -167,8 +210,6 @@ func createMetadataChecksum(enc *gob.Encoder, buffer *bytes.Buffer, metadata *sp
167210
// initial iterator was also returned by the new iterator, and that the errors
168211
// were returned by the same row index.
169212
func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteStmtBasedTransaction) error {
170-
buffer := &bytes.Buffer{}
171-
enc := gob.NewEncoder(buffer)
172213
retryIt := tx.QueryWithOptions(ctx, it.stmt, it.options)
173214
// If the original iterator had been stopped, we should also always stop the
174215
// new iterator.
@@ -198,7 +239,7 @@ func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteS
198239
for n := int64(0); n < it.nc; n++ {
199240
row, err := retryIt.Next()
200241
if n == 0 && (err == nil || err == iterator.Done) {
201-
newChecksum, checksumErr = createMetadataChecksum(enc, buffer, retryIt.Metadata)
242+
newChecksum, checksumErr = createMetadataChecksum(retryIt.Metadata)
202243
if checksumErr != nil {
203244
return failRetry(checksumErr)
204245
}
@@ -218,7 +259,7 @@ func (it *checksumRowIterator) retry(ctx context.Context, tx *spanner.ReadWriteS
218259
}
219260
return failRetry(ErrAbortedDueToConcurrentModification)
220261
}
221-
newChecksum, err = updateChecksum(enc, buffer, newChecksum, row)
262+
newChecksum, err = updateChecksum(newChecksum, row)
222263
if err != nil {
223264
return failRetry(err)
224265
}

checksum_row_iterator_test.go

Lines changed: 84 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
package spannerdriver
1616

1717
import (
18-
"bytes"
19-
"encoding/gob"
2018
"math/big"
2119
"testing"
2220
"time"
@@ -26,13 +24,6 @@ import (
2624
)
2725

2826
func TestUpdateChecksum(t *testing.T) {
29-
buffer1 := &bytes.Buffer{}
30-
enc1 := gob.NewEncoder(buffer1)
31-
buffer2 := &bytes.Buffer{}
32-
enc2 := gob.NewEncoder(buffer2)
33-
buffer3 := &bytes.Buffer{}
34-
enc3 := gob.NewEncoder(buffer3)
35-
3627
row1, err := spanner.NewRow(
3728
[]string{
3829
"ColBool", "ColInt64", "ColFloat64", "ColNumeric", "ColString", "ColBytes", "ColDate", "ColTimestamp", "ColJson",
@@ -111,17 +102,17 @@ func TestUpdateChecksum(t *testing.T) {
111102
t.Fatalf("could not create row 3: %v", err)
112103
}
113104
initial1 := new([32]byte)
114-
checksum1, err := updateChecksum(enc1, buffer1, initial1, row1)
105+
checksum1, err := updateChecksum(initial1, row1)
115106
if err != nil {
116107
t.Fatalf("could not calculate checksum 1: %v", err)
117108
}
118109
initial2 := new([32]byte)
119-
checksum2, err := updateChecksum(enc2, buffer2, initial2, row2)
110+
checksum2, err := updateChecksum(initial2, row2)
120111
if err != nil {
121112
t.Fatalf("could not calculate checksum 2: %v", err)
122113
}
123114
initial3 := new([32]byte)
124-
checksum3, err := updateChecksum(enc3, buffer3, initial3, row3)
115+
checksum3, err := updateChecksum(initial3, row3)
125116
if err != nil {
126117
t.Fatalf("could not calculate checksum 3: %v", err)
127118
}
@@ -136,11 +127,11 @@ func TestUpdateChecksum(t *testing.T) {
136127

137128
// Updating checksums 1 and 3 with the data from row 2 should also produce
138129
// the same checksum.
139-
checksum1_2, err := updateChecksum(enc1, buffer1, checksum1, row2)
130+
checksum1_2, err := updateChecksum(checksum1, row2)
140131
if err != nil {
141132
t.Fatalf("could not calculate checksum 1_2: %v", err)
142133
}
143-
checksum3_2, err := updateChecksum(enc3, buffer3, checksum3, row2)
134+
checksum3_2, err := updateChecksum(checksum3, row2)
144135
if err != nil {
145136
t.Fatalf("could not calculate checksum 1_2: %v", err)
146137
}
@@ -150,7 +141,7 @@ func TestUpdateChecksum(t *testing.T) {
150141

151142
// The combination of row 3 and 2 will produce a different checksum than the
152143
// combination 2 and 3, because they are in a different order.
153-
checksum2_3, err := updateChecksum(enc2, buffer2, checksum2, row3)
144+
checksum2_3, err := updateChecksum(checksum2, row3)
154145
if err != nil {
155146
t.Fatalf("could not calculate checksum 2_3: %v", err)
156147
}
@@ -160,9 +151,6 @@ func TestUpdateChecksum(t *testing.T) {
160151
}
161152

162153
func TestUpdateChecksumForNullValues(t *testing.T) {
163-
buffer := &bytes.Buffer{}
164-
enc := gob.NewEncoder(buffer)
165-
166154
row, err := spanner.NewRow(
167155
[]string{
168156
"ColBool", "ColInt64", "ColFloat64", "ColNumeric", "ColString", "ColBytes", "ColDate", "ColTimestamp", "ColJson",
@@ -182,7 +170,7 @@ func TestUpdateChecksumForNullValues(t *testing.T) {
182170
}
183171
initial := new([32]byte)
184172
// Create the initial checksum.
185-
checksum, err := updateChecksum(enc, buffer, initial, row)
173+
checksum, err := updateChecksum(initial, row)
186174
if err != nil {
187175
t.Fatalf("could not calculate checksum 1: %v", err)
188176
}
@@ -192,14 +180,88 @@ func TestUpdateChecksumForNullValues(t *testing.T) {
192180
t.Fatalf("checksum value should not be equal to the initial value")
193181
}
194182
// Calculating the same checksum again should yield the same result.
195-
buffer2 := &bytes.Buffer{}
196-
enc2 := gob.NewEncoder(buffer2)
197183
initial2 := new([32]byte)
198-
checksum2, err := updateChecksum(enc2, buffer2, initial2, row)
184+
checksum2, err := updateChecksum(initial2, row)
199185
if err != nil {
200186
t.Fatalf("failed to update checksum: %v", err)
201187
}
202188
if *checksum != *checksum2 {
203189
t.Fatalf("recalculated checksum does not match the initial calculation")
204190
}
205191
}
192+
193+
func BenchmarkChecksumRowIterator(b *testing.B) {
194+
row1, _ := spanner.NewRow(
195+
[]string{
196+
"ColBool", "ColInt64", "ColFloat64", "ColNumeric", "ColString", "ColBytes", "ColDate", "ColTimestamp", "ColJson",
197+
"ArrBool", "ArrInt64", "ArrFloat64", "ArrNumeric", "ArrString", "ArrBytes", "ArrDate", "ArrTimestamp", "ArrJson",
198+
},
199+
[]interface{}{
200+
true, int64(1), 3.14, numeric("6.626"), "test", []byte("testbytes"), civil.Date{Year: 2021, Month: 8, Day: 5},
201+
time.Date(2021, 8, 5, 13, 19, 23, 123456789, time.UTC),
202+
nullJson(true, `"key": "value", "other-key": ["value1", "value2"]}`),
203+
[]bool{true, false}, []int64{1, 2}, []float64{3.14, 6.626}, []big.Rat{numeric("3.14"), numeric("6.626")},
204+
[]string{"test1", "test2"}, [][]byte{[]byte("testbytes1"), []byte("testbytes1")},
205+
[]civil.Date{{Year: 2021, Month: 8, Day: 5}, {Year: 2021, Month: 8, Day: 6}},
206+
[]time.Time{
207+
time.Date(2021, 8, 5, 13, 19, 23, 123456789, time.UTC),
208+
time.Date(2021, 8, 6, 13, 19, 23, 123456789, time.UTC),
209+
},
210+
[]spanner.NullJSON{
211+
nullJson(true, `"key1": "value1", "other-key1": ["value1", "value2"]}`),
212+
nullJson(true, `"key2": "value2", "other-key2": ["value1", "value2"]}`),
213+
},
214+
},
215+
)
216+
row2, _ := spanner.NewRow(
217+
[]string{
218+
"ColBool", "ColInt64", "ColFloat64", "ColNumeric", "ColString", "ColBytes", "ColDate", "ColTimestamp", "ColJson",
219+
"ArrBool", "ArrInt64", "ArrFloat64", "ArrNumeric", "ArrString", "ArrBytes", "ArrDate", "ArrTimestamp", "ArrJson",
220+
},
221+
[]interface{}{
222+
true, int64(2), 6.626, numeric("3.14"), "test2", []byte("testbytes2"), civil.Date{Year: 2020, Month: 8, Day: 5},
223+
time.Date(2020, 8, 5, 13, 19, 23, 123456789, time.UTC),
224+
nullJson(true, `"key": "other-value", "other-key": ["other-value1", "other-value2"]}`),
225+
[]bool{true, false}, []int64{1, 2}, []float64{3.14, 6.626}, []big.Rat{numeric("3.14"), numeric("6.626")},
226+
[]string{"test1_", "test2_"}, [][]byte{[]byte("testbytes1_"), []byte("testbytes1_")},
227+
[]civil.Date{{Year: 2020, Month: 8, Day: 5}, {Year: 2020, Month: 8, Day: 6}},
228+
[]time.Time{
229+
time.Date(2020, 8, 5, 13, 19, 23, 123456789, time.UTC),
230+
time.Date(2020, 8, 6, 13, 19, 23, 123456789, time.UTC),
231+
},
232+
[]spanner.NullJSON{
233+
nullJson(true, `"key1": "other-value1", "other-key1": ["other-value1", "other-value2"]}`),
234+
nullJson(true, `"key2": "other-value2", "other-key2": ["other-value1", "other-value2"]}`),
235+
},
236+
},
237+
)
238+
row3, _ := spanner.NewRow(
239+
[]string{
240+
"ColBool", "ColInt64", "ColFloat64", "ColNumeric", "ColString", "ColBytes", "ColDate", "ColTimestamp", "ColJson",
241+
"ArrBool", "ArrInt64", "ArrFloat64", "ArrNumeric", "ArrString", "ArrBytes", "ArrDate", "ArrTimestamp", "ArrJson",
242+
},
243+
[]interface{}{
244+
true, int64(1), 3.14, numeric("6.626"), "test", []byte("testbytes"), civil.Date{Year: 2021, Month: 8, Day: 5},
245+
time.Date(2021, 8, 5, 13, 19, 23, 123456789, time.UTC),
246+
nullJson(true, `"key": "value", "other-key": ["value1", "value2"]}`),
247+
[]bool{true, false}, []int64{1, 2}, []float64{3.14, 6.626}, []big.Rat{numeric("3.14"), numeric("6.626")},
248+
[]string{"test1", "test2"}, [][]byte{[]byte("testbytes1"), []byte("testbytes1")},
249+
[]civil.Date{{Year: 2021, Month: 8, Day: 5}, {Year: 2021, Month: 8, Day: 6}},
250+
[]time.Time{
251+
time.Date(2021, 8, 5, 13, 19, 23, 123456789, time.UTC),
252+
time.Date(2021, 8, 6, 13, 19, 23, 123456789, time.UTC),
253+
},
254+
[]spanner.NullJSON{
255+
nullJson(true, `"key1": "value1", "other-key1": ["value1", "value2"]}`),
256+
nullJson(true, `"key2": "value2", "other-key2": ["value1", "value2"]}`),
257+
},
258+
},
259+
)
260+
261+
for b.Loop() {
262+
initial := new([32]byte)
263+
checksum, _ := updateChecksum(initial, row1)
264+
checksum, _ = updateChecksum(checksum, row2)
265+
checksum, _ = updateChecksum(checksum, row3)
266+
}
267+
}

0 commit comments

Comments
 (0)