diff --git a/newrelic-agent/src/main/java/com/newrelic/agent/RPMService.java b/newrelic-agent/src/main/java/com/newrelic/agent/RPMService.java index ea183900c8..e248ea54a1 100644 --- a/newrelic-agent/src/main/java/com/newrelic/agent/RPMService.java +++ b/newrelic-agent/src/main/java/com/newrelic/agent/RPMService.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; /** @@ -95,6 +96,7 @@ public class RPMService extends AbstractService implements IRPMService, Environm private long connectionTimestamp = 0; private final AtomicInteger last503Error = new AtomicInteger(0); private final AtomicInteger retryCount = new AtomicInteger(0); + private final ReentrantLock reentrantLock = new ReentrantLock(); private String rpmLink; private long lastReportTime; @@ -248,47 +250,54 @@ private Map getSettings(boolean sendEnvironmentInfo) { * Notify RPM that this agent has launched, and obtain the agent run id */ @Override - public synchronized void launch() throws Exception { - if (isConnected()) { - return; - } - - Map data = doConnect(); - Agent.LOG.log(Level.FINER, "Connection response : {0}", data); - List requiredParams = new ArrayList<>(Arrays.asList(COLLECT_ERRORS_KEY, COLLECT_TRACES_KEY, DATA_REPORT_PERIOD_KEY)); - if (!data.keySet().containsAll(requiredParams)) { - requiredParams.removeAll(data.keySet()); - throw new UnexpectedException(MessageFormat.format("Missing the following connection parameters: {0}", requiredParams)); - } - Agent.LOG.log(Level.INFO, "Agent {0} connected to {1}", toString(), getHostString()); + public void launch() throws Exception { + reentrantLock.lock(); try { - logCollectorMessages(data); - } catch (Exception ex) { - Agent.LOG.log(Level.FINEST, ex, "Error processing collector connect messages"); - } + if (isConnected()) { + return; + } - AgentConfig config = null; - if 
(connectionConfigListener != null) { - // Merge server-side data with local config before notifying connection listeners - config = connectionConfigListener.connected(this, data); - } + Map data = doConnect(); + Agent.LOG.log(Level.FINER, "Connection response : {0}", data); + List requiredParams = new ArrayList<>(Arrays.asList(COLLECT_ERRORS_KEY, COLLECT_TRACES_KEY, DATA_REPORT_PERIOD_KEY)); + if (!data.keySet().containsAll(requiredParams)) { + requiredParams.removeAll(data.keySet()); + throw new UnexpectedException(MessageFormat.format("Missing the following connection parameters: {0}", requiredParams)); + } + Agent.LOG.log(Level.INFO, "Agent {0} connected to {1}", toString(), getHostString()); + + try { + logCollectorMessages(data); + } catch (Exception ex) { + Agent.LOG.log(Level.FINEST, ex, "Error processing collector connect messages"); + } - connectionTimestamp = System.nanoTime(); - connected = true; - hasEverConnected = true; - entityGuid = data.get("entity_guid") != null ? data.get("entity_guid").toString() : ""; + AgentConfig config = null; + if (connectionConfigListener != null) { + // Merge server-side data with local config before notifying connection listeners + config = connectionConfigListener.connected(this, data); + } - if (connectionListener != null) { - config = config != null ? config : ServiceFactory.getConfigService().getDefaultAgentConfig(); - connectionListener.connected(this, config); - } + connectionTimestamp = System.nanoTime(); + connected = true; + hasEverConnected = true; + entityGuid = data.get("entity_guid") != null ? data.get("entity_guid").toString() : ""; - String agentRunToken = (String) data.get(ConnectionResponse.AGENT_RUN_ID_KEY); - Map requestMetadata = (Map) data.get(ConnectionResponse.REQUEST_HEADERS); - for (AgentConnectionEstablishedListener listener : agentConnectionEstablishedListeners) { - listener.onEstablished(appName, agentRunToken, requestMetadata); + if (connectionListener != null) { + config = config != null ? 
config : ServiceFactory.getConfigService().getDefaultAgentConfig(); + connectionListener.connected(this, config); + } + + String agentRunToken = (String) data.get(ConnectionResponse.AGENT_RUN_ID_KEY); + Map requestMetadata = (Map) data.get(ConnectionResponse.REQUEST_HEADERS); + for (AgentConnectionEstablishedListener listener : agentConnectionEstablishedListeners) { + listener.onEstablished(appName, agentRunToken, requestMetadata); + } + } finally { + reentrantLock.unlock(); } + } private Map doConnect() throws Exception { @@ -379,15 +388,21 @@ private void disconnect() { } @Override - public synchronized void reconnect() { - Agent.LOG.log(Level.INFO, "{0} is reconnecting", getApplicationName()); + public void reconnect() { + reentrantLock.lock(); try { - shutdown(); - } catch (Exception e) { - // ignore + Agent.LOG.log(Level.INFO, "{0} is reconnecting", getApplicationName()); + try { + shutdown(); + } catch (Exception e) { + // ignore + } finally { + reconnectAsync(); + } } finally { - reconnectAsync(); + reentrantLock.unlock(); } + } @Override @@ -725,8 +740,16 @@ public boolean isMainApp() { /** * notify RPM that the agent is shutting down */ - public synchronized void shutdown() throws Exception { - disconnect(); + public void shutdown() throws Exception { + if (reentrantLock.tryLock(5, TimeUnit.SECONDS)) { + try { + disconnect(); + } finally { + reentrantLock.unlock(); + } + } else { + Agent.LOG.log(Level.WARNING, "Unable to acquire lock for shutdown within timeout - shutdown may already be in progress"); + } } @Override @@ -740,6 +763,9 @@ public void harvestNow() { // // Note: even if we are not connected, we don't need to initiate a connection attempt - although there are // several cases, bottom line is that the Agent should already be attempting to connect. + // + // Update: Replaced the synchronized block with a reentrant lock with timeout. Same thing in the + // shutdown() method, so threads can now fail gracefully rather than deadlocking indefinitely. 
final int MAX_WAIT_SECONDS = 10; final long end = System.currentTimeMillis() + MAX_WAIT_SECONDS * 1000L; @@ -747,18 +773,23 @@ public void harvestNow() { Throwable trouble = null; while (!done && System.currentTimeMillis() < end) { + boolean lockAcquired = false; try { - synchronized (this) { + if (reentrantLock.tryLock(200, TimeUnit.MILLISECONDS)) { + lockAcquired = true; if (isConnected()) { ServiceFactory.getHarvestService().harvestNow(); done = true; } } - Thread.sleep(200); } catch (InterruptedException iex) { - // sleep returned early - ignore it - the process is ending anyway + // tryLock() wait was interrupted - ignore it - the process is ending anyway } catch (Exception ex) { trouble = ex; + } finally { + if (lockAcquired) { + reentrantLock.unlock(); + } } } diff --git a/newrelic-agent/src/main/java/com/newrelic/agent/core/CoreServiceImpl.java b/newrelic-agent/src/main/java/com/newrelic/agent/core/CoreServiceImpl.java index d75bb1a92f..a594027592 100644 --- a/newrelic-agent/src/main/java/com/newrelic/agent/core/CoreServiceImpl.java +++ b/newrelic-agent/src/main/java/com/newrelic/agent/core/CoreServiceImpl.java @@ -30,6 +30,7 @@ import java.text.MessageFormat; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; public class CoreServiceImpl extends AbstractService implements CoreService, HealthDataProducer { @@ -37,7 +38,7 @@ public class CoreServiceImpl extends AbstractService implements CoreService, Hea private final Instrumentation instrumentation; private volatile InstrumentationProxy instrumentationProxy; private final List healthDataChangeListeners = new CopyOnWriteArrayList<>(); - + private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false); public CoreServiceImpl(Instrumentation instrumentation) { super(CoreService.class.getName()); @@ -125,7 +126,7 @@ private void jvmShutdown(long startTime) { } if (config.isSendDataOnExit() && ((System.currentTimeMillis() - startTime) >=
config.getSendDataOnExitThresholdInMillis())) { - // Grab all RPMService instances (may be multiple with auto_app_naming enabled) and harvest them + // Grab all RPMService instances (may be multiple with auto_app_naming enabled) and harvest them List rpmServices = ServiceFactory.getRPMServiceManager().getRPMServices(); for (IRPMService rpmService : rpmServices) { rpmService.harvestNow(); @@ -137,12 +138,15 @@ private void jvmShutdown(long startTime) { getLogger().fine("Agent JVM shutdown hook: done."); } - private synchronized void shutdown() { - try { - ServiceFactory.getServiceManager().stop(); - getLogger().info("New Relic Agent has shutdown"); - } catch (Throwable t) { - Agent.LOG.log(Level.SEVERE, t, "Error shutting down New Relic Agent"); + private void shutdown() { + // Prevent multiple shutdown attempts occurring at the same time + if (shutdownInProgress.compareAndSet(false, true)) { + try { + ServiceFactory.getServiceManager().stop(); + getLogger().info("New Relic Agent has shutdown"); + } catch (Throwable t) { + Agent.LOG.log(Level.SEVERE, t, "Error shutting down New Relic Agent"); + } } } diff --git a/newrelic-agent/src/test/java/com/newrelic/agent/RPMServiceTest.java b/newrelic-agent/src/test/java/com/newrelic/agent/RPMServiceTest.java index da7f6cd4f9..5d0602c07e 100644 --- a/newrelic-agent/src/test/java/com/newrelic/agent/RPMServiceTest.java +++ b/newrelic-agent/src/test/java/com/newrelic/agent/RPMServiceTest.java @@ -1225,6 +1225,154 @@ public void forceRestartExceptionWithPut() throws Exception { doForceRestartException(); } + @Test(timeout = 15000) + public void concurrentHarvestAndShutdown_willNotDeadlock() throws Exception { + final RPMService rmpService = createAndLaunchRPMService(); + + final AtomicReference harvestError = new AtomicReference<>(); + final AtomicReference shutdownError = new AtomicReference<>(); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(2); + + // This thread
calls harvestNow() + Thread harvestThread = new Thread(() -> { + try { + startLatch.await(); + rmpService.harvestNow(); + } catch (Throwable t) { + harvestError.set(t); + } finally { + doneLatch.countDown(); + } + }, "harvest"); + + // This one calls shutdown() + Thread shutdownThread = new Thread(() -> { + try { + startLatch.await(); + rmpService.shutdown(); + } catch (Throwable t) { + shutdownError.set(t); + } finally { + doneLatch.countDown(); + } + }, "shutdown"); + + // Fire off both threads and start them both simultaneously by triggering the startLatch instance + harvestThread.start(); + shutdownThread.start(); + startLatch.countDown(); + + // Wait for both to complete (or timeout after 15 seconds) + boolean completed = doneLatch.await(15, TimeUnit.SECONDS); + + assertTrue("harvest() and shutdown() should both complete without timeout (no deadlock)", completed); + + // Verify no unexpected exceptions (IllegalMonitorStateException should not occur) + if (harvestError.get() != null && !(harvestError.get() instanceof InterruptedException)) { + assertFalse("harvest() should not throw IllegalMonitorStateException", + harvestError.get() instanceof IllegalMonitorStateException); + } + if (shutdownError.get() != null) { + assertFalse("shutdown() should not throw IllegalMonitorStateException", + shutdownError.get() instanceof IllegalMonitorStateException); + } + } + + @Test(timeout = 10000) + public void multipleConcurrentShutdownCalls_doNotDeadlock() throws Exception { + final RPMService rpmService = createAndLaunchRPMService(); + + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(3); + final AtomicInteger successCount = new AtomicInteger(0); + + // 3 threads all try to shutdown() simultaneously + for (int i = 0; i < 3; i++) { + new Thread(() -> { + try { + startLatch.await(); + rpmService.shutdown(); + successCount.incrementAndGet(); + } catch (Exception e) { + // Expected that some may fail, but should 
not deadlock + } finally { + doneLatch.countDown(); + } + }, "shutdown-" + i).start(); + } + + startLatch.countDown(); + boolean completed = doneLatch.await(10, TimeUnit.SECONDS); + + assertTrue("All shutdown() calls should complete without deadlock", completed); + assertTrue("At least one shutdown should succeed", successCount.get() >= 1); + } + + @Test(timeout = 15000) + public void reconnect_duringHarvest_shouldNotDeadlock() throws Exception { + final RPMService rpmService = createAndLaunchRPMService(); + + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(2); + final AtomicReference error = new AtomicReference<>(); + + // Simulate harvest cycle + Thread harvestThread = new Thread(() -> { + try { + startLatch.await(); + for (int i = 0; i < 5 && !Thread.interrupted(); i++) { + Thread.sleep(100); + } + rpmService.harvestNow(); + } catch (Throwable t) { + if (!(t instanceof InterruptedException)) { + error.set(t); + } + } finally { + doneLatch.countDown(); + } + }, "harvest"); + + // Reconnect call + Thread reconnectThread = new Thread(() -> { + try { + startLatch.await(); + Thread.sleep(50); // Let harvest start first + rpmService.reconnect(); + } catch (Throwable t) { + error.set(t); + } finally { + doneLatch.countDown(); + } + }, "reconnect"); + + harvestThread.start(); + reconnectThread.start(); + + startLatch.countDown(); + + boolean completed = doneLatch.await(15, TimeUnit.SECONDS); + assertTrue("Operations should complete without deadlock", completed); + + if (error.get() != null) { + assertFalse("Should not throw IllegalMonitorStateException", + error.get() instanceof IllegalMonitorStateException); + } + + rpmService.shutdown(); + } + + private RPMService createAndLaunchRPMService() throws Exception { + Map config = createStagingMap(true, false); + createServiceManager(config); + + List appNames = singletonList("MyApplication"); + RPMService rpmService = new RPMService(appNames, null, null, 
Collections.emptyList()); + rpmService.launch(); + return rpmService; + } + private void doForceRestartException() throws Exception { MockDataSenderFactory dataSenderFactory = new MockDataSenderFactory(); DataSenderFactory.setDataSenderFactory(dataSenderFactory);