|  | 
| 50 | 50 | dist_dir = current_dir + "/dist/" | 
| 51 | 51 | 
 | 
| 52 | 52 | 
 | 
|  | 53 | +def is_safe_archive_path(path): | 
|  | 54 | +    # Check for absolute paths (both Unix and Windows style) | 
|  | 55 | +    if path.startswith('/') or (len(path) > 1 and path[1] == ':' and path[2] in '\\/'): | 
|  | 56 | +        raise ValueError(f"Absolute path not allowed: {path}") | 
|  | 57 | + | 
|  | 58 | +    # Normalize the path to handle any path separators | 
|  | 59 | +    normalized_path = os.path.normpath(path) | 
|  | 60 | + | 
|  | 61 | +    # Check for directory traversal attempts using normalized path | 
|  | 62 | +    if ".." in normalized_path.split(os.sep): | 
|  | 63 | +        raise ValueError(f"Directory traversal not allowed: {path}") | 
|  | 64 | + | 
|  | 65 | +    # Additional check for paths that would escape the target directory | 
|  | 66 | +    if normalized_path.startswith(".."): | 
|  | 67 | +        raise ValueError(f"Path would escape target directory: {path}") | 
|  | 68 | + | 
|  | 69 | +    # Check for any remaining directory traversal patterns in the original path | 
|  | 70 | +    # This catches cases that might not be normalized properly | 
|  | 71 | +    path_parts = path.replace('\\', '/').split('/') | 
|  | 72 | +    if '..' in path_parts: | 
|  | 73 | +        raise ValueError(f"Directory traversal not allowed: {path}") | 
|  | 74 | + | 
|  | 75 | +    return True | 
|  | 76 | + | 
|  | 77 | + | 
|  | 78 | +def safe_tar_extract(tar_file, destination): | 
|  | 79 | +    # Validate all paths before extraction | 
|  | 80 | +    for member in tar_file.getmembers(): | 
|  | 81 | +        is_safe_archive_path(member.name) | 
|  | 82 | + | 
|  | 83 | +    # If all paths are safe, proceed with extraction | 
|  | 84 | +    tar_file.extractall(destination, filter="tar") | 
|  | 85 | + | 
|  | 86 | + | 
|  | 87 | +def safe_zip_extract(zip_file, destination): | 
|  | 88 | +    # Validate all paths before extraction | 
|  | 89 | +    for name in zip_file.namelist(): | 
|  | 90 | +        is_safe_archive_path(name) | 
|  | 91 | + | 
|  | 92 | +    # If all paths are safe, proceed with extraction | 
|  | 93 | +    zip_file.extractall(destination) | 
|  | 94 | + | 
|  | 95 | + | 
| 53 | 96 | def sha256sum(filename, blocksize=65536): | 
| 54 | 97 |     hash = hashlib.sha256() | 
| 55 | 98 |     with open(filename, "rb") as f: | 
| @@ -212,6 +255,10 @@ def unpack(filename, destination, force_extract, checksum):  # noqa: C901 | 
| 212 | 255 |         print("File corrupted or incomplete!") | 
| 213 | 256 |         cfile = None | 
| 214 | 257 |         file_is_corrupted = True | 
|  | 258 | +    except ValueError as e: | 
|  | 259 | +        print(f"Security validation failed: {e}") | 
|  | 260 | +        cfile = None | 
|  | 261 | +        file_is_corrupted = True | 
| 215 | 262 | 
 | 
| 216 | 263 |     if file_is_corrupted: | 
| 217 | 264 |         corrupted_filename = filename + ".corrupted" | 
| @@ -243,15 +290,15 @@ def unpack(filename, destination, force_extract, checksum):  # noqa: C901 | 
| 243 | 290 |     if filename.endswith("tar.gz"): | 
| 244 | 291 |         if not cfile: | 
| 245 | 292 |             cfile = tarfile.open(filename, "r:gz") | 
| 246 |  | -        cfile.extractall(destination, filter="tar") | 
|  | 293 | +        safe_tar_extract(cfile, destination) | 
| 247 | 294 |     elif filename.endswith("tar.xz"): | 
| 248 | 295 |         if not cfile: | 
| 249 | 296 |             cfile = tarfile.open(filename, "r:xz") | 
| 250 |  | -        cfile.extractall(destination, filter="tar") | 
|  | 297 | +        safe_tar_extract(cfile, destination) | 
| 251 | 298 |     elif filename.endswith("zip"): | 
| 252 | 299 |         if not cfile: | 
| 253 | 300 |             cfile = zipfile.ZipFile(filename) | 
| 254 |  | -        cfile.extractall(destination) | 
|  | 301 | +        safe_zip_extract(cfile, destination) | 
| 255 | 302 |     else: | 
| 256 | 303 |         raise NotImplementedError("Unsupported archive type") | 
| 257 | 304 | 
 | 
|  | 
0 commit comments