Commit 1793d0c

GH-3358: Add Configurable Thrift Max Message Size for Parquet Metadata Reading
1 parent 8e740f0 commit 1793d0c

3 files changed: +255 −10 lines
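
For context, a minimal end-to-end sketch of the new setting, modeled on the test added in this commit (the class name and input path are illustrative, not part of the change):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ReadWideFooterExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Raise the Thrift max message size from the 100 MB default to 200 MB
    // for files whose footers exceed the default cap.
    conf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(args[0]), conf), options)) {
      System.out.println("row groups: " + reader.getFooter().getBlocks().size());
    }
  }
}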


parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java

Lines changed: 67 additions & 5 deletions

@@ -45,6 +45,7 @@
 import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
 import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
 import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@
 public class Util {

   private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB

   public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
     writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@ public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryp
     return read(from, new FileMetaData(), decryptor, AAD);
   }

+  public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+    return readFileMetaData(from, null, null, maxMessageSize);
+  }
+
+  public static FileMetaData readFileMetaData(
+      InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+    return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
+  }
+
   public static void writeColumnMetaData(
       ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
       throws IOException {
@@ -190,6 +201,18 @@ public static FileMetaData readFileMetaData(
     return md;
   }

+  public static FileMetaData readFileMetaData(
+      InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
+    FileMetaData md = new FileMetaData();
+    if (skipRowGroups) {
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
+    } else {
+      read(from, md, decryptor, AAD, maxMessageSize);
+    }
+    return md;
+  }
+
   public static void writeFileCryptoMetaData(
       org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
     write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@ public static void readFileMetaData(
       BlockCipher.Decryptor decryptor,
       byte[] AAD)
       throws IOException {
+    readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  public static void readFileMetaData(
+      final InputStream input,
+      final FileMetaDataConsumer consumer,
+      boolean skipRowGroups,
+      BlockCipher.Decryptor decryptor,
+      byte[] AAD,
+      int maxMessageSize)
+      throws IOException {
     try {
       DelegatingFieldConsumer eventConsumer = fieldConsumer()
           .onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@ public void consume(RowGroup rowGroup) {
         byte[] plainText = decryptor.decrypt(input, AAD);
         from = new ByteArrayInputStream(plainText);
       }
-      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+      new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
     } catch (TException e) {
       throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
     }
   }

   private static TProtocol protocol(OutputStream to) throws TTransportException {
-    return protocol(new TIOStreamTransport(to));
+    return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
   }

   private static TProtocol protocol(InputStream from) throws TTransportException {
-    return protocol(new TIOStreamTransport(from));
+    return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
+    return protocol(new TIOStreamTransport(from), maxMessageSize);
   }

-  private static InterningProtocol protocol(TIOStreamTransport t) {
+  private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+      throws TTransportException {
+    int maxMessageSize = configuredMaxMessageSize;
+    if (configuredMaxMessageSize == -1) {
+      // -1 is the "not configured" sentinel: fall back to the 100 MB default.
+      maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+    } else if (configuredMaxMessageSize <= 0) {
+      throw new IllegalArgumentException("Max message size must be positive: " + configuredMaxMessageSize);
+    }
+
+    TConfiguration config = t.getConfiguration();
+    config.setMaxMessageSize(maxMessageSize);
+    /*
+     * Reset the known message size to 0 to force checking against the max message size.
+     * This is necessary when reusing the same transport for multiple reads/writes,
+     * as the known message size may be larger than the max message size.
+     */
+    t.updateKnownMessageSize(0);
     return new InterningProtocol(new TCompactProtocol(t));
   }

   private static <T extends TBase<?, ?>> T read(
       final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  private static <T extends TBase<?, ?>> T read(
+      final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
     final InputStream from;
     if (null == decryptor) {
       from = input;
@@ -387,7 +449,7 @@ private static InterningProtocol protocol(TIOStreamTransport t) {
     }

     try {
-      tbase.read(protocol(from));
+      tbase.read(protocol(from, maxMessageSize));
       return tbase;
     } catch (TException e) {
       throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);
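
The new public overloads can also be driven directly when the caller already holds a stream positioned at a raw Thrift-serialized footer. A hedged sketch of that low-level path (the class name and input file are hypothetical; Util.readFileMetaData(InputStream, int) is the overload added above):

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.Util;

public class ReadRawFooterExample {
  public static void main(String[] args) throws IOException {
    // Cap Thrift message decoding at 16 MB instead of the 100 MB default.
    try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
      FileMetaData md = Util.readFileMetaData(in, 16 * 1024 * 1024);
      System.out.println("format version: " + md.getVersion());
    }
  }
}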

parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java

Lines changed: 42 additions & 5 deletions

@@ -147,13 +147,23 @@ public class ParquetMetadataConverter {
   public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
   public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k

+  /**
+   * Configuration property to control the Thrift max message size when reading Parquet metadata.
+   * This is useful for files with very large metadata. The default value is 100 MB.
+   */
+  public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";
+
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
   private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
   private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR =
       new LogicalTypeConverterVisitor();
   private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR =
       new ConvertedTypeConverterVisitor();
   private final int statisticsTruncateLength;
   private final boolean useSignedStringMinMax;
+  private final ParquetReadOptions options;

   public ParquetMetadataConverter() {
     this(false);
@@ -173,19 +183,38 @@ public ParquetMetadataConverter(Configuration conf) {
   }

   public ParquetMetadataConverter(ParquetReadOptions options) {
-    this(options.useSignedStringMinMax());
+    this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
   }

   private ParquetMetadataConverter(boolean useSignedStringMinMax) {
     this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
   }

   private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+    this(useSignedStringMinMax, statisticsTruncateLength, null);
+  }
+
+  private ParquetMetadataConverter(
+      boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
     if (statisticsTruncateLength <= 0) {
       throw new IllegalArgumentException("Truncate length should be greater than 0");
     }
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.statisticsTruncateLength = statisticsTruncateLength;
+    this.options = options;
+  }
+
+  /**
+   * Gets the configured max message size for Thrift deserialization.
+   * Reads from the ParquetReadOptions configuration, or returns -1 if not available.
+   *
+   * @return the max message size in bytes, or -1 to use the default
+   */
+  private int getMaxMessageSize() {
+    if (options != null && options.getConfiguration() != null) {
+      return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
+    }
+    return -1;
   }

   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@ public ParquetMetadata readParquetMetadata(
     filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         return new FileMetaDataAndRowGroupOffsetInfo(
             fileMetadata, generateRowGroupOffsets(fileMetadata));
       }

       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         return new FileMetaDataAndRowGroupOffsetInfo(
             fileMetadata, generateRowGroupOffsets(fileMetadata));
       }

       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         // We must generate the map *before* filtering because it modifies `fileMetadata`.
         Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
         FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) thro

       @Override
       public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
-        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        int maxMessageSize = getMaxMessageSize();
+        FileMetaData fileMetadata =
+            readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
         // We must generate the map *before* filtering because it modifies `fileMetadata`.
         Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
         FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
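
Callers that build the converter themselves can reference the new public constant instead of the raw property string. A small sketch under the same assumptions as the test below (the wrapper class is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

public class ConverterConfigExample {
  public static ParquetMetadataConverter newConverter() {
    Configuration conf = new Configuration();
    conf.setInt(ParquetMetadataConverter.PARQUET_THRIFT_STRING_SIZE_LIMIT, 64 * 1024 * 1024);
    // Footer reads through this converter will enforce a 64 MB Thrift cap.
    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    return new ParquetMetadataConverter(options);
  }
}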
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java

Lines changed: 146 additions & 0 deletions
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestParquetFileReaderMaxMessageSize {
+
+  public static Path TEST_FILE;
+  public MessageType schema;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void testSetup() throws IOException {
+
+    File testParquetFile = temp.newFile();
+    testParquetFile.delete();
+
+    TEST_FILE = new Path(testParquetFile.toURI());
+    // Create a file with many columns so the footer is comparatively large.
+    StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+    for (int i = 0; i < 2000; i++) {
+      schemaBuilder.append("required int64 col_").append(i).append(";");
+    }
+    schemaBuilder.append("}");
+
+    schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+        .withConf(conf)
+        .withType(schema)
+        .build()) {
+
+      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+      Group group = factory.newGroup();
+      for (int col = 0; col < 2000; col++) {
+        group.append("col_" + col, 1L);
+      }
+      writer.write(group);
+    }
+  }
+
+  /**
+   * Test reading a file with many columns using a custom max message size.
+   */
+  @Test
+  public void testReadFileWithManyColumns() throws IOException {
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(schema, metadata.getFileMetaData().getSchema());
+      assertTrue(metadata.getBlocks().size() > 0);
+    }
+  }
+
+  /**
+   * Test that the default configuration works for normal files.
+   */
+  @Test
+  public void testReadNormalFileWithDefaultConfig() throws IOException {
+    // Read with the default configuration (no custom max message size).
+    Configuration readConf = new Configuration();
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+    }
+  }
+
+  /**
+   * Test that an insufficient max message size produces an error.
+   */
+  @Test
+  public void testInsufficientMaxMessageSizeError() throws IOException {
+    // Try to read with a max message size that is far too small.
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 1); // only 1 byte
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+      fail("Expected an IOException because the Thrift max message size is exceeded");
+    } catch (IOException e) {
+      assertTrue(
+          "Error should mention the Thrift message size limit",
+          e.getMessage().contains("Message size exceeds limit")
+              || e.getCause().getMessage().contains("Message size exceeds limit")
+              || e.getMessage().contains("MaxMessageSize reached")
+              || e.getCause().getMessage().contains("MaxMessageSize reached"));
+    }
+  }
+}
