diff --git a/metaflow/runner/utils.py b/metaflow/runner/utils.py index abded521375..c01d2c81e22 100644 --- a/metaflow/runner/utils.py +++ b/metaflow/runner/utils.py @@ -4,6 +4,7 @@ import asyncio import tempfile import select +import fcntl from contextlib import contextmanager from subprocess import CalledProcessError from typing import Any, Dict, TYPE_CHECKING, ContextManager, Tuple @@ -129,6 +130,21 @@ def read_from_fifo_when_ready( data = os.read(fifo_fd, 8192) if data: content += data + # We got data! Now switch to blocking mode for guaranteed complete reads. + # In blocking mode, read() won't return 0 until writer closes AND all + # kernel buffers are drained - this is POSIX guaranteed. + flags = fcntl.fcntl(fifo_fd, fcntl.F_GETFL) + fcntl.fcntl(fifo_fd, fcntl.F_SETFL, flags & ~os.O_NONBLOCK) + + # Now do blocking reads until true EOF + while True: + chunk = os.read(fifo_fd, 8192) + if not chunk: + # True EOF - all data drained + break + content += chunk + # All data read, exit main loop + break else: if len(events): # We read an EOF -- consider the file done @@ -136,23 +152,10 @@ def read_from_fifo_when_ready( else: # We had no events (just a timeout) and the read didn't return # an exception so the file is still open; we continue waiting for data - # On some systems (notably MacOS), even after the file is closed on the - # other end, we may not get a BlockingIOError or proper EOF signal. - # Instead of using an arbitrary timeout, check if the writer process - # has actually exited. If it has and we have content, we can safely - # assume EOF. If the process is still running, continue waiting. - if content and check_process_exited(command_obj): - # Process has exited and we got an empty read with no poll events. - # This is EOF - break out to return the content we've collected. - break - # else: process is still running, continue waiting for more data + pass except BlockingIOError: - has_blocking_error = True - if content: - # The file was closed - break - # else, if we have no content, we continue waiting for the file to be open - # and written to. + # File not ready yet, continue waiting + pass if not content and check_process_exited(command_obj): raise CalledProcessError(command_obj.process.returncode, command_obj.command)