diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/ArrowSourceReader.java b/java/tools/src/main/java/org/apache/tsfile/tools/ArrowSourceReader.java index 683396783..bb6a09587 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/ArrowSourceReader.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/ArrowSourceReader.java @@ -24,10 +24,13 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -43,6 +46,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.LocalDate; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -240,7 +244,22 @@ private List getSchemaColumnNames() { } private Object extractValue(FieldVector vec, int row) { - if (vec instanceof BigIntVector) { + // Date / Timestamp checks must come BEFORE the BigIntVector/IntVector branches: although + // they hold int/long underneath, DateDayVector / TimeStampVector do NOT extend + // IntVector / BigIntVector, so without these branches Date columns fall through to the + // generic getObject().toString() path and produce strings that don't match TSDataType.DATE. + if (vec instanceof DateDayVector) { + // Days since 1970-01-01. ValueConverter.toLocalDate handles Integer → LocalDate. + return ((DateDayVector) vec).get(row); + } else if (vec instanceof DateMilliVector) { + // Millis since 1970-01-01; collapse to date. + long millis = ((DateMilliVector) vec).get(row); + return LocalDate.ofEpochDay(Math.floorDiv(millis, 86_400_000L)); + } else if (vec instanceof TimeStampVector) { + // Long in the vector's native precision; matches the precision detected by + // detectTimestampPrecision() and stored on the schema. + return ((TimeStampVector) vec).get(row); + } else if (vec instanceof BigIntVector) { return ((BigIntVector) vec).get(row); } else if (vec instanceof IntVector) { return ((IntVector) vec).get(row); diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/CsvSourceReader.java b/java/tools/src/main/java/org/apache/tsfile/tools/CsvSourceReader.java index 7c4dcf1ff..81911a702 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/CsvSourceReader.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/CsvSourceReader.java @@ -225,8 +225,57 @@ private void ensureReaderOpen() throws IOException { } } - private String[] splitLine(String line) { - return line.split(separator, -1); + /** + * RFC 4180-style tokenizer that handles quoted fields with embedded delimiters and escaped quotes + * ({@code ""}). Multi-line quoted records are not supported — quoted values must not contain line + * breaks, since the surrounding read loop is line-oriented. + */ + String[] splitLine(String line) { + if (line.indexOf('"') < 0) { + return line.split(separator, -1); + } + List tokens = new ArrayList<>(); + StringBuilder cur = new StringBuilder(); + boolean inQuotes = false; + boolean fieldStart = true; + int sepLen = separator.length(); + int i = 0; + while (i < line.length()) { + char c = line.charAt(i); + if (inQuotes) { + if (c == '"') { + if (i + 1 < line.length() && line.charAt(i + 1) == '"') { + cur.append('"'); + i += 2; + continue; + } + inQuotes = false; + i++; + continue; + } + cur.append(c); + i++; + } else { + if (fieldStart && c == '"') { + inQuotes = true; + fieldStart = false; + i++; + continue; + } + if (line.regionMatches(i, separator, 0, sepLen)) { + tokens.add(cur.toString()); + cur.setLength(0); + fieldStart = true; + i += sepLen; + continue; + } + cur.append(c); + fieldStart = false; + i++; + } + } + tokens.add(cur.toString()); + return tokens.toArray(new String[0]); } private Object[] parseLine(String line) { diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/DateTimeUtils.java b/java/tools/src/main/java/org/apache/tsfile/tools/DateTimeUtils.java index 04452d3f5..3b9ef61a1 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/DateTimeUtils.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/DateTimeUtils.java @@ -20,6 +20,7 @@ import java.time.DateTimeException; import java.time.Instant; +import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -448,12 +449,56 @@ public static long convertTimestampOrDatetimeStrToLongWithDefaultZone( } public static long convertDatetimeStrToLong(String str, ZoneId zoneId) { - return convertDatetimeStrToLong(str, toZoneOffset(zoneId), 0, "ms"); + return convertDatetimeStrToLongWithZoneId(str, zoneId, 0, "ms"); } public static long convertDatetimeStrToLong( String str, ZoneId zoneId, String timestampPrecision) { - return convertDatetimeStrToLong(str, toZoneOffset(zoneId), 0, timestampPrecision); + return convertDatetimeStrToLongWithZoneId(str, zoneId, 0, timestampPrecision); + } + + /** + * Resolve the offset based on the actual local datetime in the string (so DST is honored), + * instead of collapsing the {@link ZoneId} to a single offset using {@code Instant.now()}. + */ + private static long convertDatetimeStrToLongWithZoneId( + String str, ZoneId zoneId, int depth, String timestampPrecision) { + if (depth >= 2) { + throw new DateTimeException( + String.format( + "Failed to convert %s to millisecond, zone id is %s, " + + "please input like 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00", + str, zoneId)); + } + if (str.contains("Z")) { + return convertDatetimeStrToLongWithZoneId( + str.substring(0, str.indexOf('Z')) + "+00:00", zoneId, depth, timestampPrecision); + } else if (str.length() == 10) { + return convertDatetimeStrToLongWithZoneId( + str + "T00:00:00", zoneId, depth, timestampPrecision); + } else if (str.length() - str.lastIndexOf('+') != 6 + && str.length() - str.lastIndexOf('-') != 6) { + ZoneOffset offset = resolveLocalOffset(str, zoneId); + return convertDatetimeStrToLongWithZoneId( + str + offset, zoneId, depth + 1, timestampPrecision); + } else if (str.contains("[") || str.contains("]")) { + throw new DateTimeException( + String.format( + "%s with [time-region] at end is not supported now, " + + "please input like 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00", + str)); + } + return getInstantWithPrecision(str, timestampPrecision); + } + + private static ZoneOffset resolveLocalOffset(String str, ZoneId zoneId) { + String normalized = str.replace('/', '-').replace('.', '-').replace(' ', 'T'); + try { + LocalDateTime ldt = LocalDateTime.parse(normalized); + return zoneId.getRules().getOffset(ldt); + } catch (DateTimeParseException e) { + return zoneId.getRules().getOffset(Instant.now()); + } } public static long getInstantWithPrecision(String str, String timestampPrecision) { diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/ParquetSourceReader.java b/java/tools/src/main/java/org/apache/tsfile/tools/ParquetSourceReader.java index fcbf3f468..d8a3c241b 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/ParquetSourceReader.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/ParquetSourceReader.java @@ -37,6 +37,8 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -254,12 +256,29 @@ private Object extractValue(Group group, int fieldIndex) { } return group.getBinary(fieldIndex, 0).getBytes(); case INT96: - return group.getBinary(fieldIndex, 0).getBytes(); + // Use getInt96 — INT96 values are Int96Value, not BinaryValue, so getBinary throws CCE. + return int96ToEpochNanos(group.getInt96(fieldIndex, 0).getBytes()); default: return group.getValueToString(fieldIndex, 0); } } + /** + * Decode a legacy Parquet INT96 timestamp (12 bytes: 8-byte little-endian nanoseconds-of-day + + * 4-byte little-endian Julian day number) to nanoseconds since the Unix epoch. + */ + static long int96ToEpochNanos(byte[] bytes) { + if (bytes == null || bytes.length != 12) { + throw new IllegalArgumentException( + "INT96 timestamp must be 12 bytes, got " + (bytes == null ? 0 : bytes.length)); + } + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + long nanosOfDay = buf.getLong(); + int julianDay = buf.getInt(); + long daysSinceEpoch = (long) julianDay - 2440588L; + return Math.addExact(Math.multiplyExact(daysSinceEpoch, 86_400_000_000_000L), nanosOfDay); + } + static TSDataType mapParquetType(PrimitiveType pt) { LogicalTypeAnnotation logical = pt.getLogicalTypeAnnotation(); @@ -310,6 +329,9 @@ static String detectTimestampPrecision(PrimitiveType pt) { return null; } } + if (pt.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { + return "ns"; + } return null; } } diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/TabletBuilder.java b/java/tools/src/main/java/org/apache/tsfile/tools/TabletBuilder.java index 26a957fdb..3abf5fd78 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/TabletBuilder.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/TabletBuilder.java @@ -90,7 +90,9 @@ public Tablet build(SourceBatch batch) { } boolean isMeasurement = tableSchema.getColumnTypes().get(col) == ColumnCategory.FIELD; - Object converted = ValueConverter.convert(rawValue, colSchema.getType(), isMeasurement); + Object converted = + ValueConverter.convert( + rawValue, colSchema.getType(), isMeasurement, importSchema.getTimePrecision()); tablet.addValue(colName, i, converted); } } diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java b/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java index bb68b99e1..dcf32f6f0 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java @@ -112,29 +112,34 @@ private static void processDirectory(File directory, ExecutorService executor) { private static boolean isAcceptedFormat(String fileName) { String lower = fileName.toLowerCase(); - String fmt = resolveFormat(fileName); + String detected = detectFormatByExtension(lower); if (formatStr != null) { - return fmt.equals(formatStr); + return formatStr.equals(detected); } - return lower.endsWith(".csv") - || lower.endsWith(".parquet") - || lower.endsWith(".arrow") - || lower.endsWith(".ipc") - || lower.endsWith(".feather"); + return detected != null; } - private static String resolveFormat(String fileName) { - if (formatStr != null) { - return formatStr; + private static String detectFormatByExtension(String lowerName) { + if (lowerName.endsWith(".csv")) { + return "csv"; } - String lower = fileName.toLowerCase(); - if (lower.endsWith(".parquet")) { + if (lowerName.endsWith(".parquet")) { return "parquet"; } - if (lower.endsWith(".arrow") || lower.endsWith(".ipc") || lower.endsWith(".feather")) { + if (lowerName.endsWith(".arrow") + || lowerName.endsWith(".ipc") + || lowerName.endsWith(".feather")) { return "arrow"; } - return "csv"; + return null; + } + + private static String resolveFormat(String fileName) { + if (formatStr != null) { + return formatStr; + } + String detected = detectFormatByExtension(fileName.toLowerCase()); + return detected != null ? detected : "csv"; } private static void processFile(File inputFile, ExecutorService executor) { diff --git a/java/tools/src/main/java/org/apache/tsfile/tools/ValueConverter.java b/java/tools/src/main/java/org/apache/tsfile/tools/ValueConverter.java index d9435eb1c..a45de52f9 100644 --- a/java/tools/src/main/java/org/apache/tsfile/tools/ValueConverter.java +++ b/java/tools/src/main/java/org/apache/tsfile/tools/ValueConverter.java @@ -22,20 +22,31 @@ import org.apache.tsfile.utils.Binary; import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeParseException; public class ValueConverter { public static Object convert(Object value, TSDataType targetType, boolean isMeasurement) { + return convert(value, targetType, isMeasurement, "ms"); + } + + public static Object convert( + Object value, TSDataType targetType, boolean isMeasurement, String timePrecision) { if (value == null) { return null; } if (value instanceof String) { - return fromString((String) value, targetType, isMeasurement); + return fromString((String) value, targetType, isMeasurement, timePrecision); } - return fromObject(value, targetType, isMeasurement); + return fromObject(value, targetType, isMeasurement, timePrecision); } - private static Object fromString(String value, TSDataType targetType, boolean isMeasurement) { + private static Object fromString( + String value, TSDataType targetType, boolean isMeasurement, String timePrecision) { switch (targetType) { case BOOLEAN: return Boolean.valueOf(value); @@ -55,12 +66,17 @@ private static Object fromString(String value, TSDataType targetType, boolean is return value; case BLOB: return new Binary(value.getBytes(StandardCharsets.UTF_8)); + case DATE: + return parseDate(value); + case TIMESTAMP: + return parseTimestamp(value, timePrecision); default: return value; } } - private static Object fromObject(Object value, TSDataType targetType, boolean isMeasurement) { + private static Object fromObject( + Object value, TSDataType targetType, boolean isMeasurement, String timePrecision) { switch (targetType) { case BOOLEAN: if (value instanceof Boolean) { @@ -119,8 +135,89 @@ private static Object fromObject(Object value, TSDataType targetType, boolean is return new Binary((byte[]) value); } return new Binary(value.toString().getBytes(StandardCharsets.UTF_8)); + case DATE: + return toLocalDate(value); + case TIMESTAMP: + return toEpochLong(value, timePrecision); default: return value; } } + + private static LocalDate parseDate(String value) { + String s = value.trim(); + try { + return LocalDate.parse(s, DateTimeUtils.ISO_LOCAL_DATE_WIDTH_1_2); + } catch (DateTimeParseException ignored) { + // try next format + } + try { + return LocalDate.parse(s, DateTimeUtils.ISO_LOCAL_DATE_WITH_SLASH); + } catch (DateTimeParseException ignored) { + // try next format + } + try { + return LocalDate.parse(s, DateTimeUtils.ISO_LOCAL_DATE_WITH_DOT); + } catch (DateTimeParseException ignored) { + // try LocalDate.parse default + } + try { + return LocalDate.parse(s); + } catch (DateTimeParseException e) { + throw new IllegalArgumentException("Cannot parse DATE value: " + value, e); + } + } + + private static LocalDate toLocalDate(Object value) { + if (value instanceof LocalDate) { + return (LocalDate) value; + } + if (value instanceof LocalDateTime) { + return ((LocalDateTime) value).toLocalDate(); + } + if (value instanceof Instant) { + return ((Instant) value).atZone(ZoneId.systemDefault()).toLocalDate(); + } + if (value instanceof Number) { + // Parquet/Arrow DATE is days since 1970-01-01 + return LocalDate.ofEpochDay(((Number) value).longValue()); + } + return parseDate(value.toString()); + } + + private static long parseTimestamp(String value, String timePrecision) { + String s = value.trim(); + try { + return Long.parseLong(s); + } catch (NumberFormatException ignored) { + // not numeric — fall through to datetime parsing + } + return DateTimeUtils.convertDatetimeStrToLong(s, ZoneId.systemDefault(), timePrecision); + } + + private static long toEpochLong(Object value, String timePrecision) { + if (value instanceof Long) { + return (Long) value; + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + if (value instanceof Instant) { + Instant instant = (Instant) value; + switch (timePrecision) { + case "ns": + return Math.addExact( + Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L), instant.getNano()); + case "us": + return Math.addExact( + Math.multiplyExact(instant.getEpochSecond(), 1_000_000L), instant.getNano() / 1_000); + case "s": + return instant.getEpochSecond(); + case "ms": + default: + return instant.toEpochMilli(); + } + } + return parseTimestamp(value.toString(), timePrecision); + } } diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/ArrowSourceReaderTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/ArrowSourceReaderTest.java index 81cb603f3..685f7514f 100644 --- a/java/tools/src/test/java/org/apache/tsfile/tools/ArrowSourceReaderTest.java +++ b/java/tools/src/test/java/org/apache/tsfile/tools/ArrowSourceReaderTest.java @@ -25,13 +25,19 @@ import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; @@ -496,4 +502,175 @@ public void testMultipleRecordBatches() throws Exception { assertNull(reader.readBatch()); } } + + // --- Date / Timestamp vectors are NOT subclasses of IntVector / BigIntVector --- + // Before the fix, these fell through to vec.getObject().toString() — producing a String + // representation that did not match the schema's inferred TSDataType. + + @Test + public void testDateDayVectorReturnsEpochDayInteger() throws Exception { + List fields = new ArrayList<>(); + fields.add(new Field("time", FieldType.notNullable(new ArrowType.Int(64, true)), null)); + fields.add( + new Field("birthday", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null)); + Schema arrowSchema = new Schema(fields); + + File file = + writeArrowFile( + "date_day.arrow", + arrowSchema, + (root, writer) -> { + BigIntVector tv = (BigIntVector) root.getVector("time"); + DateDayVector dv = (DateDayVector) root.getVector("birthday"); + tv.allocateNew(2); + dv.allocateNew(2); + tv.set(0, 1000L); + tv.set(1, 2000L); + dv.set(0, 19737); // 2024-01-15 + dv.set(1, 0); // 1970-01-01 + root.setRowCount(2); + writer.writeBatch(); + }); + + try (ArrowSourceReader reader = new ArrowSourceReader(file)) { + ImportSchema schema = reader.inferSchema(); + assertEquals(TSDataType.DATE, findField(schema.fieldColumns(), "birthday").getDataType()); + + SourceBatch batch = reader.readBatch(); + assertNotNull(batch); + assertEquals(2, batch.getRowCount()); + // Must be an Integer (epoch days), not a stringified value. + assertTrue(batch.getValue(0, 1) instanceof Integer); + assertEquals(19737, batch.getValue(0, 1)); + assertEquals(0, batch.getValue(1, 1)); + } + } + + @Test + public void testDateMilliVectorCollapsesToLocalDate() throws Exception { + List fields = new ArrayList<>(); + fields.add(new Field("time", FieldType.notNullable(new ArrowType.Int(64, true)), null)); + fields.add( + new Field("dob", FieldType.notNullable(new ArrowType.Date(DateUnit.MILLISECOND)), null)); + Schema arrowSchema = new Schema(fields); + + long jan15 = 19737L * 86_400_000L; // 2024-01-15 UTC midnight in epoch ms + File file = + writeArrowFile( + "date_milli.arrow", + arrowSchema, + (root, writer) -> { + BigIntVector tv = (BigIntVector) root.getVector("time"); + DateMilliVector dv = (DateMilliVector) root.getVector("dob"); + tv.allocateNew(1); + dv.allocateNew(1); + tv.set(0, 1000L); + dv.set(0, jan15); + root.setRowCount(1); + writer.writeBatch(); + }); + + try (ArrowSourceReader reader = new ArrowSourceReader(file)) { + reader.inferSchema(); + SourceBatch batch = reader.readBatch(); + assertNotNull(batch); + // DateMilli is collapsed to LocalDate so downstream DATE handling treats it consistently. + assertTrue(batch.getValue(0, 1) instanceof java.time.LocalDate); + assertEquals(java.time.LocalDate.of(2024, 1, 15), batch.getValue(0, 1)); + } + } + + @Test + public void testTimeStampMilliVectorReturnsLong() throws Exception { + List fields = new ArrayList<>(); + fields.add( + new Field( + "time", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null)); + fields.add( + new Field( + "value", + FieldType.notNullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null)); + Schema arrowSchema = new Schema(fields); + + File file = + writeArrowFile( + "ts_milli.arrow", + arrowSchema, + (root, writer) -> { + TimeStampMilliVector tv = (TimeStampMilliVector) root.getVector("time"); + Float8Vector vv = (Float8Vector) root.getVector("value"); + tv.allocateNew(2); + vv.allocateNew(2); + tv.set(0, 1705276800000L); // 2024-01-15T00:00:00Z + tv.set(1, 1705280400000L); // 2024-01-15T01:00:00Z + vv.set(0, 10.0); + vv.set(1, 20.0); + root.setRowCount(2); + writer.writeBatch(); + }); + + try (ArrowSourceReader reader = new ArrowSourceReader(file)) { + ImportSchema schema = reader.inferSchema(); + assertEquals("ms", schema.getTimePrecision()); + + SourceBatch batch = reader.readBatch(); + assertNotNull(batch); + assertEquals(2, batch.getRowCount()); + assertTrue(batch.getValue(0, 0) instanceof Long); + assertEquals(1705276800000L, batch.getValue(0, 0)); + assertEquals(1705280400000L, batch.getValue(1, 0)); + } + } + + @Test + public void testTimeStampNanoVectorReturnsLongNanos() throws Exception { + List fields = new ArrayList<>(); + fields.add( + new Field( + "time", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null)), + null)); + fields.add( + new Field( + "value", + FieldType.notNullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null)); + Schema arrowSchema = new Schema(fields); + + File file = + writeArrowFile( + "ts_nano.arrow", + arrowSchema, + (root, writer) -> { + TimeStampNanoVector tv = (TimeStampNanoVector) root.getVector("time"); + Float8Vector vv = (Float8Vector) root.getVector("value"); + tv.allocateNew(1); + vv.allocateNew(1); + tv.set(0, 1705276800123456789L); + vv.set(0, 1.5); + root.setRowCount(1); + writer.writeBatch(); + }); + + try (ArrowSourceReader reader = new ArrowSourceReader(file)) { + ImportSchema schema = reader.inferSchema(); + assertEquals("ns", schema.getTimePrecision()); + + SourceBatch batch = reader.readBatch(); + assertNotNull(batch); + assertEquals(1705276800123456789L, batch.getValue(0, 0)); + } + } + + private ImportSchema.SourceColumn findField(List fields, String name) { + for (ImportSchema.SourceColumn f : fields) { + if (f.getName().equals(name)) { + return f; + } + } + throw new AssertionError("Field not found: " + name); + } } diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/CsvSourceReaderTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/CsvSourceReaderTest.java index ea89e6aef..9a99d12ba 100644 --- a/java/tools/src/test/java/org/apache/tsfile/tools/CsvSourceReaderTest.java +++ b/java/tools/src/test/java/org/apache/tsfile/tools/CsvSourceReaderTest.java @@ -432,4 +432,109 @@ public void testMultipleBatchesReturnAllData() throws Exception { assertEquals(50, totalRows); } } + + // --- Quoted field tokenizer (RFC 4180-ish) --- + + @Test + public void testSplitLineNoQuotesUsesFastPath() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), ",", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("1000,2.5,hello"); + assertEquals(3, tokens.length); + assertEquals("1000", tokens[0]); + assertEquals("2.5", tokens[1]); + assertEquals("hello", tokens[2]); + } + + @Test + public void testSplitLineQuotedFieldWithEmbeddedComma() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), ",", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("1000,\"hello,world\",2.5"); + assertEquals(3, tokens.length); + assertEquals("1000", tokens[0]); + assertEquals("hello,world", tokens[1]); + assertEquals("2.5", tokens[2]); + } + + @Test + public void testSplitLineEscapedDoubleQuotes() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), ",", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("1000,\"she said \"\"hi\"\"\",done"); + assertEquals(3, tokens.length); + assertEquals("1000", tokens[0]); + assertEquals("she said \"hi\"", tokens[1]); + assertEquals("done", tokens[2]); + } + + @Test + public void testSplitLineEmptyQuotedField() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), ",", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("1000,\"\",2.5"); + assertEquals(3, tokens.length); + assertEquals("", tokens[1]); + } + + @Test + public void testSplitLineTrailingEmptyField() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), ",", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("1000,2.5,"); + assertEquals(3, tokens.length); + assertEquals("", tokens[2]); + } + + @Test + public void testSplitLineMultipleQuotedFields() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), ",", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("\"a,b\",\"c,d\",\"e\""); + assertEquals(3, tokens.length); + assertEquals("a,b", tokens[0]); + assertEquals("c,d", tokens[1]); + assertEquals("e", tokens[2]); + } + + @Test + public void testSplitLineTabSeparator() throws Exception { + CsvSourceReader reader = + new CsvSourceReader(new File(testDir, "dummy.csv"), "\t", DEFAULT_CHUNK_SIZE_FOR_TEST); + String[] tokens = reader.splitLine("1000\t\"hello\tworld\"\t2.5"); + assertEquals(3, tokens.length); + assertEquals("hello\tworld", tokens[1]); + } + + @Test + public void testReadBatchWithQuotedFields() throws Exception { + File csv = + writeCsv( + "quoted.csv", + "time,note,value\n" + + "1000,\"hello, world\",1.5\n" + + "2000,\"she said \"\"hi\"\"\",2.5\n" + + "3000,plain,3.5\n"); + + ImportSchema schema = + buildSchema( + "time", + null, + new String[] {"time", "INT64"}, + new String[] {"note", "STRING"}, + new String[] {"value", "DOUBLE"}); + + try (CsvSourceReader reader = new CsvSourceReader(csv, schema)) { + SourceBatch batch = reader.readBatch(); + assertNotNull(batch); + assertEquals(3, batch.getRowCount()); + assertEquals("hello, world", batch.getValue(0, 1)); + assertEquals("she said \"hi\"", batch.getValue(1, 1)); + assertEquals("plain", batch.getValue(2, 1)); + assertEquals("1.5", batch.getValue(0, 2)); + assertEquals("2.5", batch.getValue(1, 2)); + } + } + + private static final long DEFAULT_CHUNK_SIZE_FOR_TEST = 1024 * 1024; } diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/DateTimeUtilsTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/DateTimeUtilsTest.java new file mode 100644 index 000000000..2cb5dad47 --- /dev/null +++ b/java/tools/src/test/java/org/apache/tsfile/tools/DateTimeUtilsTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.tools; + +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; + +import static org.junit.Assert.assertEquals; + +public class DateTimeUtilsTest { + + private static final ZoneId NY = ZoneId.of("America/New_York"); + + /** + * 2024-01-15 (winter) in NY is UTC-5 (EST). Resolving the offset must NOT depend on the current + * wall-clock time of the JVM, which previously used Instant.now() and would set EDT (UTC-4) when + * the program ran in summer. + */ + @Test + public void testWinterDateGetsStandardOffset() { + long actual = DateTimeUtils.convertDatetimeStrToLong("2024-01-15T12:00:00", NY, "ms"); + long expected = + LocalDateTime.of(2024, 1, 15, 12, 0).toInstant(ZoneOffset.ofHours(-5)).toEpochMilli(); + assertEquals(expected, actual); + } + + /** 2024-07-15 (summer) in NY is UTC-4 (EDT). */ + @Test + public void testSummerDateGetsDaylightOffset() { + long actual = DateTimeUtils.convertDatetimeStrToLong("2024-07-15T12:00:00", NY, "ms"); + long expected = + LocalDateTime.of(2024, 7, 15, 12, 0).toInstant(ZoneOffset.ofHours(-4)).toEpochMilli(); + assertEquals(expected, actual); + } + + /** A summer and a winter date in NY should be exactly one hour apart at the same wall time. */ + @Test + public void testWinterVsSummerOffsetDiffers() { + long winter = DateTimeUtils.convertDatetimeStrToLong("2024-01-15T12:00:00", NY, "ms"); + long summer = DateTimeUtils.convertDatetimeStrToLong("2024-07-15T12:00:00", NY, "ms"); + // Both reference the same date 6 months apart at the same wall-clock time. + // The summer one is "later" in absolute terms by 6 months minus 1h of DST shift. + long winterUtc = + LocalDateTime.of(2024, 1, 15, 12, 0).toInstant(ZoneOffset.ofHours(-5)).toEpochMilli(); + long summerUtc = + LocalDateTime.of(2024, 7, 15, 12, 0).toInstant(ZoneOffset.ofHours(-4)).toEpochMilli(); + assertEquals(winterUtc, winter); + assertEquals(summerUtc, summer); + } + + /** Fixed-offset zones must keep working unchanged. */ + @Test + public void testFixedOffsetZoneUnchanged() { + long actual = + DateTimeUtils.convertDatetimeStrToLong("2024-01-15T12:00:00", ZoneOffset.ofHours(8), "ms"); + long expected = + LocalDateTime.of(2024, 1, 15, 12, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); + assertEquals(expected, actual); + } + + /** Date-only strings still get expanded to midnight. */ + @Test + public void testDateOnlyString() { + long actual = DateTimeUtils.convertDatetimeStrToLong("2024-01-15", NY, "ms"); + long expected = + LocalDateTime.of(2024, 1, 15, 0, 0).toInstant(ZoneOffset.ofHours(-5)).toEpochMilli(); + assertEquals(expected, actual); + } + + /** Strings with explicit offset must ignore the supplied ZoneId. */ + @Test + public void testExplicitOffsetIsRespected() { + long actual = DateTimeUtils.convertDatetimeStrToLong("2024-01-15T12:00:00+09:00", NY, "ms"); + long expected = + LocalDateTime.of(2024, 1, 15, 12, 0).toInstant(ZoneOffset.ofHours(9)).toEpochMilli(); + assertEquals(expected, actual); + } +} diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/ParquetSourceReaderTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/ParquetSourceReaderTest.java index 984be8aa7..424ae9cc4 100644 --- a/java/tools/src/test/java/org/apache/tsfile/tools/ParquetSourceReaderTest.java +++ b/java/tools/src/test/java/org/apache/tsfile/tools/ParquetSourceReaderTest.java @@ -464,4 +464,49 @@ public void testEmptyFile() throws Exception { assertNull(batch); } } + + // --- INT96 timestamp decoding --- + + @Test + public void testInt96UnixEpoch() { + // Julian day 2440588 = 1970-01-01; 0 nanos-of-day → 0 epoch nanos + byte[] bytes = makeInt96(2440588, 0L); + assertEquals(0L, ParquetSourceReader.int96ToEpochNanos(bytes)); + } + + @Test + public void testInt96KnownDateMillis() { + // 2024-01-15 00:00:00 UTC: epoch days = 19737, julian day = 19737 + 2440588 = 2460325 + byte[] bytes = makeInt96(2460325, 0L); + long nanos = ParquetSourceReader.int96ToEpochNanos(bytes); + assertEquals(1705276800000L, nanos / 1_000_000L); + } + + @Test + public void testInt96NanosOfDayPreserved() { + // 1970-01-01 + 12:34:56.123456789 of day + long nanosOfDay = ((12L * 3600 + 34 * 60 + 56) * 1_000_000_000L) + 123_456_789L; + byte[] bytes = makeInt96(2440588, nanosOfDay); + assertEquals(nanosOfDay, ParquetSourceReader.int96ToEpochNanos(bytes)); + } + + @Test + public void testInt96BeforeEpoch() { + // 1969-12-31 = julian day 2440587, 0 nanos-of-day → -86400 * 1e9 + byte[] bytes = makeInt96(2440587, 0L); + assertEquals(-86_400_000_000_000L, ParquetSourceReader.int96ToEpochNanos(bytes)); + } + + @Test(expected = IllegalArgumentException.class) + public void testInt96WrongLength() { + ParquetSourceReader.int96ToEpochNanos(new byte[8]); + } + + private static byte[] makeInt96(int julianDay, long nanosOfDay) { + java.nio.ByteBuffer buf = + java.nio.ByteBuffer.allocate(12).order(java.nio.ByteOrder.LITTLE_ENDIAN); + buf.putLong(nanosOfDay); + buf.putInt(julianDay); + return buf.array(); + } } diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/TsFileToolEndToEndTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/TsFileToolEndToEndTest.java new file mode 100644 index 000000000..994854a39 --- /dev/null +++ b/java/tools/src/test/java/org/apache/tsfile/tools/TsFileToolEndToEndTest.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.tools; + +import org.apache.tsfile.external.commons.io.FileUtils; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.controller.CachedChunkLoaderImpl; +import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.tsfile.read.query.executor.TableQueryExecutor; +import org.apache.tsfile.read.reader.block.TsBlockReader; +import org.apache.tsfile.utils.DateUtils; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.NanoTime; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.LocalOutputFile; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * End-to-end tests that drive TsFileTool.main and verify the produced TsFile contents. These cover + * scenarios that unit tests alone cannot prove out: + * + *
    + *
  1. DATE / TIMESTAMP columns are actually writable through CSV schema mode. + *
  2. Arrow DATE / TIMESTAMP vectors are writable through auto mode. + *
  3. Parquet DATE / TIMESTAMP logical types in auto mode produce correct values. + *
  4. Legacy Parquet INT96 timestamps decode correctly instead of crashing. + *
  5. {@code --format} filtering rejects foreign-extension files in directory mode. + *
+ */ +public class TsFileToolEndToEndTest { + + private final String testDir = "target" + File.separator + "endToEndTest"; + private final String inputDir = testDir + File.separator + "input"; + private final String outputDir = testDir + File.separator + "output"; + private final String failedDir = testDir + File.separator + "failed"; + + @Before + public void setUp() { + new File(testDir).mkdirs(); + new File(inputDir).mkdirs(); + new File(outputDir).mkdirs(); + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(testDir)); + } + + // =================================================================================== + // Fix 1 — DATE / TIMESTAMP via CSV schema mode + // =================================================================================== + + @Test + public void testEndToEndCsvDateAndTimestamp() throws Exception { + String csvPath = inputDir + File.separator + "events.csv"; + try (BufferedWriter w = new BufferedWriter(new FileWriter(csvPath))) { + w.write("time,birthday,event_ts\n"); + // time in ms, birthday as DATE, event_ts as TIMESTAMP literal (no offset, JVM zone) + w.write("1000,2024-01-15,2024-01-15T12:00:00+00:00\n"); + w.write("2000,1990-06-20,2024-01-16T08:30:00+00:00\n"); + w.write("3000,2030-12-31,2024-01-17T23:59:59+00:00\n"); + } + + String schemaPath = inputDir + File.separator + "schema.txt"; + try (BufferedWriter w = new BufferedWriter(new FileWriter(schemaPath))) { + w.write("table_name=root.events\n"); + w.write("time_precision=ms\n"); + w.write("has_header=true\n"); + w.write("separator=,\n\n"); + w.write("time_column=time\n"); + w.write("csv_columns\n"); + w.write("time INT64,\n"); + w.write("birthday DATE,\n"); + w.write("event_ts TIMESTAMP\n"); + } + + TsFileTool.main( + new String[] { + "-s" + new File(csvPath).getAbsolutePath(), + "-t" + new File(outputDir).getAbsolutePath(), + "-schema" + new File(schemaPath).getAbsolutePath() + }); + + String tsfile = outputDir + File.separator + "events.tsfile"; + assertTrue("TsFile must be produced", new File(tsfile).exists()); + + try (TsFileSequenceReader seq = new TsFileSequenceReader(tsfile)) { + TableQueryExecutor exec = + new TableQueryExecutor( + new MetadataQuerierByFileImpl(seq), + new CachedChunkLoaderImpl(seq), + TableQueryExecutor.TableQueryOrdering.DEVICE); + TsBlockReader reader = + exec.query("root.events", Arrays.asList("birthday", "event_ts"), null, null, null); + + List birthdays = new ArrayList<>(); + List tsValues = new ArrayList<>(); + while (reader.hasNext()) { + TsBlock block = reader.next(); + for (int i = 0; i < block.getPositionCount(); i++) { + birthdays.add(block.getColumn(0).getInt(i)); + tsValues.add(block.getColumn(1).getLong(i)); + } + } + + assertEquals(3, birthdays.size()); + // TsFile encodes DATE as YYYYMMDD int, decode via DateUtils. + assertEquals(LocalDate.of(2024, 1, 15), DateUtils.parseIntToLocalDate(birthdays.get(0))); + assertEquals(LocalDate.of(1990, 6, 20), DateUtils.parseIntToLocalDate(birthdays.get(1))); + assertEquals(LocalDate.of(2030, 12, 31), DateUtils.parseIntToLocalDate(birthdays.get(2))); + assertEquals(1705320000000L, (long) tsValues.get(0)); + assertEquals(1705393800000L, (long) tsValues.get(1)); + assertEquals(1705535999000L, (long) tsValues.get(2)); + } + } + + // =================================================================================== + // Arrow DATE / TIMESTAMP logical vectors in auto mode + // =================================================================================== + + @Test + public void testEndToEndArrowAutoDateAndTimestamp() throws Exception { + List fields = new ArrayList<>(); + fields.add( + new Field( + "time", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null)); + fields.add( + new Field("birthday", FieldType.notNullable(new ArrowType.Date(DateUnit.DAY)), null)); + fields.add( + new Field( + "event_ts", + FieldType.notNullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null)); + fields.add( + new Field( + "score", + FieldType.notNullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), + null)); + + String arrowPath = inputDir + File.separator + "arrow_dt.arrow"; + writeArrow( + arrowPath, + new Schema(fields), + (root, writer) -> { + TimeStampMilliVector time = (TimeStampMilliVector) root.getVector("time"); + DateDayVector birthday = (DateDayVector) root.getVector("birthday"); + TimeStampMilliVector eventTs = (TimeStampMilliVector) root.getVector("event_ts"); + Float8Vector score = (Float8Vector) root.getVector("score"); + + time.allocateNew(3); + birthday.allocateNew(3); + eventTs.allocateNew(3); + score.allocateNew(3); + + time.set(0, 1705276800000L); + time.set(1, 1705363200000L); + time.set(2, 1705449600000L); + birthday.set(0, 19737); + birthday.set(1, 19738); + birthday.set(2, 19739); + eventTs.set(0, 1705320000000L); + eventTs.set(1, 1705393800000L); + eventTs.set(2, 1705535999000L); + score.set(0, 1.5); + score.set(1, 2.5); + score.set(2, 3.5); + + root.setRowCount(3); + writer.writeBatch(); + }); + + TsFileTool.main( + new String[] { + "-s" + new File(arrowPath).getAbsolutePath(), + "-t" + new File(outputDir).getAbsolutePath(), + "--format", + "arrow" + }); + + String tsfile = outputDir + File.separator + "arrow_dt.tsfile"; + assertTrue("TsFile must be produced", new File(tsfile).exists()); + + try (TsFileSequenceReader seq = new TsFileSequenceReader(tsfile)) { + TableQueryExecutor exec = + new TableQueryExecutor( + new MetadataQuerierByFileImpl(seq), + new CachedChunkLoaderImpl(seq), + TableQueryExecutor.TableQueryOrdering.DEVICE); + TsBlockReader reader = + exec.query("arrow_dt", Arrays.asList("birthday", "event_ts", "score"), null, null, null); + + List times = new ArrayList<>(); + List birthdays = new ArrayList<>(); + List eventTsValues = new ArrayList<>(); + List scores = new ArrayList<>(); + while (reader.hasNext()) { + TsBlock block = reader.next(); + for (int i = 0; i < block.getPositionCount(); i++) { + times.add(block.getTimeByIndex(i)); + birthdays.add(block.getColumn(0).getInt(i)); + eventTsValues.add(block.getColumn(1).getLong(i)); + scores.add(block.getColumn(2).getDouble(i)); + } + } + + assertEquals(3, times.size()); + assertEquals(1705276800000L, (long) times.get(0)); + assertEquals(1705363200000L, (long) times.get(1)); + assertEquals(1705449600000L, (long) times.get(2)); + assertEquals(LocalDate.of(2024, 1, 15), DateUtils.parseIntToLocalDate(birthdays.get(0))); + assertEquals(LocalDate.of(2024, 1, 16), DateUtils.parseIntToLocalDate(birthdays.get(1))); + assertEquals(LocalDate.of(2024, 1, 17), DateUtils.parseIntToLocalDate(birthdays.get(2))); + assertEquals(1705320000000L, (long) eventTsValues.get(0)); + assertEquals(1705393800000L, (long) eventTsValues.get(1)); + assertEquals(1705535999000L, (long) eventTsValues.get(2)); + assertEquals(1.5, scores.get(0), 1e-9); + assertEquals(2.5, scores.get(1), 1e-9); + assertEquals(3.5, scores.get(2), 1e-9); + } + } + + // =================================================================================== + // Fix 2 — Parquet DATE / TIMESTAMP logical types in auto mode + // =================================================================================== + + @Test + public void testEndToEndParquetAutoDateAndTimestamp() throws Exception { + MessageType pqSchema = + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT64) + .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)) + .named("time") + .required(PrimitiveType.PrimitiveTypeName.INT32) + .as(LogicalTypeAnnotation.dateType()) + .named("birthday") + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("score") + .named("auto_dt"); + + String pqPath = inputDir + File.separator + "auto_dt.parquet"; + SimpleGroupFactory factory = new SimpleGroupFactory(pqSchema); + List rows = new ArrayList<>(); + // birthday as epoch day: 2024-01-15 = 19737 + rows.add( + factory.newGroup().append("time", 1000L).append("birthday", 19737).append("score", 1.5)); + rows.add( + factory.newGroup().append("time", 2000L).append("birthday", 19738).append("score", 2.5)); + rows.add( + factory.newGroup().append("time", 3000L).append("birthday", 19739).append("score", 3.5)); + writeParquet(pqPath, pqSchema, rows); + + TsFileTool.main( + new String[] { + "-s" + new File(pqPath).getAbsolutePath(), + "-t" + new File(outputDir).getAbsolutePath(), + "--format", + "parquet" + }); + + String tsfile = outputDir + File.separator + "auto_dt.tsfile"; + assertTrue("TsFile must be produced", new File(tsfile).exists()); + + try (TsFileSequenceReader seq = new TsFileSequenceReader(tsfile)) { + TableQueryExecutor exec = + new TableQueryExecutor( + new MetadataQuerierByFileImpl(seq), + new CachedChunkLoaderImpl(seq), + TableQueryExecutor.TableQueryOrdering.DEVICE); + TsBlockReader reader = + exec.query("auto_dt", Arrays.asList("birthday", "score"), null, null, null); + + List birthdays = new ArrayList<>(); + List scores = new ArrayList<>(); + while (reader.hasNext()) { + TsBlock block = reader.next(); + for (int i = 0; i < block.getPositionCount(); i++) { + birthdays.add(block.getColumn(0).getInt(i)); + scores.add(block.getColumn(1).getDouble(i)); + } + } + + assertEquals(3, birthdays.size()); + assertEquals(LocalDate.of(2024, 1, 15), DateUtils.parseIntToLocalDate(birthdays.get(0))); + assertEquals(LocalDate.of(2024, 1, 16), DateUtils.parseIntToLocalDate(birthdays.get(1))); + assertEquals(LocalDate.of(2024, 1, 17), DateUtils.parseIntToLocalDate(birthdays.get(2))); + assertEquals(1.5, scores.get(0), 1e-9); + assertEquals(2.5, scores.get(1), 1e-9); + assertEquals(3.5, scores.get(2), 1e-9); + } + } + + // =================================================================================== + // Fix 3 — Parquet INT96 timestamp (Spark / Hive legacy format) + // =================================================================================== + + @Test + public void testEndToEndParquetInt96() throws Exception { + MessageType pqSchema = + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT96) + .named("time") + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("value") + .named("legacy"); + + String pqPath = inputDir + File.separator + "legacy.parquet"; + SimpleGroupFactory factory = new SimpleGroupFactory(pqSchema); + List rows = new ArrayList<>(); + // 2024-01-15 00:00:00 UTC, 01:00:00 UTC, 02:00:00 UTC + rows.add(factory.newGroup().append("time", new NanoTime(2460325, 0L)).append("value", 10.0)); + rows.add( + factory + .newGroup() + .append("time", new NanoTime(2460325, 3_600_000_000_000L)) + .append("value", 20.0)); + rows.add( + factory + .newGroup() + .append("time", new NanoTime(2460325, 7_200_000_000_000L)) + .append("value", 30.0)); + writeParquet(pqPath, pqSchema, rows); + + TsFileTool.main( + new String[] { + "-s" + new File(pqPath).getAbsolutePath(), + "-t" + new File(outputDir).getAbsolutePath(), + "--format", + "parquet" + }); + + String tsfile = outputDir + File.separator + "legacy.tsfile"; + assertTrue("TsFile must be produced", new File(tsfile).exists()); + + try (TsFileSequenceReader seq = new TsFileSequenceReader(tsfile)) { + TableQueryExecutor exec = + new TableQueryExecutor( + new MetadataQuerierByFileImpl(seq), + new CachedChunkLoaderImpl(seq), + TableQueryExecutor.TableQueryOrdering.DEVICE); + TsBlockReader reader = + exec.query("legacy", Collections.singletonList("value"), null, null, null); + + List times = new ArrayList<>(); + List values = new ArrayList<>(); + while (reader.hasNext()) { + TsBlock block = reader.next(); + for (int i = 0; i < block.getPositionCount(); i++) { + times.add(block.getTimeByIndex(i)); + values.add(block.getColumn(0).getDouble(i)); + } + } + + // Auto mode detects INT96 → precision ns + assertEquals(3, times.size()); + assertEquals(1705276800L * 1_000_000_000L, (long) times.get(0)); + assertEquals(1705276800L * 1_000_000_000L + 3_600_000_000_000L, (long) times.get(1)); + assertEquals(1705276800L * 1_000_000_000L + 7_200_000_000_000L, (long) times.get(2)); + assertEquals(10.0, values.get(0), 1e-9); + assertEquals(20.0, values.get(1), 1e-9); + assertEquals(30.0, values.get(2), 1e-9); + } + } + + // =================================================================================== + // Fix 4 — --format filter in directory mode ignores foreign extensions + // =================================================================================== + + @Test + public void testEndToEndFormatFilterIgnoresForeignFiles() throws Exception { + // The CSV gets processed; README.md / build.log / notes.parquet must be ignored + // (not processed, not copied to failed dir). + try (BufferedWriter w = + new BufferedWriter(new FileWriter(inputDir + File.separator + "data.csv"))) { + w.write("time,val\n"); + w.write("1000,10.0\n"); + w.write("2000,20.0\n"); + } + try (BufferedWriter w = + new BufferedWriter(new FileWriter(inputDir + File.separator + "README.md"))) { + w.write("# This is a readme, not data\n"); + } + try (BufferedWriter w = + new BufferedWriter(new FileWriter(inputDir + File.separator + "build.log"))) { + w.write("some log line\n"); + } + // Even a real Parquet file should be skipped when --format=csv. + MessageType pqSchema = + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT64) + .named("time") + .required(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("v") + .named("notes"); + SimpleGroupFactory pf = new SimpleGroupFactory(pqSchema); + writeParquet( + inputDir + File.separator + "notes.parquet", + pqSchema, + Collections.singletonList(pf.newGroup().append("time", 1000L).append("v", 1.0))); + + TsFileTool.main( + new String[] { + "-s" + new File(inputDir).getAbsolutePath(), + "-t" + new File(outputDir).getAbsolutePath(), + "-fail_dir" + new File(failedDir).getAbsolutePath(), + "--format", + "csv" + }); + + assertTrue(new File(outputDir, "data.tsfile").exists()); + + // No tsfile should have been produced for the foreign files + assertFalse(new File(outputDir, "README.tsfile").exists()); + assertFalse(new File(outputDir, "build.tsfile").exists()); + assertFalse(new File(outputDir, "notes.tsfile").exists()); + + // And they must not have been copied into the failed dir either, since they were + // never processed in the first place. + File fd = new File(failedDir); + if (fd.exists()) { + File[] failed = fd.listFiles(); + String[] names = failed == null ? new String[0] : namesOf(failed); + for (String n : names) { + assertFalse("Foreign file leaked into failed dir: " + n, n.equals("README.md")); + assertFalse("Foreign file leaked into failed dir: " + n, n.equals("build.log")); + assertFalse("Foreign file leaked into failed dir: " + n, n.equals("notes.parquet")); + } + } + } + + // =================================================================================== + // Helpers + // =================================================================================== + + private interface ArrowWriteCallback { + void write(VectorSchemaRoot root, ArrowFileWriter writer) throws IOException; + } + + private static void writeArrow(String path, Schema schema, ArrowWriteCallback callback) + throws IOException { + File file = new File(path); + if (file.exists()) { + file.delete(); + } + try (BufferAllocator allocator = new RootAllocator(); + VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator); + FileOutputStream fos = new FileOutputStream(file); + ArrowFileWriter writer = new ArrowFileWriter(root, null, fos.getChannel())) { + writer.start(); + callback.write(root, writer); + writer.end(); + } + } + + private static void writeParquet(String path, MessageType schema, List rows) + throws IOException { + File file = new File(path); + if (file.exists()) { + file.delete(); + } + try (ParquetWriter writer = + ExampleParquetWriter.builder(new LocalOutputFile(file.toPath())) + .withType(schema) + .withCompressionCodec(CompressionCodecName.UNCOMPRESSED) + .build()) { + for (Group r : rows) { + writer.write(r); + } + } + } + + private static String[] namesOf(File[] files) { + String[] names = new String[files.length]; + for (int i = 0; i < files.length; i++) { + names[i] = files[i].getName(); + } + return names; + } +} diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/TsFileToolFormatFilterTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/TsFileToolFormatFilterTest.java new file mode 100644 index 000000000..a0d5819fb --- /dev/null +++ b/java/tools/src/test/java/org/apache/tsfile/tools/TsFileToolFormatFilterTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.tools; + +import org.junit.After; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TsFileToolFormatFilterTest { + + @After + public void resetFormatStr() throws Exception { + setFormatStr(null); + } + + // --- No --format set: only known extensions are accepted --- + + @Test + public void testNoFormatAcceptsCsv() throws Exception { + setFormatStr(null); + assertTrue(callIsAcceptedFormat("data.csv")); + } + + @Test + public void testNoFormatAcceptsParquet() throws Exception { + setFormatStr(null); + assertTrue(callIsAcceptedFormat("data.parquet")); + } + + @Test + public void testNoFormatAcceptsArrowVariants() throws Exception { + setFormatStr(null); + assertTrue(callIsAcceptedFormat("data.arrow")); + assertTrue(callIsAcceptedFormat("data.ipc")); + assertTrue(callIsAcceptedFormat("data.feather")); + } + + @Test + public void testNoFormatRejectsReadmeAndLog() throws Exception { + setFormatStr(null); + assertFalse(callIsAcceptedFormat("README.md")); + assertFalse(callIsAcceptedFormat("import.log")); + assertFalse(callIsAcceptedFormat("notes.txt")); + } + + // --- --format=csv: only .csv passes; README/.log/.parquet do NOT --- + + @Test + public void testCsvFormatRejectsForeignExtensions() throws Exception { + setFormatStr("csv"); + assertTrue(callIsAcceptedFormat("data.csv")); + assertFalse(callIsAcceptedFormat("README.md")); + assertFalse(callIsAcceptedFormat("import.log")); + assertFalse(callIsAcceptedFormat("data.parquet")); + assertFalse(callIsAcceptedFormat("data.arrow")); + assertFalse(callIsAcceptedFormat("noext")); + } + + // --- --format=parquet: only .parquet passes --- + + @Test + public void testParquetFormatRejectsForeignExtensions() throws Exception { + setFormatStr("parquet"); + assertTrue(callIsAcceptedFormat("data.parquet")); + assertFalse(callIsAcceptedFormat("data.csv")); + assertFalse(callIsAcceptedFormat("README.md")); + } + + // --- --format=arrow: .arrow/.ipc/.feather pass, others rejected --- + + @Test + public void testArrowFormatRejectsForeignExtensions() throws Exception { + setFormatStr("arrow"); + assertTrue(callIsAcceptedFormat("data.arrow")); + assertTrue(callIsAcceptedFormat("data.ipc")); + assertTrue(callIsAcceptedFormat("data.feather")); + assertFalse(callIsAcceptedFormat("data.csv")); + assertFalse(callIsAcceptedFormat("README.md")); + } + + // --- Case-insensitive extension matching --- + + @Test + public void testUpperCaseExtensionAccepted() throws Exception { + setFormatStr("csv"); + assertTrue(callIsAcceptedFormat("DATA.CSV")); + } + + // --- resolveFormat still works for files that already pass the filter --- + + @Test + public void testResolveFormatByExtension() throws Exception { + setFormatStr(null); + assertEquals("csv", callResolveFormat("data.csv")); + assertEquals("parquet", callResolveFormat("data.parquet")); + assertEquals("arrow", callResolveFormat("data.feather")); + } + + // --- helpers --- + + private static void setFormatStr(String value) throws Exception { + Field f = TsFileTool.class.getDeclaredField("formatStr"); + f.setAccessible(true); + f.set(null, value); + } + + private static boolean callIsAcceptedFormat(String fileName) throws Exception { + Method m = TsFileTool.class.getDeclaredMethod("isAcceptedFormat", String.class); + m.setAccessible(true); + return (boolean) m.invoke(null, fileName); + } + + private static String callResolveFormat(String fileName) throws Exception { + Method m = TsFileTool.class.getDeclaredMethod("resolveFormat", String.class); + m.setAccessible(true); + return (String) m.invoke(null, fileName); + } +} diff --git a/java/tools/src/test/java/org/apache/tsfile/tools/ValueConverterTest.java b/java/tools/src/test/java/org/apache/tsfile/tools/ValueConverterTest.java index 0c87e1717..7404649de 100644 --- a/java/tools/src/test/java/org/apache/tsfile/tools/ValueConverterTest.java +++ b/java/tools/src/test/java/org/apache/tsfile/tools/ValueConverterTest.java @@ -25,6 +25,9 @@ import org.junit.Test; import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -184,4 +187,103 @@ public void testObjectToStringForText() { Object result = ValueConverter.convert(12345, TSDataType.TEXT, false); assertEquals("12345", result); } + + // --- DATE --- + + @Test + public void testStringDashToDate() { + Object result = ValueConverter.convert("2024-01-15", TSDataType.DATE, true); + assertEquals(LocalDate.of(2024, 1, 15), result); + } + + @Test + public void testStringSlashToDate() { + Object result = ValueConverter.convert("2024/01/15", TSDataType.DATE, true); + assertEquals(LocalDate.of(2024, 1, 15), result); + } + + @Test + public void testStringDotToDate() { + Object result = ValueConverter.convert("2024.01.15", TSDataType.DATE, true); + assertEquals(LocalDate.of(2024, 1, 15), result); + } + + @Test + public void testLocalDatePassthrough() { + LocalDate d = LocalDate.of(2030, 6, 1); + assertEquals(d, ValueConverter.convert(d, TSDataType.DATE, true)); + } + + @Test + public void testLocalDateTimeToDate() { + LocalDateTime ldt = LocalDateTime.of(2024, 3, 10, 5, 30); + assertEquals(LocalDate.of(2024, 3, 10), ValueConverter.convert(ldt, TSDataType.DATE, true)); + } + + @Test + public void testEpochDayIntegerToDate() { + // 1970-01-01 = epoch day 0; 2024-01-15 = day 19737 + Object result = ValueConverter.convert(19737, TSDataType.DATE, true); + assertEquals(LocalDate.of(2024, 1, 15), result); + } + + @Test + public void testEpochDayLongToDate() { + Object result = ValueConverter.convert(0L, TSDataType.DATE, true); + assertEquals(LocalDate.ofEpochDay(0), result); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidDateThrows() { + ValueConverter.convert("not-a-date", TSDataType.DATE, true); + } + + // --- TIMESTAMP --- + + @Test + public void testStringNumericToTimestamp() { + Object result = ValueConverter.convert("1700000000000", TSDataType.TIMESTAMP, true, "ms"); + assertEquals(1700000000000L, result); + } + + @Test + public void testLongPassthroughToTimestamp() { + Object result = ValueConverter.convert(1700000000000L, TSDataType.TIMESTAMP, true, "ms"); + assertEquals(1700000000000L, result); + } + + @Test + public void testIntegerWidenedToTimestamp() { + Object result = ValueConverter.convert(123, TSDataType.TIMESTAMP, true, "ms"); + assertEquals(123L, result); + } + + @Test + public void testInstantToTimestampMs() { + Instant i = Instant.ofEpochSecond(1700000000L, 500_000_000); + Object result = ValueConverter.convert(i, TSDataType.TIMESTAMP, true, "ms"); + assertEquals(1700000000_500L, result); + } + + @Test + public void testInstantToTimestampUs() { + Instant i = Instant.ofEpochSecond(1700000000L, 500_000_000); + Object result = ValueConverter.convert(i, TSDataType.TIMESTAMP, true, "us"); + assertEquals(1700000000_500_000L, result); + } + + @Test + public void testInstantToTimestampNs() { + Instant i = Instant.ofEpochSecond(1700000000L, 123_456_789); + Object result = ValueConverter.convert(i, TSDataType.TIMESTAMP, true, "ns"); + assertEquals(1700000000_123_456_789L, result); + } + + @Test + public void testIsoDateStringToTimestampMs() { + // 2024-01-15T00:00:00 in UTC = 1705276800000ms + Object result = + ValueConverter.convert("2024-01-15T00:00:00+00:00", TSDataType.TIMESTAMP, true, "ms"); + assertEquals(1705276800000L, result); + } }