
Commit 53ab4a8

npow and Nissan Pow authored
[S3] Log original error code for transient errors (Netflix#2616)
Currently, it's difficult to tell why transient S3 operations are retrying. Added the original error code to the logs. Example:

```
Transient S3 failure (attempt #1) -- total success: 1, last attempt 1/2 -- remaining: 1 (ParamValidationError)
Transient S3 failure (attempt #2) -- total success: 1, last attempt 0/1 -- remaining: 1 (ParamValidationError)
Upload result: []
```

---------

Co-authored-by: Nissan Pow <[email protected]>
1 parent 5be555e commit 53ab4a8

File tree: 4 files changed, +238 -73 lines changed


metaflow/metaflow_config.py
Lines changed: 3 additions & 0 deletions

@@ -109,6 +109,9 @@
 # top-level retries)
 S3_TRANSIENT_RETRY_COUNT = from_conf("S3_TRANSIENT_RETRY_COUNT", 20)

+# Whether to log transient retry messages to stdout
+S3_LOG_TRANSIENT_RETRIES = from_conf("S3_LOG_TRANSIENT_RETRIES", False)
+
 # S3 retry configuration used in the aws client
 # Use the adaptive retry strategy by default
 S3_CLIENT_RETRY_CONFIG = from_conf(
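For context (not part of this commit): `from_conf` values can typically also be supplied through a `METAFLOW_`-prefixed environment variable or the Metaflow config file. Assuming that convention, the new flag could presumably be switched on as in this hedged sketch:

```python
# Hedged sketch (not from this commit): enabling the new flag, assuming the
# usual METAFLOW_<NAME> environment-variable convention used by from_conf().
import os

# Must be set before metaflow is imported, since config values are read at import time.
os.environ["METAFLOW_S3_LOG_TRANSIENT_RETRIES"] = "True"

from metaflow.metaflow_config import S3_LOG_TRANSIENT_RETRIES

print(S3_LOG_TRANSIENT_RETRIES)  # "True" (a truthy value), so retry logging is on
```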

metaflow/plugins/datatools/s3/s3.py
Lines changed: 29 additions & 10 deletions

@@ -18,6 +18,7 @@
     DATATOOLS_S3ROOT,
     S3_RETRY_COUNT,
     S3_TRANSIENT_RETRY_COUNT,
+    S3_LOG_TRANSIENT_RETRIES,
     S3_SERVER_SIDE_ENCRYPTION,
     S3_WORKER_COUNT,
     TEMPDIR,
@@ -1760,17 +1761,35 @@ def try_s3_op(last_ok_count, pending_retries, out_lines, inject_failures):
                # due to a transient failure so we try again.
                transient_retry_count += 1
                total_ok_count += last_ok_count
-                print(
-                    "Transient S3 failure (attempt #%d) -- total success: %d, "
-                    "last attempt %d/%d -- remaining: %d"
-                    % (
-                        transient_retry_count,
-                        total_ok_count,
-                        last_ok_count,
-                        last_ok_count + last_retry_count,
-                        len(pending_retries),
+
+                if S3_LOG_TRANSIENT_RETRIES:
+                    # Extract transient error type from pending retry lines
+                    error_info = ""
+                    if pending_retries:
+                        try:
+                            # Parse the first line to get transient error type
+                            first_retry = json.loads(
+                                pending_retries[0].decode("utf-8").strip()
+                            )
+                            if "transient_error_type" in first_retry:
+                                error_info = (
+                                    " (%s)" % first_retry["transient_error_type"]
+                                )
+                        except (json.JSONDecodeError, IndexError, KeyError):
+                            pass
+
+                    print(
+                        "Transient S3 failure (attempt #%d) -- total success: %d, "
+                        "last attempt %d/%d -- remaining: %d%s"
+                        % (
+                            transient_retry_count,
+                            total_ok_count,
+                            last_ok_count,
+                            last_ok_count + last_retry_count,
+                            len(pending_retries),
+                            error_info,
+                        )
                     )
-                )
                if inject_failures == 0:
                    # Don't sleep when we are "faking" the failures
                    self._jitter_sleep(transient_retry_count)
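The `pending_retries` entries inspected here are the JSON retry lines emitted by s3op.py (next file), which may now carry a `transient_error_type` field. A minimal sketch of the extraction, run against a made-up retry line (field values are illustrative, not from the diff):

```python
# Minimal sketch of the error-suffix extraction added above, run against a
# made-up retry line; the field values are illustrative, not from the diff.
import json

pending_retries = [
    b'{"idx": 0, "url": "s3://bucket/key", "local": "/tmp/key", '
    b'"content_type": null, "metadata": null, "encryption": null, '
    b'"transient_error_type": "ParamValidationError"}\n'
]

error_info = ""
first_retry = json.loads(pending_retries[0].decode("utf-8").strip())
if "transient_error_type" in first_retry:
    error_info = " (%s)" % first_retry["transient_error_type"]

print("remaining: %d%s" % (len(pending_retries), error_info))
# remaining: 1 (ParamValidationError)
```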

metaflow/plugins/datatools/s3/s3op.py
Lines changed: 90 additions & 62 deletions
@@ -285,7 +285,9 @@ def op_info(url):
                        "%d %d\n" % (idx, -ERROR_OUT_OF_DISK_SPACE)
                    )
                else:
-                    result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                    result_file.write(
+                        "%d %d %s\n" % (idx, -ERROR_TRANSIENT, "OSError")
+                    )
                result_file.flush()
                continue
            except MetaflowException:
@@ -297,7 +299,9 @@ def op_info(url):
                tmp.close()
                os.unlink(tmp.name)
                # assume anything else is transient
-                result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                result_file.write(
+                    "%d %d %s\n" % (idx, -ERROR_TRANSIENT, type(e).__name__)
+                )
                result_file.flush()
                continue
            # If we need the metadata, get it and write it out
@@ -368,7 +372,9 @@ def op_info(url):
                    raise
                except (SSLError, Exception) as e:
                    # assume anything else is transient
-                    result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+                    result_file.write(
+                        "%d %d %s\n" % (idx, -ERROR_TRANSIENT, type(e).__name__)
+                    )
                    result_file.flush()
                    continue
                except:
@@ -399,20 +405,21 @@ def handle_client_error(err, idx, result_file):
        raise err

    error_code = normalize_client_error(err)
+    original_error_code = err.response["Error"]["Code"]
+
    if error_code == 404:
        result_file.write("%d %d\n" % (idx, -ERROR_URL_NOT_FOUND))
        result_file.flush()
    elif error_code == 403:
        result_file.write("%d %d\n" % (idx, -ERROR_URL_ACCESS_DENIED))
        result_file.flush()
    elif error_code == 503:
-        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+        result_file.write("%d %d %s\n" % (idx, -ERROR_TRANSIENT, original_error_code))
        result_file.flush()
    else:
        # optimistically assume it is a transient error
-        result_file.write("%d %d\n" % (idx, -ERROR_TRANSIENT))
+        result_file.write("%d %d %s\n" % (idx, -ERROR_TRANSIENT, original_error_code))
        result_file.flush()
-        # TODO specific error message for out of disk space


def start_workers(mode, urls, num_workers, inject_failure, s3config):
@@ -424,6 +431,7 @@ def start_workers(mode, urls, num_workers, inject_failure, s3config):
    random.seed()

    sz_results = []
+    transient_error_type = None
    # 1. push sources and destinations to the queue
    # We only push if we don't inject a failure; otherwise, we already set the sz_results
    # appropriately with the result of the injected failure.
@@ -478,13 +486,19 @@ def start_workers(mode, urls, num_workers, inject_failure, s3config):
                # Read the output file if all went well
                with open(out_path, "r") as out_file:
                    for line in out_file:
-                        line_split = line.split(" ")
-                        sz_results[int(line_split[0])] = int(line_split[1])
+                        line_split = line.split(" ", 2)
+                        idx = int(line_split[0])
+                        size = int(line_split[1])
+                        sz_results[idx] = size
+
+                        # For transient errors, store the transient error type (should be the same for all)
+                        if size == -ERROR_TRANSIENT and len(line_split) > 2:
+                            transient_error_type = line_split[2].strip()
            else:
                # Put this process back in the processes to check
                new_procs[proc] = out_path
        procs = new_procs
-    return sz_results
+    return sz_results, transient_error_type


def process_urls(mode, urls, verbose, inject_failure, num_workers, s3config):
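The per-worker result file parsed above now has an optional third field on transient-error lines, i.e. `<idx> <size-or-error-code> [<original error name>]`. Below is a small standalone sketch of that format and of the `split(" ", 2)` handling; the error-code value and names are illustrative placeholders, not the real constants from s3op.py.

```python
# Sketch of the per-worker result file format parsed above; values illustrative.
ERROR_TRANSIENT = 10  # placeholder -- the real constant is defined in s3op.py

sample_lines = [
    "0 2048\n",          # success: idx 0, 2048 bytes downloaded
    "1 -10 SlowDown\n",  # transient failure carrying the original error code
]

sz_results = [None, None]
transient_error_type = None
for line in sample_lines:
    line_split = line.split(" ", 2)
    idx, size = int(line_split[0]), int(line_split[1])
    sz_results[idx] = size
    if size == -ERROR_TRANSIENT and len(line_split) > 2:
        transient_error_type = line_split[2].strip()

print(sz_results, transient_error_type)  # [2048, -10] SlowDown
```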
@@ -493,7 +507,9 @@ def process_urls(mode, urls, verbose, inject_failure, num_workers, s3config):
        print("%sing %d files.." % (mode.capitalize(), len(urls)), file=sys.stderr)

    start = time.time()
-    sz_results = start_workers(mode, urls, num_workers, inject_failure, s3config)
+    sz_results, transient_error_type = start_workers(
+        mode, urls, num_workers, inject_failure, s3config
+    )
    end = time.time()

    if verbose:
@@ -510,7 +526,7 @@ def process_urls(mode, urls, verbose, inject_failure, num_workers, s3config):
            ),
            file=sys.stderr,
        )
-    return sz_results
+    return sz_results, transient_error_type


# Utility functions
@@ -719,9 +735,21 @@ def generate_local_path(url, range="whole", suffix=None):
719735
quoted = url_quote(url)
720736
fname = quoted.split(b"/")[-1].replace(b".", b"_").replace(b"-", b"_")
721737
sha = sha1(quoted).hexdigest()
738+
739+
# Truncate fname to ensure the final filename doesn't exceed filesystem limits.
740+
# Most filesystems have a 255 character limit. The structure is:
741+
# <40-char-sha>-<fname>-<range>[-<suffix>]
742+
# We need to leave room for: sha (40) + hyphens (2-3) + range (~10) + suffix (~10)
743+
# This leaves roughly 190 characters for fname. We use 150 to be safe.
744+
fname_decoded = fname.decode("utf-8")
745+
max_fname_len = 150
746+
if len(fname_decoded) > max_fname_len:
747+
# Truncate and add an ellipsis to indicate truncation
748+
fname_decoded = fname_decoded[:max_fname_len] + "..."
749+
722750
if suffix:
723-
return "-".join((sha, fname.decode("utf-8"), range, suffix))
724-
return "-".join((sha, fname.decode("utf-8"), range))
751+
return "-".join((sha, fname_decoded, range, suffix))
752+
return "-".join((sha, fname_decoded, range))
725753

726754

727755
def parallel_op(op, lst, num_workers):
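A rough check of the length budget described in the comments above (a sketch with illustrative component sizes, not code from the commit):

```python
# Rough check of the filename length budget from the comments above (illustrative sizes).
sha_len = 40                   # sha1 hexdigest
fname_len = 150 + 3            # truncated fname plus the "..." marker
range_len = len("0-1048575")   # a typical byte-range component
suffix_len = 10                # e.g. a short temporary suffix
hyphens = 3

total = sha_len + fname_len + range_len + suffix_len + hyphens
print(total)  # 215 -- comfortably below the common 255-character filename limit
```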
@@ -858,7 +886,7 @@ def lst(
    urllist = []
    to_iterate, _ = _populate_prefixes(prefixes, inputs)
    for _, prefix, url, _ in to_iterate:
-        src = urlparse(url)
+        src = urlparse(url, allow_fragments=False)
        url = S3Url(
            url=url,
            bucket=src.netloc,
@@ -964,7 +992,7 @@ def _files():
            yield input_line_idx, local, url, content_type, metadata, encryption

    def _make_url(idx, local, user_url, content_type, metadata, encryption):
-        src = urlparse(user_url)
+        src = urlparse(user_url, allow_fragments=False)
        url = S3Url(
            url=user_url,
            bucket=src.netloc,
@@ -992,7 +1020,7 @@ def _make_url(idx, local, user_url, content_type, metadata, encryption):
    ul_op = "upload"
    if not overwrite:
        ul_op = "info_upload"
-    sz_results = process_urls(
+    sz_results, transient_error_type = process_urls(
        ul_op, urls, verbose, inject_failure, num_workers, s3config
    )
    retry_lines = []
@@ -1010,19 +1038,17 @@ def _make_url(idx, local, user_url, content_type, metadata, encryption):
        elif listing and sz == 0:
            out_lines.append(format_result_line(url.idx, url.url) + "\n")
        elif sz == -ERROR_TRANSIENT:
-            retry_lines.append(
-                json.dumps(
-                    {
-                        "idx": url.idx,
-                        "url": url.url,
-                        "local": url.local,
-                        "content_type": url.content_type,
-                        "metadata": url.metadata,
-                        "encryption": url.encryption,
-                    }
-                )
-                + "\n"
-            )
+            retry_data = {
+                "idx": url.idx,
+                "url": url.url,
+                "local": url.local,
+                "content_type": url.content_type,
+                "metadata": url.metadata,
+                "encryption": url.encryption,
+            }
+            if transient_error_type:
+                retry_data["transient_error_type"] = transient_error_type
+            retry_lines.append(json.dumps(retry_data) + "\n")
            # Output something to get a total count the first time around
            if not is_transient_retry:
                out_lines.append("%d %s\n" % (url.idx, TRANSIENT_RETRY_LINE_CONTENT))
@@ -1060,22 +1086,21 @@ def _populate_prefixes(prefixes, inputs):
            for idx, l in enumerate(f, start=len(prefixes)):
                s = l.split(b" ")
                if len(s) == 1:
+                    # User input format: <url>
                    url = url_unquote(s[0].strip())
                    prefixes.append((idx, url, url, None))
                elif len(s) == 2:
+                    # User input format: <url> <range>
                    url = url_unquote(s[0].strip())
                    prefixes.append((idx, url, url, url_unquote(s[1].strip())))
-                else:
+                elif len(s) in (4, 5):
+                    # Retry format: <idx> <prefix> <url> <range> [<transient_error_type>]
+                    # The transient_error_type (5th field) is optional and only used for logging.
+                    # Lines with other field counts (e.g., 3) are silently ignored as invalid.
                    is_transient_retry = True
-                    if len(s) == 3:
-                        prefix = url = url_unquote(s[1].strip())
-                        range_info = url_unquote(s[2].strip())
-                    else:
-                        # Special case when we have both prefix and URL -- this is
-                        # used in recursive gets for example
-                        prefix = url_unquote(s[1].strip())
-                        url = url_unquote(s[2].strip())
-                        range_info = url_unquote(s[3].strip())
+                    prefix = url_unquote(s[1].strip())
+                    url = url_unquote(s[2].strip())
+                    range_info = url_unquote(s[3].strip())
                    if range_info == "<norange>":
                        range_info = None
                    prefixes.append(
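For get/info retries, the line format consumed here is whitespace-separated, with the error type tolerated as a trailing fifth field. A small sketch with a made-up retry line (it uses plain `decode` instead of the real `url_unquote` helper):

```python
# Sketch of a get/info retry line carrying the optional error-type field
# (values are made up; the real code applies url_unquote to each field).
retry_line = b"3 s3://bucket/prefix s3://bucket/prefix/key <norange> ParamValidationError\n"

s = retry_line.split(b" ")
assert len(s) in (4, 5)  # <idx> <prefix> <url> <range> [<transient_error_type>]

prefix = s[1].decode("utf-8")
url = s[2].decode("utf-8")
range_info = s[3].decode("utf-8").strip()
if range_info == "<norange>":
    range_info = None

print(prefix, url, range_info)
# s3://bucket/prefix s3://bucket/prefix/key None
```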
@@ -1139,7 +1164,7 @@ def get(
    urllist = []
    to_iterate, is_transient_retry = _populate_prefixes(prefixes, inputs)
    for idx, prefix, url, r in to_iterate:
-        src = urlparse(url)
+        src = urlparse(url, allow_fragments=False)
        url = S3Url(
            url=url,
            bucket=src.netloc,
@@ -1186,7 +1211,7 @@ def get(

    # exclude the non-existent files from loading
    to_load = [url for url, size in urls if size is not None]
-    sz_results = process_urls(
+    sz_results, transient_error_type = process_urls(
        dl_op, to_load, verbose, inject_failure, num_workers, s3config
    )
    # We check if there is any access denied
@@ -1222,21 +1247,19 @@ def get(
                break
            out_lines.append(format_result_line(url.idx, url.url) + "\n")
        elif sz == -ERROR_TRANSIENT:
-            retry_lines.append(
-                " ".join(
-                    [
-                        str(url.idx),
-                        url_quote(url.prefix).decode(encoding="utf-8"),
-                        url_quote(url.url).decode(encoding="utf-8"),
-                        (
-                            url_quote(url.range).decode(encoding="utf-8")
-                            if url.range
-                            else "<norange>"
-                        ),
-                    ]
-                )
-                + "\n"
-            )
+            retry_line_parts = [
+                str(url.idx),
+                url_quote(url.prefix).decode(encoding="utf-8"),
+                url_quote(url.url).decode(encoding="utf-8"),
+                (
+                    url_quote(url.range).decode(encoding="utf-8")
+                    if url.range
+                    else "<norange>"
+                ),
+            ]
+            if transient_error_type:
+                retry_line_parts.append(transient_error_type)
+            retry_lines.append(" ".join(retry_line_parts) + "\n")
            # First time around, we output something to indicate the total length
            if not is_transient_retry:
                out_lines.append("%d %s\n" % (url.idx, TRANSIENT_RETRY_LINE_CONTENT))
@@ -1288,7 +1311,7 @@ def info(
    urllist = []
    to_iterate, is_transient_retry = _populate_prefixes(prefixes, inputs)
    for idx, prefix, url, _ in to_iterate:
-        src = urlparse(url)
+        src = urlparse(url, allow_fragments=False)
        url = S3Url(
            url=url,
            bucket=src.netloc,
@@ -1302,7 +1325,7 @@ def info(
            exit(ERROR_INVALID_URL, url)
        urllist.append(url)

-    sz_results = process_urls(
+    sz_results, transient_error_type = process_urls(
        "info", urllist, verbose, inject_failure, num_workers, s3config
    )

@@ -1315,10 +1338,15 @@ def info(
                format_result_line(url.idx, url.prefix, url.url, url.local) + "\n"
            )
        else:
-            retry_lines.append(
-                "%d %s <norange>\n"
-                % (url.idx, url_quote(url.url).decode(encoding="utf-8"))
-            )
+            retry_line_parts = [
+                str(url.idx),
+                url_quote(url.prefix).decode(encoding="utf-8"),
+                url_quote(url.url).decode(encoding="utf-8"),
+                "<norange>",
+            ]
+            if transient_error_type:
+                retry_line_parts.append(transient_error_type)
+            retry_lines.append(" ".join(retry_line_parts) + "\n")
        if not is_transient_retry:
            out_lines.append("%d %s\n" % (url.idx, TRANSIENT_RETRY_LINE_CONTENT))
