7 changes: 6 additions & 1 deletion cpp/src/arrow/dataset/partition.cc
@@ -755,7 +755,12 @@ Result<PartitionPathFormat> HivePartitioning::FormatValues(
// field_index <-> path nesting relation
segments[i] = name + "=" + hive_options_.null_fallback;
} else {
segments[i] = name + "=" + arrow::util::UriEscape(values[i]->ToString());
std::string value_str = values[i]->ToString();
if (hive_options_.segment_encoding == SegmentEncoding::Uri) {
segments[i] = name + "=" + arrow::util::UriEscape(value_str);
} else {
segments[i] = name + "=" + value_str;
}
}
}

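For context, the branch above is what decides whether a partition segment is percent-encoded. A minimal Python sketch of the resulting segment strings, using `urllib.parse.quote` only as a stand-in for `arrow::util::UriEscape` (the exact escape sets may differ slightly):

```python
# Sketch of the segment construction in HivePartitioning::FormatValues above.
# urllib.parse.quote is only an approximation of arrow::util::UriEscape.
from urllib.parse import quote

def format_segment(name: str, value: str, uri_encode: bool = True) -> str:
    # SegmentEncoding::Uri -> percent-encode the value; SegmentEncoding::None -> as-is
    return f"{name}={quote(value, safe='') if uri_encode else value}"

print(format_segment("part", "test space", uri_encode=True))   # part=test%20space
print(format_segment("part", "test space", uri_encode=False))  # part=test space
```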
72 changes: 72 additions & 0 deletions cpp/src/arrow/dataset/partition_test.cc
@@ -945,6 +945,78 @@ TEST_F(TestPartitioning, WriteHiveWithSlashesInValues) {
}
}

TEST_F(TestPartitioning, WriteHiveWithSpecialCharsDisableUrlEncoding) {
// Verify that HivePartitioning writes partition values without URL encoding
// when segment_encoding is SegmentEncoding::None
fs::TimePoint mock_now = std::chrono::system_clock::now();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> filesystem,
fs::internal::MockFileSystem::Make(mock_now, {}));
auto base_path = "";
ASSERT_OK(filesystem->CreateDir(base_path));

// Create an Arrow Table with special characters that would be URL encoded
auto schema = arrow::schema(
{arrow::field("a", arrow::int64()), arrow::field("part", arrow::utf8())});

auto table = TableFromJSON(schema, {
R"([
[0, "test space"],
[1, "test_slash"],
[2, "test&ampersand"],
[3, "test%percent"]
])",
});

// Write it using Datasets with URL encoding DISABLED
auto dataset = std::make_shared<dataset::InMemoryDataset>(table);
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});

// Create HivePartitioning with SegmentEncoding::None to disable URL encoding
dataset::HivePartitioningOptions hive_options;
hive_options.segment_encoding = dataset::SegmentEncoding::None;
auto partitioning = std::make_shared<dataset::HivePartitioning>(
partition_schema, ArrayVector(), hive_options);

auto ipc_format = std::make_shared<dataset::IpcFileFormat>();
dataset::FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = ipc_format->DefaultWriteOptions();
write_options.filesystem = filesystem;
write_options.base_dir = base_path;
write_options.partitioning = partitioning;
write_options.basename_template = "part{i}.arrow";
ASSERT_OK(dataset::FileSystemDataset::Write(write_options, scanner));

auto mockfs =
arrow::internal::checked_pointer_cast<fs::internal::MockFileSystem>(filesystem);
auto all_dirs = mockfs->AllDirs();

// Verify directories are NOT URL encoded (clean directory names)
// We expect exactly 4 directories, one for each unique partition value
ASSERT_EQ(all_dirs.size(), 4);

// Check that directories contain the expected unencoded values
std::vector<std::string> expected_parts = {"test space", "test_slash", "test&ampersand", "test%percent"};
std::set<std::string> found_parts;

for (const auto& dir : all_dirs) {
// Extract partition value from "part=value" format
std::string dir_path = dir.full_path;
if (dir_path.substr(0, 5) == "part=") {
std::string part_value = dir_path.substr(5); // Remove "part=" prefix
found_parts.insert(part_value);
}
}

// Verify we found all expected partition values without URL encoding
for (const auto& expected_part : expected_parts) {
ASSERT_TRUE(found_parts.count(expected_part) > 0)
<< "Expected partition value '" << expected_part << "' not found";
}
}

TEST_F(TestPartitioning, EtlThenHive) {
FieldVector etl_fields{field("year", int16()), field("month", int8()),
field("day", int8()), field("hour", int8())};
48 changes: 40 additions & 8 deletions python/pyarrow/dataset.py
@@ -813,7 +813,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
)


def _ensure_write_partitioning(part, schema, flavor):
def _ensure_write_partitioning(part, schema, flavor, url_encode_hive_values=True):
if isinstance(part, PartitioningFactory):
raise ValueError("A PartitioningFactory cannot be used. "
"Did you call the partitioning function "
@@ -824,15 +824,39 @@ def _ensure_write_partitioning(part, schema, flavor):
"Providing a partitioning_flavor with "
"a Partitioning object is not supported"
)
elif isinstance(part, HivePartitioning):
# If a HivePartitioning object is passed directly, recreate it with the
# desired segment_encoding based on url_encode_hive_values
desired_encoding = "uri" if url_encode_hive_values else "none"
part = HivePartitioning(
part.schema,
dictionaries=part.dictionaries,
segment_encoding=desired_encoding
)
elif isinstance(part, (tuple, list)):
# Name of fields were provided instead of a partitioning object.
# Create a partitioning factory with those field names.
part = partitioning(
schema=pa.schema([schema.field(f) for f in part]),
flavor=flavor
)
if flavor == "hive":
# For Hive partitioning, we need to create HivePartitioning directly
# to control segment_encoding for URL encoding behavior
segment_encoding = "uri" if url_encode_hive_values else "none"
part = HivePartitioning(
pa.schema([schema.field(f) for f in part]),
segment_encoding=segment_encoding
)
else:
part = partitioning(
schema=pa.schema([schema.field(f) for f in part]),
flavor=flavor
)
elif part is None:
part = partitioning(pa.schema([]), flavor=flavor)
if flavor == "hive":
# For Hive partitioning, we need to create HivePartitioning directly
# to control segment_encoding for URL encoding behavior
segment_encoding = "uri" if url_encode_hive_values else "none"
part = HivePartitioning(pa.schema([]), segment_encoding=segment_encoding)
else:
part = partitioning(pa.schema([]), flavor=flavor)

if not isinstance(part, Partitioning):
raise ValueError(
Expand All @@ -849,7 +873,8 @@ def write_dataset(data, base_dir, *, basename_template=None, format=None,
preserve_order=False, max_partitions=None, max_open_files=None,
max_rows_per_file=None, min_rows_per_group=None,
max_rows_per_group=None, file_visitor=None,
existing_data_behavior='error', create_dir=True):
existing_data_behavior='error', create_dir=True,
url_encode_hive_values=True):
"""
Write a dataset to a given format and partitioning.

@@ -957,6 +982,12 @@ def file_visitor(written_file):
create_dir : bool, default True
If False, directories will not be created. This can be useful for
filesystems that do not require directories.
url_encode_hive_values : bool, default True
When using Hive partitioning, whether to URL-encode partition values.
If True (default), special characters in partition values will be
URL-encoded (e.g., "Product A/B" becomes "Product%20A%2FB").
If False, partition values will be used as-is in directory names.
This parameter only affects Hive-style partitioning.
"""
from pyarrow.fs import _resolve_filesystem_and_path

@@ -1019,7 +1050,8 @@ def file_visitor(written_file):
partitioning_schema = data.schema
partitioning = _ensure_write_partitioning(partitioning,
schema=partitioning_schema,
flavor=partitioning_flavor)
flavor=partitioning_flavor,
url_encode_hive_values=url_encode_hive_values)

filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)

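To illustrate the docstring addition above, a usage sketch of the new parameter (assuming this PR is applied; the paths and data are made up for the example):

```python
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"id": [1, 2], "category": ["Product A+B", "Site C&D"]})

# Default behavior: partition values are percent-encoded in directory names,
# e.g. category=Product%20A%2BB and category=Site%20C%26D
ds.write_dataset(table, "encoded", format="ipc",
                 partitioning=["category"], partitioning_flavor="hive")

# With the flag added by this PR, values are written verbatim,
# e.g. category=Product A+B and category=Site C&D
ds.write_dataset(table, "not_encoded", format="ipc",
                 partitioning=["category"], partitioning_flavor="hive",
                 url_encode_hive_values=False)
```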
63 changes: 63 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -2383,6 +2383,69 @@ def test_hive_partitioning_dictionary_key(multisourcefs):
assert actual == month_dictionary


def test_hive_partitioning_url_encoding(tempdir):
# Test url_encode_hive_values parameter for write_dataset
# Note: Forward slash (/) cannot be used in directory names on Unix systems,
# so we test with other special characters that are valid in filenames
table = pa.table({
'id': [1, 2, 3],
'category': ['Product A+B', 'Site C&D', 'Normal Item']
})

# Test with URL encoding enabled (default)
encoded_dir = tempdir / 'encoded'
ds.write_dataset(table, encoded_dir, format='ipc',
partitioning=['category'],
partitioning_flavor='hive',
url_encode_hive_values=True)

# Check that directories are URL-encoded
dirs = [d.name for d in encoded_dir.iterdir() if d.is_dir()]
assert 'category=Product%20A%2BB' in dirs
assert 'category=Site%20C%26D' in dirs
assert 'category=Normal%20Item' in dirs

# Test with URL encoding disabled
not_encoded_dir = tempdir / 'not_encoded'
ds.write_dataset(table, not_encoded_dir, format='ipc',
partitioning=['category'],
partitioning_flavor='hive',
url_encode_hive_values=False)

# Check that directories are NOT URL-encoded
dirs = [d.name for d in not_encoded_dir.iterdir() if d.is_dir()]
assert 'category=Product A+B' in dirs
assert 'category=Site C&D' in dirs
assert 'category=Normal Item' in dirs

# Test that both datasets can be read correctly
encoded_dataset = ds.dataset(encoded_dir, format='ipc', partitioning='hive')
not_encoded_dataset = ds.dataset(not_encoded_dir, format='ipc', partitioning='hive')

# Both should read the same data
encoded_table = encoded_dataset.to_table().sort_by('id')
not_encoded_table = not_encoded_dataset.to_table().sort_by('id')
original_table = table.sort_by('id')

assert encoded_table.equals(original_table)
assert not_encoded_table.equals(original_table)

# Test with an explicitly created HivePartitioning object
explicit_dir = tempdir / 'explicit'
partitioning = ds.HivePartitioning(
pa.schema([pa.field('category', pa.string())]),
segment_encoding='none'
)
ds.write_dataset(table, explicit_dir, format='ipc',
partitioning=partitioning,
url_encode_hive_values=False)  # consistent with segment_encoding='none'

dirs = [d.name for d in explicit_dir.iterdir() if d.is_dir()]
assert 'category=Product A+B' in dirs
assert 'category=Site C&D' in dirs
assert 'category=Normal Item' in dirs


def _create_single_file(base_dir, table=None, row_group_size=None):
if table is None:
table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
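On the read side, the tests above rely on the default 'hive' discovery, which percent-decodes segment values. When a dataset was written with url_encode_hive_values=False (especially if values may contain '%'), passing an explicit HivePartitioning with segment_encoding='none' keeps the directory values verbatim. A minimal sketch, reusing the hypothetical 'not_encoded' directory from the earlier example:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Read back unencoded partition values exactly as written on disk.
read_partitioning = ds.HivePartitioning(
    pa.schema([pa.field("category", pa.string())]),
    segment_encoding="none")

dataset = ds.dataset("not_encoded", format="ipc", partitioning=read_partitioning)
print(dataset.to_table().column("category"))
```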