Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions kubeflow/trainer/backends/kubernetes/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import uuid

from kubeflow_trainer_api import models
from kubernetes import client, config, watch
from kubernetes import client, config

import kubeflow.common.constants as common_constants
from kubeflow.common.types import KubernetesBackendConfig
Expand Down Expand Up @@ -606,16 +606,21 @@ def _read_pod_logs(self, pod_name: str, container_name: str, follow: bool) -> It
"""Read logs from a pod container."""
try:
if follow:
log_stream = watch.Watch().stream(
self.core_api.read_namespaced_pod_log,
# Stream logs using response.stream() after calling read_namespaced_pod_log
# with _preload_content=False to get a streaming response.
response = self.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=self.namespace,
container=container_name,
follow=True,
_preload_content=False,
)

# Stream logs incrementally.
yield from log_stream # type: ignore
# Stream logs incrementally using response.stream().
for line in response.stream():
if line:
# Decode bytes to string and yield each line.
yield line.decode("utf-8").rstrip("\n")
Comment on lines +619 to +623
Comment on lines +619 to +623
else:
logs = self.core_api.read_namespaced_pod_log(
name=pod_name,
Expand Down
23 changes: 22 additions & 1 deletion kubeflow/trainer/backends/kubernetes/backend_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,20 @@ def mock_read_namespaced_pod_log(*args, **kwargs):
"""Simulate log retrieval from a pod."""
if kwargs.get("namespace") == FAIL_LOGS:
raise Exception("Failed to read logs")

# Handle streaming case: when _preload_content=False and follow=True
if kwargs.get("_preload_content") is False and kwargs.get("follow") is True:
# Return a mock response object with a stream() method
mock_response = Mock()

def mock_stream():
"""Mock stream generator that yields log lines as bytes."""
yield b"test log content"

mock_response.stream = mock_stream
return mock_response
Comment on lines +456 to +466

# Non-streaming case: return plain text logs
return "test log content"


Expand Down Expand Up @@ -1418,14 +1432,21 @@ def test_list_jobs(kubernetes_backend, test_case):
config={"name": BASIC_TRAIN_JOB_NAME, "namespace": FAIL_LOGS},
expected_error=RuntimeError,
),
TestCase(
name="valid flow with follow=True for streaming logs",
expected_status=SUCCESS,
config={"name": BASIC_TRAIN_JOB_NAME, "follow": True},
expected_output=["test log content"],
),
Comment on lines +1435 to +1440
],
)
def test_get_job_logs(kubernetes_backend, test_case):
"""Test KubernetesBackend.get_job_logs with basic success path."""
print("Executing test:", test_case.name)
try:
kubernetes_backend.namespace = test_case.config.get("namespace", DEFAULT_NAMESPACE)
logs = kubernetes_backend.get_job_logs(test_case.config.get("name"))
follow = test_case.config.get("follow", False)
logs = kubernetes_backend.get_job_logs(test_case.config.get("name"), follow=follow)
# Convert iterator to list for comparison.
logs_list = list(logs)
assert test_case.expected_status == SUCCESS
Expand Down
Loading