@@ -45,6 +45,7 @@
import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
import org.apache.thrift.TBase;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@
public class Util {

private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB

public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@ public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryp
return read(from, new FileMetaData(), decryptor, AAD);
}

public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
return readFileMetaData(from, null, null, maxMessageSize);
}

public static FileMetaData readFileMetaData(
InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
}

public static void writeColumnMetaData(
ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
throws IOException {
@@ -190,6 +201,18 @@ public static FileMetaData readFileMetaData(
return md;
}

public static FileMetaData readFileMetaData(
InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
throws IOException {
FileMetaData md = new FileMetaData();
if (skipRowGroups) {
readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
} else {
read(from, md, decryptor, AAD, maxMessageSize);
}
return md;
}

public static void writeFileCryptoMetaData(
org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@ public static void readFileMetaData(
BlockCipher.Decryptor decryptor,
byte[] AAD)
throws IOException {
readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
}

public static void readFileMetaData(
final InputStream input,
final FileMetaDataConsumer consumer,
boolean skipRowGroups,
BlockCipher.Decryptor decryptor,
byte[] AAD,
int maxMessageSize)
throws IOException {
try {
DelegatingFieldConsumer eventConsumer = fieldConsumer()
.onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@ public void consume(RowGroup rowGroup) {
byte[] plainText = decryptor.decrypt(input, AAD);
from = new ByteArrayInputStream(plainText);
}
new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
} catch (TException e) {
throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
}
}

private static TProtocol protocol(OutputStream to) throws TTransportException {
return protocol(new TIOStreamTransport(to));
return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
}

private static TProtocol protocol(InputStream from) throws TTransportException {
return protocol(new TIOStreamTransport(from));
return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
}

private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
return protocol(new TIOStreamTransport(from), maxMessageSize);
}

private static InterningProtocol protocol(TIOStreamTransport t) {
private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
throws TTransportException, NumberFormatException {
int maxMessageSize = configuredMaxMessageSize;
if (configuredMaxMessageSize == -1) {
// Set to default 100 MB
maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
} else if (configuredMaxMessageSize <= 0) {
throw new NumberFormatException("Max message size must be positive: " + configuredMaxMessageSize);
}

TConfiguration config = t.getConfiguration();
config.setMaxMessageSize(maxMessageSize);
/*
Reset known message size to 0 to force checking against the max message size.
This is necessary when reusing the same transport for multiple reads/writes,
as the known message size may be larger than the max message size.
*/
t.updateKnownMessageSize(0);
return new InterningProtocol(new TCompactProtocol(t));
}

private static <T extends TBase<?, ?>> T read(
final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
}

private static <T extends TBase<?, ?>> T read(
final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
throws IOException {
final InputStream from;
if (null == decryptor) {
from = input;
@@ -387,7 +449,7 @@ private static InterningProtocol protocol(TIOStreamTransport t) {
}

try {
tbase.read(protocol(from));
tbase.read(protocol(from, maxMessageSize));
return tbase;
} catch (TException e) {
throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);
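
A minimal caller sketch (not part of the diff) showing how the new Util.readFileMetaData overload with an explicit maxMessageSize might be used; the class name, stream variable, and 200 MB value are illustrative, and only the Util API comes from the change above.

import java.io.IOException;
import java.io.InputStream;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.Util;

// Hypothetical caller: read a plaintext footer while allowing Thrift messages up to 200 MB.
public class FooterReadSketch {
  public static FileMetaData readFooter(InputStream footerStream) throws IOException {
    int maxMessageSize = 200 * 1024 * 1024; // illustrative limit, above the 100 MB default
    // New overload added above; it delegates to read(from, new FileMetaData(), null, null, maxMessageSize).
    return Util.readFileMetaData(footerStream, maxMessageSize);
  }
}
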
@@ -147,13 +147,23 @@ public class ParquetMetadataConverter {
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k

/**
* Configuration property to control the Thrift max message size when reading Parquet metadata.
* This is useful for files with very large metadata.
* Default value is 100 MB.
*/
public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";

private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB

private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR =
new LogicalTypeConverterVisitor();
private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR =
new ConvertedTypeConverterVisitor();
private final int statisticsTruncateLength;
private final boolean useSignedStringMinMax;
private final ParquetReadOptions options;

public ParquetMetadataConverter() {
this(false);
@@ -173,19 +183,38 @@ public ParquetMetadataConverter(Configuration conf) {
}

public ParquetMetadataConverter(ParquetReadOptions options) {
this(options.useSignedStringMinMax());
this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
}

private ParquetMetadataConverter(boolean useSignedStringMinMax) {
this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
}

private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
this(useSignedStringMinMax, statisticsTruncateLength, null);
}

private ParquetMetadataConverter(
boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
if (statisticsTruncateLength <= 0) {
throw new IllegalArgumentException("Truncate length should be greater than 0");
}
this.useSignedStringMinMax = useSignedStringMinMax;
this.statisticsTruncateLength = statisticsTruncateLength;
this.options = options;
}

/**
* Gets the configured max message size for Thrift deserialization.
* Reads the value from the ParquetReadOptions configuration, or returns -1 if no configuration is available.
*
* @return the max message size in bytes, or -1 to use the default
*/
private int getMaxMessageSize() {
if (options != null && options.getConfiguration() != null) {
return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
}
return -1;
}

// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@ public ParquetMetadata readParquetMetadata(
filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
int maxMessageSize = getMaxMessageSize();
FileMetaData fileMetadata =
readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}

@Override
public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
int maxMessageSize = getMaxMessageSize();
FileMetaData fileMetadata =
readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}

@Override
public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
int maxMessageSize = getMaxMessageSize();
FileMetaData fileMetadata =
readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) thro

@Override
public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
int maxMessageSize = getMaxMessageSize();
FileMetaData fileMetadata =
readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
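
A minimal sketch (assuming the Hadoop read path) of how a reader raises the limit through the new parquet.thrift.string.size.limit property; the helper class and the 200 MB value are illustrative, while the constant, Configuration, and HadoopReadOptions are the existing APIs exercised by the test below.

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;

// Hypothetical helper: build read options that raise the Thrift max message size for footer parsing.
public class MaxMessageSizeOptions {
  public static ParquetReadOptions build() {
    Configuration conf = new Configuration();
    // Set PARQUET_THRIFT_STRING_SIZE_LIMIT; ParquetMetadataConverter falls back to 100 MB when unset.
    conf.setInt(ParquetMetadataConverter.PARQUET_THRIFT_STRING_SIZE_LIMIT, 200 * 1024 * 1024);
    return HadoopReadOptions.builder(conf).build();
  }
}
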
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import static org.junit.Assert.*;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestParquetFileReaderMaxMessageSize {

public static Path TEST_FILE;
public MessageType schema;

@Rule
public final TemporaryFolder temp = new TemporaryFolder();

@Before
public void testSetup() throws IOException {

File testParquetFile = temp.newFile();
testParquetFile.delete();

TEST_FILE = new Path(testParquetFile.toURI());
// Create a file with many columns
StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
for (int i = 0; i < 2000; i++) {
schemaBuilder.append("required int64 col_").append(i).append(";");
}
schemaBuilder.append("}");

schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());

Configuration conf = new Configuration();
GroupWriteSupport.setSchema(schema, conf);

try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
.withConf(conf)
.withType(schema)
.build()) {

SimpleGroupFactory factory = new SimpleGroupFactory(schema);
Group group = factory.newGroup();
for (int col = 0; col < 2000; col++) {
group.append("col_" + col, 1L);
}
writer.write(group);
}
}

/**
* Test reading a file with many columns using a custom max message size
*/
@Test
public void testReadFileWithManyColumns() throws IOException {
Configuration readConf = new Configuration();
readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);

ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();

try (ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {

ParquetMetadata metadata = reader.getFooter();
assertNotNull(metadata);
assertEquals(schema, metadata.getFileMetaData().getSchema());
assertTrue(metadata.getBlocks().size() > 0);
}
}

/**
* Test that default configuration works for normal files
*/
@Test
public void testReadNormalFileWithDefaultConfig() throws IOException {
// Read with default configuration (no custom max message size)
Configuration readConf = new Configuration();
ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();

try (ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {

ParquetMetadata metadata = reader.getFooter();
assertNotNull(metadata);
assertEquals(1, metadata.getBlocks().get(0).getRowCount());
}
}

/**
* Test that an insufficient max message size produces an error
*/
@Test
public void testInsufficientMaxMessageSizeError() throws IOException {
// Try to read with very small max message size
Configuration readConf = new Configuration();
readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte

ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();

try (ParquetFileReader reader =
ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
fail("Should have thrown Message size exceeds limit due to MaxMessageSize");
} catch (IOException e) {
e.printStackTrace();
assertTrue(
"Error should mention TTransportException",
e.getMessage().contains("Message size exceeds limit")
|| e.getCause().getMessage().contains("Message size exceeds limit")
|| e.getMessage().contains("MaxMessageSize reached")
|| e.getCause().getMessage().contains("MaxMessageSize reached"));
}
}
}