diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 4d5c6e8ce7a..c2d43c3c3af 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -755,7 +755,12 @@ Result<PartitionPathFormat> HivePartitioning::FormatValues(
       // field_index <-> path nesting relation
       segments[i] = name + "=" + hive_options_.null_fallback;
     } else {
-      segments[i] = name + "=" + arrow::util::UriEscape(values[i]->ToString());
+      std::string value_str = values[i]->ToString();
+      if (segment_encoding() == SegmentEncoding::Uri) {
+        segments[i] = name + "=" + arrow::util::UriEscape(value_str);
+      } else {
+        segments[i] = name + "=" + value_str;
+      }
     }
   }
 
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 9f0bd7c0be0..9e1990fd665 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -945,6 +945,78 @@ TEST_F(TestPartitioning, WriteHiveWithSlashesInValues) {
   }
 }
 
+TEST_F(TestPartitioning, WriteHiveWithSlashesInValuesDisableUrlEncoding) {
+  // Test for url_encode_hive_values=false functionality
+  // Verify that HivePartitioning can write without URL encoding when segment_encoding=None
+  fs::TimePoint mock_now = std::chrono::system_clock::now();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> filesystem,
+                       fs::internal::MockFileSystem::Make(mock_now, {}));
+  auto base_path = "";
+  ASSERT_OK(filesystem->CreateDir(base_path));
+
+  // Create an Arrow Table with special characters that would be URL encoded
+  auto schema = arrow::schema(
+      {arrow::field("a", arrow::int64()), arrow::field("part", arrow::utf8())});
+
+  auto table = TableFromJSON(schema, {
+                                         R"([
+    [0, "test space"],
+    [1, "test_slash"],
+    [2, "test&ersand"],
+    [3, "test%percent"]
+  ])",
+                                     });
+
+  // Write it using Datasets with URL encoding DISABLED
+  auto dataset = std::make_shared<dataset::InMemoryDataset>(table);
+  ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
+  ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
+
+  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
+
+  // Create HivePartitioning with SegmentEncoding::None to disable URL encoding
+  dataset::HivePartitioningOptions hive_options;
+  hive_options.segment_encoding = dataset::SegmentEncoding::None;
+  auto partitioning = std::make_shared<dataset::HivePartitioning>(
+      partition_schema, ArrayVector(), hive_options);
+
+  auto ipc_format = std::make_shared<dataset::IpcFileFormat>();
+  dataset::FileSystemDatasetWriteOptions write_options;
+  write_options.file_write_options = ipc_format->DefaultWriteOptions();
+  write_options.filesystem = filesystem;
+  write_options.base_dir = base_path;
+  write_options.partitioning = partitioning;
+  write_options.basename_template = "part{i}.arrow";
+  ASSERT_OK(dataset::FileSystemDataset::Write(write_options, scanner));
+
+  auto mockfs =
+      arrow::internal::checked_pointer_cast<fs::internal::MockFileSystem>(filesystem);
+  auto all_dirs = mockfs->AllDirs();
+
+  // Verify directories are NOT URL encoded (clean directory names)
+  // We expect exactly 4 directories, one for each unique partition value
+  ASSERT_EQ(all_dirs.size(), 4);
+
+  // Check that directories contain the expected unencoded values
+  std::vector<std::string> expected_parts = {"test space", "test_slash", "test&ersand", "test%percent"};
+  std::set<std::string> found_parts;
+
+  for (const auto& dir : all_dirs) {
+    // Extract partition value from "part=value" format
+    std::string dir_path = dir.full_path;
+    if (dir_path.substr(0, 5) == "part=") {
+      std::string part_value = dir_path.substr(5);  // Remove "part=" prefix
+      found_parts.insert(part_value);
+    }
+  }
+
+  // Verify we found all expected partition values without URL encoding
+  for (const auto& expected_part : expected_parts) {
+    ASSERT_TRUE(found_parts.count(expected_part) > 0)
+        << "Expected partition value '" << expected_part << "' not found";
+  }
+}
+
 TEST_F(TestPartitioning, EtlThenHive) {
   FieldVector etl_fields{field("year", int16()), field("month", int8()),
                          field("day", int8()), field("hour", int8())};
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 039da8c0d56..6051391b602 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -813,7 +813,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
     )
 
 
-def _ensure_write_partitioning(part, schema, flavor):
+def _ensure_write_partitioning(part, schema, flavor, url_encode_hive_values=True):
     if isinstance(part, PartitioningFactory):
         raise ValueError("A PartitioningFactory cannot be used. "
                          "Did you call the partitioning function "
@@ -824,15 +824,39 @@ def _ensure_write_partitioning(part, schema, flavor):
             "Providing a partitioning_flavor with "
             "a Partitioning object is not supported"
         )
+    elif isinstance(part, HivePartitioning):
+        # If a HivePartitioning object is passed directly, recreate it with the
+        # desired segment_encoding based on url_encode_hive_values
+        desired_encoding = "uri" if url_encode_hive_values else "none"
+        part = HivePartitioning(
+            part.schema,
+            dictionaries=part.dictionaries,
+            segment_encoding=desired_encoding
+        )
     elif isinstance(part, (tuple, list)):
         # Name of fields were provided instead of a partitioning object.
         # Create a partitioning factory with those field names.
-        part = partitioning(
-            schema=pa.schema([schema.field(f) for f in part]),
-            flavor=flavor
-        )
+        if flavor == "hive":
+            # For Hive partitioning, we need to create HivePartitioning directly
+            # to control segment_encoding for URL encoding behavior
+            segment_encoding = "uri" if url_encode_hive_values else "none"
+            part = HivePartitioning(
+                pa.schema([schema.field(f) for f in part]),
+                segment_encoding=segment_encoding
+            )
+        else:
+            part = partitioning(
+                schema=pa.schema([schema.field(f) for f in part]),
+                flavor=flavor
+            )
     elif part is None:
-        part = partitioning(pa.schema([]), flavor=flavor)
+        if flavor == "hive":
+            # For Hive partitioning, we need to create HivePartitioning directly
+            # to control segment_encoding for URL encoding behavior
+            segment_encoding = "uri" if url_encode_hive_values else "none"
+            part = HivePartitioning(pa.schema([]), segment_encoding=segment_encoding)
+        else:
+            part = partitioning(pa.schema([]), flavor=flavor)
 
     if not isinstance(part, Partitioning):
         raise ValueError(
@@ -849,7 +873,8 @@ def write_dataset(data, base_dir, *, basename_template=None, format=None,
                   preserve_order=False, max_partitions=None, max_open_files=None,
                   max_rows_per_file=None, min_rows_per_group=None,
                   max_rows_per_group=None, file_visitor=None,
-                  existing_data_behavior='error', create_dir=True):
+                  existing_data_behavior='error', create_dir=True,
+                  url_encode_hive_values=True):
     """
     Write a dataset to a given format and partitioning.
 
@@ -957,6 +982,12 @@ def file_visitor(written_file):
     create_dir : bool, default True
         If False, directories will not be created. This can be useful for
        filesystems that do not require directories.
+    url_encode_hive_values : bool, default True
+        When using Hive partitioning, whether to URL-encode partition values.
+        If True (default), special characters in partition values will be
+        URL-encoded (e.g., "Product A/B" becomes "Product%20A%2FB").
+        If False, partition values will be used as-is in directory names.
+        This parameter only affects Hive-style partitioning.
     """
 
     from pyarrow.fs import _resolve_filesystem_and_path
@@ -1019,7 +1050,8 @@ def file_visitor(written_file):
             partitioning_schema = data.schema
         partitioning = _ensure_write_partitioning(partitioning,
                                                   schema=partitioning_schema,
-                                                  flavor=partitioning_flavor)
+                                                  flavor=partitioning_flavor,
+                                                  url_encode_hive_values=url_encode_hive_values)
 
     filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem)
 
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 32bcebb28de..fbbbc99ec58 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2383,6 +2383,69 @@ def test_hive_partitioning_dictionary_key(multisourcefs):
     assert actual == month_dictionary
 
 
+
+def test_hive_partitioning_url_encoding(tempdir):
+    # Test url_encode_hive_values parameter for write_dataset
+    # Note: Forward slash (/) cannot be used in directory names on Unix systems,
+    # so we test with other special characters that are valid in filenames
+    table = pa.table({
+        'id': [1, 2, 3],
+        'category': ['Product A+B', 'Site C&D', 'Normal Item']
+    })
+
+    # Test with URL encoding enabled (default)
+    encoded_dir = tempdir / 'encoded'
+    ds.write_dataset(table, encoded_dir, format='ipc',
+                     partitioning=['category'],
+                     partitioning_flavor='hive',
+                     url_encode_hive_values=True)
+
+    # Check that directories are URL-encoded
+    dirs = [d.name for d in encoded_dir.iterdir() if d.is_dir()]
+    assert 'category=Product%20A%2BB' in dirs
+    assert 'category=Site%20C%26D' in dirs
+    assert 'category=Normal%20Item' in dirs
+
+    # Test with URL encoding disabled
+    not_encoded_dir = tempdir / 'not_encoded'
+    ds.write_dataset(table, not_encoded_dir, format='ipc',
+                     partitioning=['category'],
+                     partitioning_flavor='hive',
+                     url_encode_hive_values=False)
+
+    # Check that directories are NOT URL-encoded
+    dirs = [d.name for d in not_encoded_dir.iterdir() if d.is_dir()]
+    assert 'category=Product A+B' in dirs
+    assert 'category=Site C&D' in dirs
+    assert 'category=Normal Item' in dirs
+
+    # Test that both datasets can be read correctly
+    encoded_dataset = ds.dataset(encoded_dir, format='ipc', partitioning='hive')
+    not_encoded_dataset = ds.dataset(not_encoded_dir, format='ipc', partitioning='hive')
+
+    # Both should read the same data
+    encoded_table = encoded_dataset.to_table().sort_by('id')
+    not_encoded_table = not_encoded_dataset.to_table().sort_by('id')
+    original_table = table.sort_by('id')
+
+    assert encoded_table.equals(original_table)
+    assert not_encoded_table.equals(original_table)
+
+    # Test with explicitly created HivePartitioning object
+    explicit_dir = tempdir / 'explicit'
+    partitioning = ds.HivePartitioning(
+        pa.schema([pa.field('category', pa.string())]),
+        segment_encoding='none'
+    )
+    ds.write_dataset(table, explicit_dir, format='ipc',
+                     partitioning=partitioning,
+                     url_encode_hive_values=False)  # Should be respected
+
+    dirs = [d.name for d in explicit_dir.iterdir() if d.is_dir()]
+    assert 'category=Product A+B' in dirs
+    assert 'category=Site C&D' in dirs
+    assert 'category=Normal Item' in dirs
+
 def _create_single_file(base_dir, table=None, row_group_size=None):
     if table is None:
         table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 9b6727211b2..fd7988d70ea 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -39,6 +39,8 @@
 #'   If not specified, it defaults to `"part-{i}."`.
 #' @param hive_style logical: write partition segments as Hive-style
 #'   (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
+#' @param url_encode_hive_values logical: URL-encode Hive partition values when
+#'   using Hive-style partitioning. Default is `TRUE`. Only applies when `hive_style` is `TRUE`.
 #' @param existing_data_behavior The behavior to use when there is already data
 #'   in the destination directory. Must be one of "overwrite", "error", or
 #'   "delete_matching".
@@ -124,13 +126,13 @@
 #' write_dataset(two_levels_tree_no_hive, hive_style = FALSE)
 #' list.files(two_levels_tree_no_hive, recursive = TRUE)
 #' @export
-write_dataset <- function(
-    dataset,
+write_dataset <- function(dataset,
     path,
     format = c("parquet", "feather", "arrow", "ipc", "csv", "tsv", "txt", "text"),
     partitioning = dplyr::group_vars(dataset),
     basename_template = paste0("part-{i}.", as.character(format)),
     hive_style = TRUE,
+    url_encode_hive_values = TRUE,
    existing_data_behavior = c("overwrite", "error", "delete_matching"),
     max_partitions = 1024L,
     max_open_files = 900L,
@@ -138,8 +140,7 @@ write_dataset <- function(
     min_rows_per_group = 0L,
     max_rows_per_group = bitwShiftL(1, 20),
     create_directory = TRUE,
-    ...
-) {
+    ...) {
   format <- match.arg(format)
   if (format %in% c("feather", "ipc")) {
     format <- "arrow"
@@ -175,9 +176,11 @@ write_dataset <- function(
   if (!inherits(partitioning, "Partitioning")) {
     partition_schema <- final_node$schema[partitioning]
     if (isTRUE(hive_style)) {
+      segment_encoding <- if (url_encode_hive_values) "uri" else "none"
       partitioning <- HivePartitioning$create(
         partition_schema,
-        null_fallback = list(...)$null_fallback
+        null_fallback = list(...)$null_fallback,
+        segment_encoding = segment_encoding
       )
     } else {
       partitioning <- DirectoryPartitioning$create(partition_schema)
@@ -187,27 +190,27 @@ write_dataset <- function(
 
   path_and_fs <- get_path_and_filesystem(path)
   dots <- list(...)
-  if (format %in% c("txt", "text") && !any(c("delimiter", "delim") %in% names(dots))) {
+  # Remove url_encode_hive_values from dots as it's not a FileWriteOptions parameter
+  file_write_dots <- dots[!names(dots) %in% "url_encode_hive_values"]
+
+  if (format %in% c("txt", "text") && !any(c("delimiter", "delim") %in% names(file_write_dots))) {
     stop("A delimiter must be given for a txt format.")
   }
-  if (format == "tsv" && any(c("delimiter", "delim") %in% names(dots))) {
+  if (format == "tsv" && any(c("delimiter", "delim") %in% names(file_write_dots))) {
     stop("Can't set a delimiter for the tsv format.")
   }
 
   output_schema <- final_node$schema
 
   # This is a workaround because CsvFileFormat$create defaults the delimiter to ","
   if (format == "tsv") {
-    options <- FileWriteOptions$create(
-      format,
-      column_names = names(output_schema),
-      delimiter = "\t",
-      ...
+    options <- do.call(
+      FileWriteOptions$create,
+      c(list(format = format, column_names = names(output_schema), delimiter = "\t"), file_write_dots)
     )
   } else {
-    options <- FileWriteOptions$create(
-      format,
-      column_names = names(output_schema),
-      ...
+    options <- do.call(
+      FileWriteOptions$create,
+      c(list(format = format, column_names = names(output_schema)), file_write_dots)
     )
   }
@@ -267,12 +270,12 @@ write_dataset <- function(
 #'
 #' @seealso [write_dataset()]
 #' @export
-write_delim_dataset <- function(
-    dataset,
+write_delim_dataset <- function(dataset,
     path,
     partitioning = dplyr::group_vars(dataset),
     basename_template = "part-{i}.txt",
     hive_style = TRUE,
+    url_encode_hive_values = TRUE,
     existing_data_behavior = c("overwrite", "error", "delete_matching"),
     max_partitions = 1024L,
     max_open_files = 900L,
@@ -284,8 +287,7 @@ write_delim_dataset <- function(
     delim = ",",
     na = "",
     eol = "\n",
-    quote = c("needed", "all", "none")
-) {
+    quote = c("needed", "all", "none")) {
   if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
     max_rows_per_group <- max_rows_per_file
   }
@@ -301,6 +303,7 @@ write_delim_dataset <- function(
     partitioning = partitioning,
     basename_template = basename_template,
     hive_style = hive_style,
+    url_encode_hive_values = url_encode_hive_values,
     existing_data_behavior = existing_data_behavior,
     max_partitions = max_partitions,
     max_open_files = max_open_files,
@@ -318,24 +321,23 @@
 
 #' @rdname write_delim_dataset
 #' @export
-write_csv_dataset <- function(
-    dataset,
-    path,
-    partitioning = dplyr::group_vars(dataset),
-    basename_template = "part-{i}.csv",
-    hive_style = TRUE,
-    existing_data_behavior = c("overwrite", "error", "delete_matching"),
-    max_partitions = 1024L,
-    max_open_files = 900L,
-    max_rows_per_file = 0L,
-    min_rows_per_group = 0L,
-    max_rows_per_group = bitwShiftL(1, 20),
-    col_names = TRUE,
-    batch_size = 1024L,
-    delim = ",",
-    na = "",
-    eol = "\n",
-    quote = c("needed", "all", "none")
-) {
+write_csv_dataset <- function(dataset,
+                              path,
+                              partitioning = dplyr::group_vars(dataset),
+                              basename_template = "part-{i}.csv",
+                              hive_style = TRUE,
+                              url_encode_hive_values = TRUE,
+                              existing_data_behavior = c("overwrite", "error", "delete_matching"),
+                              max_partitions = 1024L,
+                              max_open_files = 900L,
+                              max_rows_per_file = 0L,
+                              min_rows_per_group = 0L,
+                              max_rows_per_group = bitwShiftL(1, 20),
+                              col_names = TRUE,
+                              batch_size = 1024L,
+                              delim = ",",
+                              na = "",
+                              eol = "\n",
+                              quote = c("needed", "all", "none")) {
   if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
     max_rows_per_group <- max_rows_per_file
@@ -352,6 +354,7 @@ write_csv_dataset <- function(
     partitioning = partitioning,
     basename_template = basename_template,
     hive_style = hive_style,
+    url_encode_hive_values = url_encode_hive_values,
     existing_data_behavior = existing_data_behavior,
     max_partitions = max_partitions,
     max_open_files = max_open_files,
@@ -369,12 +372,12 @@
 
 #' @rdname write_delim_dataset
 #' @export
-write_tsv_dataset <- function(
-    dataset,
+write_tsv_dataset <- function(dataset,
     path,
     partitioning = dplyr::group_vars(dataset),
     basename_template = "part-{i}.tsv",
     hive_style = TRUE,
+    url_encode_hive_values = TRUE,
     existing_data_behavior = c("overwrite", "error", "delete_matching"),
     max_partitions = 1024L,
     max_open_files = 900L,
@@ -385,7 +388,6 @@ write_tsv_dataset <- function(
     batch_size = 1024L,
     na = "",
     eol = "\n",
-    quote = c("needed", "all", "none")
-) {
+    quote = c("needed", "all", "none")) {
   if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) {
     max_rows_per_group <- max_rows_per_file
@@ -402,6 +404,7 @@ write_tsv_dataset <- function(
     partitioning = partitioning,
     basename_template = basename_template,
     hive_style = hive_style,
+    url_encode_hive_values = url_encode_hive_values,
     existing_data_behavior = existing_data_behavior,
     max_partitions = max_partitions,
     max_open_files = max_open_files,
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index d675e4950d2..cfb1b2011a3 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -1015,3 +1015,47 @@ test_that("Dataset write wrappers can write flat files using readr::write_csv()
     c("true", "false", "NOVALUE", "true", "false", "true", "false", "NOVALUE", "true", "false")
   )
 })
+
+test_that("write_dataset respects url_encode_hive_values parameter", {
+  skip_if_not_available("parquet")
+
+  # Create test data with special characters that would be URL encoded
+  # Note: Forward slash (/) cannot be used in directory names on Unix systems,
+  # so we test with other special characters that are valid in filenames
+  test_df <- data.frame(
+    value = c(1, 2, 3, 4),
+    category = c("test space", "test+plus", "test%percent", "normal"),
+    stringsAsFactors = FALSE
+  )
+
+  # Test with URL encoding enabled (default)
+  dst_dir_encoded <- make_temp_dir()
+  write_dataset(test_df, dst_dir_encoded, partitioning = "category", hive_style = TRUE, url_encode_hive_values = TRUE)
+
+  # Test with URL encoding disabled
+  dst_dir_not_encoded <- make_temp_dir()
+  write_dataset(test_df, dst_dir_not_encoded, partitioning = "category", hive_style = TRUE, url_encode_hive_values = FALSE)
+
+  # Check that the directories are different (encoded vs not encoded)
+  encoded_dirs <- list.dirs(dst_dir_encoded, recursive = FALSE)
+  not_encoded_dirs <- list.dirs(dst_dir_not_encoded, recursive = FALSE)
+
+  # The encoded version should have URL-encoded directory names
+  expect_true(any(grepl("test%20space", encoded_dirs)))
+  expect_true(any(grepl("test%2Bplus", encoded_dirs)))
+  expect_true(any(grepl("test%25percent", encoded_dirs)))
+
+  # The non-encoded version should have raw directory names
+  expect_true(any(grepl("test space", not_encoded_dirs, fixed = TRUE)))
+  expect_true(any(grepl("test+plus", not_encoded_dirs, fixed = TRUE)))
+  expect_true(any(grepl("test%percent", not_encoded_dirs, fixed = TRUE)))
+
+  # Both datasets should be readable and equivalent when loaded
+  ds_encoded <- open_dataset(dst_dir_encoded)
+  ds_not_encoded <- open_dataset(dst_dir_not_encoded)
+
+  expect_equal(
+    arrange(ds_encoded %>% collect(), value),
+    arrange(ds_not_encoded %>% collect(), value)
+  )
+})
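
For reference, a minimal Python sketch of how the flag added by this patch is intended to be used (illustrative only: the output directories below are placeholders, and the expected directory names mirror the tests above).

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({"id": [1, 2], "category": ["Product A+B", "Site C&D"]})

    # Default behaviour: partition values are URL-encoded,
    # e.g. "category=Product%20A%2BB".
    ds.write_dataset(table, "encoded_dir", format="ipc",
                     partitioning=["category"], partitioning_flavor="hive")

    # With url_encode_hive_values=False the raw values are kept,
    # e.g. "category=Product A+B".
    ds.write_dataset(table, "raw_dir", format="ipc",
                     partitioning=["category"], partitioning_flavor="hive",
                     url_encode_hive_values=False)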