Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -43,6 +46,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -240,7 +244,22 @@ private List<String> getSchemaColumnNames() {
}

private Object extractValue(FieldVector vec, int row) {
if (vec instanceof BigIntVector) {
// Date / Timestamp checks must come BEFORE the BigIntVector/IntVector branches: although
// they hold int/long underneath, DateDayVector / TimeStampVector do NOT extend
// IntVector / BigIntVector, so without these branches Date columns fall through to the
// generic getObject().toString() path and produce strings that don't match TSDataType.DATE.
if (vec instanceof DateDayVector) {
// Days since 1970-01-01. ValueConverter.toLocalDate handles Integer → LocalDate.
return ((DateDayVector) vec).get(row);
} else if (vec instanceof DateMilliVector) {
// Millis since 1970-01-01; collapse to date.
long millis = ((DateMilliVector) vec).get(row);
return LocalDate.ofEpochDay(Math.floorDiv(millis, 86_400_000L));
} else if (vec instanceof TimeStampVector) {
// Long in the vector's native precision; matches the precision detected by
// detectTimestampPrecision() and stored on the schema.
return ((TimeStampVector) vec).get(row);
} else if (vec instanceof BigIntVector) {
return ((BigIntVector) vec).get(row);
} else if (vec instanceof IntVector) {
return ((IntVector) vec).get(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,57 @@ private void ensureReaderOpen() throws IOException {
}
}

private String[] splitLine(String line) {
return line.split(separator, -1);
/**
 * Splits a single line into fields using an RFC 4180-style tokenizer.
 *
 * <p>Behavior:
 *
 * <ul>
 *   <li>A line that contains no double quote is split with {@code String.split(separator, -1)},
 *       so trailing empty fields are preserved.
 *   <li>A field whose first character is {@code "} is treated as quoted: the separator may
 *       appear inside it, and {@code ""} inside it denotes one literal quote character.
 *   <li>A quote that does not start a field is kept as an ordinary character.
 *   <li>Characters between a closing quote and the next separator are appended to the field.
 *   <li>An unterminated quoted field consumes the remainder of the line.
 * </ul>
 *
 * <p>Multi-line quoted records are not supported — quoted values must not contain line breaks,
 * since the surrounding read loop is line-oriented.
 *
 * <p>NOTE(review): the quote-free fast path interprets {@code separator} as a regular
 * expression (that is what {@code String.split} does), while the tokenizer below matches it
 * literally via {@code regionMatches}. The two agree only for separators without regex
 * metacharacters — confirm that is guaranteed by whoever sets {@code separator}.
 *
 * @param line one physical line of input, without its line terminator
 * @return the fields of the line, in order; never empty
 */
String[] splitLine(String line) {
  // Fast path: no quoting involved, keep the historical split behavior.
  if (line.indexOf('"') < 0) {
    return line.split(separator, -1);
  }
  List<String> tokens = new ArrayList<>();
  StringBuilder cur = new StringBuilder();
  boolean inQuotes = false;
  // True only while no character of the current field has been consumed yet; a quote is
  // significant only in that position.
  boolean fieldStart = true;
  int sepLen = separator.length();
  int i = 0;
  while (i < line.length()) {
    char c = line.charAt(i);
    if (inQuotes) {
      if (c == '"') {
        if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
          // Escaped quote ("") inside a quoted field.
          cur.append('"');
          i += 2;
          continue;
        }
        // Closing quote.
        inQuotes = false;
        i++;
        continue;
      }
      cur.append(c);
      i++;
    } else {
      if (fieldStart && c == '"') {
        inQuotes = true;
        fieldStart = false;
        i++;
        continue;
      }
      // An empty separator matches at every index without advancing i, which would loop
      // forever; require a non-empty separator before treating the position as a boundary.
      if (sepLen > 0 && line.regionMatches(i, separator, 0, sepLen)) {
        tokens.add(cur.toString());
        cur.setLength(0);
        fieldStart = true;
        i += sepLen;
        continue;
      }
      cur.append(c);
      fieldStart = false;
      i++;
    }
  }
  // The last field has no trailing separator; emit it (possibly empty).
  tokens.add(cur.toString());
  return tokens.toArray(new String[0]);
}

private Object[] parseLine(String line) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -448,12 +449,56 @@ public static long convertTimestampOrDatetimeStrToLongWithDefaultZone(
}

public static long convertDatetimeStrToLong(String str, ZoneId zoneId) {
return convertDatetimeStrToLong(str, toZoneOffset(zoneId), 0, "ms");
return convertDatetimeStrToLongWithZoneId(str, zoneId, 0, "ms");
}

public static long convertDatetimeStrToLong(
String str, ZoneId zoneId, String timestampPrecision) {
return convertDatetimeStrToLong(str, toZoneOffset(zoneId), 0, timestampPrecision);
return convertDatetimeStrToLongWithZoneId(str, zoneId, 0, timestampPrecision);
}

/**
 * Converts a datetime string to a timestamp in the requested precision, choosing the zone offset
 * from the local datetime written in the string (so DST is honored) rather than from the offset
 * the {@link ZoneId} happens to have at {@code Instant.now()}.
 *
 * <p>The string is normalized step by step through recursion: a {@code Z} suffix becomes {@code
 * +00:00}, a 10-character value gets {@code T00:00:00} appended, and a value without a trailing
 * {@code +hh:mm}/{@code -hh:mm} gets the offset returned by {@code resolveLocalOffset} appended.
 * Only the last step increments {@code depth}.
 *
 * @param str the datetime text to convert
 * @param zoneId zone used to resolve the offset when the text carries none
 * @param depth number of offset-appending attempts made so far
 * @param timestampPrecision precision passed through to {@code getInstantWithPrecision}
 * @return the converted timestamp
 * @throws DateTimeException if {@code depth} reaches 2, or if the text carries an offset and
 *     also contains {@code [} or {@code ]}
 */
private static long convertDatetimeStrToLongWithZoneId(
    String str, ZoneId zoneId, int depth, String timestampPrecision) {
  if (depth >= 2) {
    throw new DateTimeException(
        String.format(
            "Failed to convert %s to millisecond, zone id is %s, "
                + "please input like 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00",
            str, zoneId));
  }

  // UTC designator: keep everything before the first 'Z' and spell the offset out.
  final int zuluIndex = str.indexOf('Z');
  if (zuluIndex >= 0) {
    String withExplicitUtc = str.substring(0, zuluIndex) + "+00:00";
    return convertDatetimeStrToLongWithZoneId(withExplicitUtc, zoneId, depth, timestampPrecision);
  }

  // Date only: assume the start of that day.
  final int length = str.length();
  if (length == 10) {
    String withMidnight = str + "T00:00:00";
    return convertDatetimeStrToLongWithZoneId(withMidnight, zoneId, depth, timestampPrecision);
  }

  // A trailing "+hh:mm" / "-hh:mm" puts the sign exactly six characters from the end.
  final boolean endsWithPlusOffset = length - str.lastIndexOf('+') == 6;
  final boolean endsWithMinusOffset = length - str.lastIndexOf('-') == 6;
  if (!(endsWithPlusOffset || endsWithMinusOffset)) {
    ZoneOffset offset = resolveLocalOffset(str, zoneId);
    return convertDatetimeStrToLongWithZoneId(
        str + offset, zoneId, depth + 1, timestampPrecision);
  }

  if (str.contains("[") || str.contains("]")) {
    throw new DateTimeException(
        String.format(
            "%s with [time-region] at end is not supported now, "
                + "please input like 2011-12-03T10:15:30 or 2011-12-03T10:15:30+01:00",
            str));
  }

  return getInstantWithPrecision(str, timestampPrecision);
}

/**
 * Determines the offset that {@code zoneId} applies at the local datetime written in {@code
 * str}, so that daylight-saving transitions are taken into account.
 *
 * <p>The date portion may use {@code -}, {@code /} or {@code .} as separator and may be divided
 * from the time portion by {@code T} or a single space. Only the date portion is normalized:
 * the {@code .} that introduces fractional seconds in the time portion must be left intact,
 * otherwise {@link LocalDateTime#parse(CharSequence)} rejects the value.
 *
 * @param str datetime text without a zone offset
 * @param zoneId zone whose rules supply the offset
 * @return the offset for the parsed local datetime; if the text cannot be parsed as an ISO local
 *     datetime after normalization, the zone's offset at the current instant (best effort)
 */
private static ZoneOffset resolveLocalOffset(String str, ZoneId zoneId) {
  // Locate the date/time boundary: 'T' if present, otherwise the first space.
  int boundary = str.indexOf('T');
  if (boundary < 0) {
    boundary = str.indexOf(' ');
  }

  String normalized;
  if (boundary < 0) {
    // No time portion found; normalize what is there and let the parser decide.
    normalized = str.replace('/', '-').replace('.', '-');
  } else {
    String datePart = str.substring(0, boundary).replace('/', '-').replace('.', '-');
    String timePart = str.substring(boundary + 1);
    normalized = datePart + 'T' + timePart;
  }

  try {
    LocalDateTime ldt = LocalDateTime.parse(normalized);
    return zoneId.getRules().getOffset(ldt);
  } catch (DateTimeParseException e) {
    // Deliberate best effort: formats the ISO parser does not understand keep the previous
    // behavior of using the zone's current offset.
    return zoneId.getRules().getOffset(Instant.now());
  }
}

public static long getInstantWithPrecision(String str, String timestampPrecision) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -254,12 +256,29 @@ private Object extractValue(Group group, int fieldIndex) {
}
return group.getBinary(fieldIndex, 0).getBytes();
case INT96:
return group.getBinary(fieldIndex, 0).getBytes();
// Use getInt96 — INT96 values are Int96Value, not BinaryValue, so getBinary throws CCE.
return int96ToEpochNanos(group.getInt96(fieldIndex, 0).getBytes());
default:
return group.getValueToString(fieldIndex, 0);
}
}

/**
 * Converts a legacy Parquet INT96 timestamp into nanoseconds since the Unix epoch.
 *
 * <p>The 12-byte layout is read as little-endian: bytes 0-7 hold the nanoseconds elapsed within
 * the day and bytes 8-11 hold the Julian day number. Julian day 2440588 corresponds to
 * 1970-01-01.
 *
 * @param bytes the raw INT96 value
 * @return nanoseconds since 1970-01-01T00:00:00
 * @throws IllegalArgumentException if {@code bytes} is {@code null} or not exactly 12 bytes long
 * @throws ArithmeticException if the result does not fit in a {@code long}
 */
static long int96ToEpochNanos(byte[] bytes) {
  final int actualLength = (bytes == null) ? 0 : bytes.length;
  if (actualLength != 12) {
    throw new IllegalArgumentException("INT96 timestamp must be 12 bytes, got " + actualLength);
  }

  final ByteBuffer littleEndian = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
  final long nanosWithinDay = littleEndian.getLong(0);
  final long julianDayNumber = littleEndian.getInt(8);

  // Shift from the Julian day count to days relative to the Unix epoch.
  final long unixEpochDays = julianDayNumber - 2440588L;
  final long wholeDayNanos = Math.multiplyExact(unixEpochDays, 86_400_000_000_000L);
  return Math.addExact(wholeDayNanos, nanosWithinDay);
}

static TSDataType mapParquetType(PrimitiveType pt) {
LogicalTypeAnnotation logical = pt.getLogicalTypeAnnotation();

Expand Down Expand Up @@ -310,6 +329,9 @@ static String detectTimestampPrecision(PrimitiveType pt) {
return null;
}
}
if (pt.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return "ns";
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public Tablet build(SourceBatch batch) {
}

boolean isMeasurement = tableSchema.getColumnTypes().get(col) == ColumnCategory.FIELD;
Object converted = ValueConverter.convert(rawValue, colSchema.getType(), isMeasurement);
Object converted =
ValueConverter.convert(
rawValue, colSchema.getType(), isMeasurement, importSchema.getTimePrecision());
tablet.addValue(colName, i, converted);
}
}
Expand Down
33 changes: 19 additions & 14 deletions java/tools/src/main/java/org/apache/tsfile/tools/TsFileTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,29 +112,34 @@ private static void processDirectory(File directory, ExecutorService executor) {

private static boolean isAcceptedFormat(String fileName) {
String lower = fileName.toLowerCase();
String fmt = resolveFormat(fileName);
String detected = detectFormatByExtension(lower);
if (formatStr != null) {
return fmt.equals(formatStr);
return formatStr.equals(detected);
}
return lower.endsWith(".csv")
|| lower.endsWith(".parquet")
|| lower.endsWith(".arrow")
|| lower.endsWith(".ipc")
|| lower.endsWith(".feather");
return detected != null;
}

private static String resolveFormat(String fileName) {
if (formatStr != null) {
return formatStr;
private static String detectFormatByExtension(String lowerName) {
if (lowerName.endsWith(".csv")) {
return "csv";
}
String lower = fileName.toLowerCase();
if (lower.endsWith(".parquet")) {
if (lowerName.endsWith(".parquet")) {
return "parquet";
}
if (lower.endsWith(".arrow") || lower.endsWith(".ipc") || lower.endsWith(".feather")) {
if (lowerName.endsWith(".arrow")
|| lowerName.endsWith(".ipc")
|| lowerName.endsWith(".feather")) {
return "arrow";
}
return "csv";
return null;
}

/**
 * Picks the input format for a file: the explicitly configured {@code formatStr} when one is
 * set, otherwise the format implied by the file extension, otherwise {@code "csv"}.
 *
 * @param fileName name of the input file
 * @return the format identifier to use for this file; never {@code null}
 */
private static String resolveFormat(String fileName) {
  if (formatStr == null) {
    String byExtension = detectFormatByExtension(fileName.toLowerCase());
    if (byExtension == null) {
      // Unknown extension: fall back to CSV.
      return "csv";
    }
    return byExtension;
  }
  return formatStr;
}

private static void processFile(File inputFile, ExecutorService executor) {
Expand Down
Loading