Skip to content

Commit 49d53af

Browse files
committed
feat: synchronize agent's stdout/stderr logging
How it works: - The agent vm emits separator text before and after the text. - The main vm send the action and waits for the separator text line from the agent vm. - After the result have been read by the main vm it expects another separator text line. After consuming the line it redirects output to the agent log. Edge cases: - Timeouts and Agent VM crashes: - The main vm's stdout and stderr tasks will exit due to the closed stream. The main vm is supposed to dispose the failed agent.
1 parent 34ccc66 commit 49d53af

File tree

3 files changed

+190
-39
lines changed

3 files changed

+190
-39
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
26+
package com.sun.javatest.regtest.agent;
27+
28+
import java.io.BufferedReader;
29+
import java.io.IOException;
30+
import java.io.InputStreamReader;
31+
import java.util.concurrent.ExecutionException;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.Executors;
34+
import java.util.concurrent.Future;
35+
import java.util.concurrent.ThreadFactory;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
38+
import java.util.function.BiConsumer;
39+
40+
/**
41+
* Handles stdout/stderr process output from the agent.
42+
*/
43+
public class AgentProcessLogger {
44+
45+
/**
46+
* Constructs a thread pool to handle agent process output
47+
* and creates stdout and stderr readers
48+
*
49+
* @param p agent process
50+
*/
51+
public AgentProcessLogger(Process p) {
52+
executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {
53+
@Override
54+
public Thread newThread(Runnable runnable) {
55+
Thread th = new Thread(runnable);
56+
th.setDaemon(true);
57+
return th;
58+
}
59+
});
60+
stdOut = new BufferedReader(new InputStreamReader(p.getInputStream()));
61+
stdErr = new BufferedReader(new InputStreamReader(p.getErrorStream()));
62+
}
63+
64+
/**
65+
* Starts logging output and error streams to the specified consumer
66+
*
67+
* @param logConsumer log consumer, has two parameters - stream name and
68+
* the log line
69+
*/
70+
public void startLogging(BiConsumer<String, String> logConsumer) {
71+
if (inputDone != null || errorDone != null) {
72+
throw new RuntimeException("call stopLogging first");
73+
}
74+
inputDone = executorService.submit(() -> captureLog("stdout", stdOut, logConsumer));
75+
errorDone = executorService.submit(() -> captureLog("stderr", stdErr, logConsumer));
76+
}
77+
78+
/**
79+
* Waits for the logging tasks to finish
80+
*
81+
* @throws ExecutionException
82+
* @throws InterruptedException
83+
* @throws TimeoutException logging task failed to stop within 60 seconds
84+
*/
85+
public void stopLogging() throws ExecutionException, InterruptedException, TimeoutException {
86+
inputDone.get(60, TimeUnit.SECONDS);
87+
errorDone.get(60, TimeUnit.SECONDS);
88+
inputDone = null;
89+
errorDone = null;
90+
}
91+
92+
/**
93+
* Wait for logging tasks to finish and shutdown the thread pool
94+
*/
95+
public void shutdown() {
96+
try {
97+
stopLogging();
98+
} catch (ExecutionException | InterruptedException | TimeoutException ex) {
99+
// ignore exception, the process is terminating
100+
}
101+
executorService.shutdown();
102+
}
103+
104+
/**
105+
* Forward log lines to the consumer, stop forwarding on the separator
106+
* line
107+
*
108+
* @param streamName name of the stream
109+
* @param reader process's stream reader
110+
*/
111+
private Void captureLog(String streamName, BufferedReader reader, BiConsumer<String, String> consumer) {
112+
try {
113+
String line = null;
114+
while ((line = reader.readLine()) != null) {
115+
if (AgentServer.PROCESS_OUTPUT_SEPARATOR.equals(line)) {
116+
break;
117+
}
118+
consumer.accept(streamName, line);
119+
}
120+
} catch (IOException ex) {
121+
// ignore the exception, the reader might be closed
122+
}
123+
return null;
124+
}
125+
126+
private final ExecutorService executorService;
127+
private final BufferedReader stdOut;
128+
private final BufferedReader stdErr;
129+
private Future<Void> inputDone;
130+
private Future<Void> errorDone;
131+
}

src/share/classes/com/sun/javatest/regtest/agent/AgentServer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import java.io.DataInputStream;
3131
import java.io.DataOutputStream;
3232
import java.io.File;
33+
import java.io.FileDescriptor;
3334
import java.io.FileWriter;
3435
import java.io.IOException;
3536
import java.io.OutputStream;
37+
import java.io.OutputStreamWriter;
3638
import java.io.PrintStream;
3739
import java.io.PrintWriter;
3840
import java.io.Writer;
@@ -93,6 +95,9 @@ public static void main(String... args) {
9395
public static final byte KEEPALIVE = 5;
9496
public static final byte CLOSE = 6;
9597

98+
public static final String PROCESS_OUTPUT_SEPARATOR =
99+
"------ This line is the stdout/stderr output separator ------";
100+
96101
/**
97102
* Send KEEPALIVE bytes periodically to a stream.
98103
* The bytes are written every {@code WRITE_TIMEOUT} milliseconds.
@@ -236,6 +241,7 @@ public void run() throws IOException {
236241
try {
237242
int op;
238243
while ((op = in.read()) != -1) {
244+
writeProcessOutputSeparator();
239245
switch (op) {
240246
case DO_COMPILE:
241247
doCompile();
@@ -252,6 +258,8 @@ public void run() throws IOException {
252258
throw new Error("Agent.Server: unexpected op: " + op);
253259
}
254260
out.flush();
261+
// signal end of section output for the log writer
262+
writeProcessOutputSeparator();
255263
}
256264
} finally {
257265
keepAlive.finished();
@@ -260,6 +268,20 @@ public void run() throws IOException {
260268
}
261269
}
262270

271+
private void writeProcessOutputSeparator() {
272+
try {
273+
processStdOut.write(PROCESS_OUTPUT_SEPARATOR);
274+
processStdOut.write(System.lineSeparator());
275+
processStdOut.flush();
276+
processStdErr.write(PROCESS_OUTPUT_SEPARATOR);
277+
processStdErr.write(System.lineSeparator());
278+
processStdErr.flush();
279+
}
280+
catch (IOException e ){
281+
// ignore exception as the agent process may be killed
282+
}
283+
}
284+
263285
private void doCompile() throws IOException {
264286
if (traceServer) {
265287
traceOut.println("Agent.Server.doCompile");
@@ -388,7 +410,8 @@ void log(String message) {
388410
private final PrintWriter logWriter;
389411
private final int id;
390412
private final Map<OutputKind, Writer> writers = new EnumMap<>(OutputKind.class);
391-
413+
private final OutputStreamWriter processStdOut = new FileWriter(FileDescriptor.out);
414+
private final OutputStreamWriter processStdErr = new FileWriter(FileDescriptor.err);
392415
/**
393416
* Create an output stream for output to be sent back to the client via the server connection.
394417
* @param kind the kind of stream

src/share/classes/com/sun/javatest/regtest/exec/Agent.java

Lines changed: 35 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,12 @@
2626
package com.sun.javatest.regtest.exec;
2727

2828

29-
import java.io.BufferedReader;
3029
import java.io.DataInputStream;
3130
import java.io.DataOutputStream;
3231
import java.io.EOFException;
3332
import java.io.File;
3433
import java.io.FileWriter;
3534
import java.io.IOException;
36-
import java.io.InputStream;
37-
import java.io.InputStreamReader;
3835
import java.io.PrintStream;
3936
import java.io.PrintWriter;
4037
import java.net.InetAddress;
@@ -60,13 +57,16 @@
6057
import java.util.TreeSet;
6158
import java.util.WeakHashMap;
6259
import java.util.concurrent.CountDownLatch;
60+
import java.util.concurrent.ExecutionException;
6361
import java.util.concurrent.TimeUnit;
62+
import java.util.concurrent.TimeoutException;
6463

6564
import com.sun.javatest.Status;
6665
import com.sun.javatest.TestResult;
6766
import com.sun.javatest.WorkDirectory;
6867
import com.sun.javatest.regtest.TimeoutHandler;
6968
import com.sun.javatest.regtest.agent.ActionHelper;
69+
import com.sun.javatest.regtest.agent.AgentProcessLogger;
7070
import com.sun.javatest.regtest.agent.AgentServer;
7171
import com.sun.javatest.regtest.agent.Alarm;
7272
import com.sun.javatest.regtest.agent.Flags;
@@ -186,8 +186,9 @@ private Agent(File dir, JDK jdk, List<String> vmOpts, Map<String, String> envVar
186186
env.clear();
187187
env.putAll(envVars);
188188
agentServerProcess = process = pb.start();
189-
copyAgentProcessStream("stdout", process.getInputStream());
190-
copyAgentProcessStream("stderr", process.getErrorStream());
189+
190+
processLogger = new AgentProcessLogger(process);
191+
startAgentLog();
191192

192193
try {
193194
final int ACCEPT_TIMEOUT = (int) (60 * 1000 * timeoutFactor);
@@ -219,29 +220,10 @@ private Agent(File dir, JDK jdk, List<String> vmOpts, Map<String, String> envVar
219220
}
220221

221222
/**
222-
* Reads the output written by an agent process, and copies it either to
223-
* the current TestResult object (when one is available) or to the agent's
224-
* log file, if output is found while there is no test using the agent.
225-
*
226-
* @param name the name of the stream
227-
* @param in the stream
223+
* Writes process input and error stream to the agent log.
228224
*/
229-
void copyAgentProcessStream(final String name, final InputStream in) {
230-
Thread t = new Thread() {
231-
@Override
232-
public void run() {
233-
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(in))) {
234-
String line;
235-
while ((line = inReader.readLine()) != null) {
236-
handleProcessStreamLine(name, line);
237-
}
238-
} catch (IOException e) {
239-
// ignore
240-
}
241-
}
242-
};
243-
t.setDaemon(true);
244-
t.start();
225+
private void startAgentLog() {
226+
processLogger.startLogging( (String stream, String logLine) -> log(stream + ": " + logLine));
245227
}
246228

247229
/**
@@ -276,30 +258,35 @@ public void run() {
276258
* @param section the test result section to be used, or {@code null}
277259
*/
278260
private synchronized void captureProcessStreams(TestResult.Section section) {
261+
if (currentTestResultSection != section) {
262+
try {
263+
processLogger.stopLogging();
264+
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
265+
log("Failed to stop agent logging" + ex);
266+
}
267+
}
279268
currentTestResultSection = section;
280269
if (currentTestResultSection == null) {
281270
for (PrintWriter pw : processStreamWriters.values()) {
282271
pw.close();
283272
}
284273
processStreamWriters.clear();
274+
startAgentLog();
275+
} else {
276+
processLogger.startLogging(this::handleProcessStreamLine);
285277
}
286278
}
287279

288280
/**
289281
* Saves a line of output that was written by the agent to stdout (fd1) or stderr (fd2).
290-
* If there is a current test result section, the line is saved there;
291-
* otherwise it is written to the agent log file.
282+
* If there is a current test result section, the line is saved there.
292283
*
293284
* @param name the name of the stream from which the line was read
294285
* @param line the line that was read
295286
*/
296287
private synchronized void handleProcessStreamLine(String name, String line) {
297-
if (currentTestResultSection == null) {
298-
log(name + ": " + line);
299-
} else {
300-
processStreamWriters.computeIfAbsent(name, currentTestResultSection::createOutput)
301-
.println(line);
302-
}
288+
processStreamWriters.computeIfAbsent(name, currentTestResultSection::createOutput)
289+
.println(line);
303290
}
304291

305292
public boolean matches(File execDir, JDK jdk, List<String> vmOpts) {
@@ -409,18 +396,25 @@ public void run() {
409396
Status actionStatus = null;
410397
keepAlive.setEnabled(false);
411398
try {
412-
captureProcessStreams(trs);
413399
synchronized (out) {
414400
agentAction.send();
415401
}
402+
// The agent sends process output separator in response
403+
// to receiving a command. Wait for the separator and
404+
// redirect log to the test result section
405+
captureProcessStreams(trs);
416406
trace(actionName + ": request sent");
417407
actionStatus = readResults(trs);
408+
// The agent will be disposed on exception.
409+
// Reset the agent log only if the agent can be reused.
410+
// The agent will send process output separator on
411+
// command execution.
412+
captureProcessStreams(null);
418413
return actionStatus;
419414
} catch (IOException e) {
420415
trace(actionName + ": error " + e);
421416
throw new Fault(e);
422417
} finally {
423-
captureProcessStreams(null);
424418
alarm.cancel();
425419
keepAlive.setEnabled(true);
426420
if (alarm.didFire()) {
@@ -509,7 +503,9 @@ public void close() {
509503
alarm.cancel();
510504
Thread.interrupted(); // clear any interrupted status
511505
}
512-
506+
// Ensure that thread pool threads are shut down
507+
// and the agent log is fully written
508+
processLogger.shutdown();
513509
log("Closed");
514510
}
515511

@@ -626,6 +622,7 @@ private void log(String message, PrintStream out) {
626622
final List<String> vmOpts;
627623
final File execDir;
628624
final Process process;
625+
final AgentProcessLogger processLogger;
629626
final DataInputStream in;
630627
final DataOutputStream out;
631628
final KeepAlive keepAlive;

0 commit comments

Comments
 (0)