diff --git a/scr/src/main/java/org/apache/felix/scr/impl/Activator.java b/scr/src/main/java/org/apache/felix/scr/impl/Activator.java index b1b41cce0a..b37b61190c 100644 --- a/scr/src/main/java/org/apache/felix/scr/impl/Activator.java +++ b/scr/src/main/java/org/apache/felix/scr/impl/Activator.java @@ -97,7 +97,7 @@ public class Activator extends AbstractExtender private ComponentRegistry m_componentRegistry; // thread acting upon configurations - private ComponentActorExecutor m_componentActor; + private ComponentActorThread m_componentActor; private ServiceRegistration m_runtime_reg; @@ -210,8 +210,7 @@ protected void doStart() throws Exception // prepare component registry m_componentBundles = new HashMap<>(); - m_componentActor = new ComponentActorExecutor( this.logger ); - m_componentRegistry = new ComponentRegistry( this.m_configuration, this.logger, this.m_componentActor ); + m_componentRegistry = new ComponentRegistry( this.m_configuration, this.logger ); final ServiceComponentRuntimeImpl runtime = new ServiceComponentRuntimeImpl( m_globalContext, m_componentRegistry ); m_runtime_reg = m_context.registerService( ServiceComponentRuntime.class, @@ -223,6 +222,12 @@ protected void doStart() throws Exception logger.log(Level.INFO, " Version = {0}", null, m_bundle.getVersion().toString() ); + // create and start the component actor + m_componentActor = new ComponentActorThread( this.logger ); + Thread t = new Thread( m_componentActor, "SCR Component Actor" ); + t.setDaemon( true ); + t.start(); + super.doStart(); m_componentCommands = new ComponentCommands(m_context, m_globalContext, runtime, m_configuration); @@ -425,13 +430,14 @@ public void doStop() throws Exception // dispose component registry if ( m_componentRegistry != null ) { + m_componentRegistry.shutdown(); m_componentRegistry = null; } // terminate the actor thread if ( m_componentActor != null ) { - m_componentActor.shutdownNow(); + m_componentActor.terminate(); m_componentActor = null; } ClassUtils.setFrameworkWiring(null); diff --git a/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java b/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java index 4ff4869792..8dbc277508 100644 --- a/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java +++ b/scr/src/main/java/org/apache/felix/scr/impl/BundleComponentActivator.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -82,7 +81,7 @@ public class BundleComponentActivator implements ComponentActivator private final List> m_holders = new ArrayList<>(); // thread acting upon configurations - private final ScheduledExecutorService m_componentActor; + private final ComponentActorThread m_componentActor; // true as long as the dispose method is not called private final AtomicBoolean m_active = new AtomicBoolean( true ); @@ -197,7 +196,7 @@ public void removeServiceListener(String serviceFilterString, */ public BundleComponentActivator(final ScrLogger scrLogger, final ComponentRegistry componentRegistry, - final ScheduledExecutorService componentActor, + final ComponentActorThread componentActor, final BundleContext context, final ScrConfiguration configuration, final List cachedComponentMetadata, @@ -713,10 +712,10 @@ public void schedule(Runnable task) { if ( isActive() ) { - ScheduledExecutorService cat = m_componentActor; + ComponentActorThread cat = m_componentActor; if ( cat != null ) { - cat.submit( task ); + cat.schedule( task ); } else { @@ -763,7 +762,7 @@ public void leaveCreate(ServiceReference serviceReference) @Override public void missingServicePresent(ServiceReference serviceReference) { - m_componentRegistry.missingServicePresent( serviceReference ); + m_componentRegistry.missingServicePresent( serviceReference, m_componentActor ); } @Override diff --git a/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorExecutor.java b/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorExecutor.java deleted file mode 100644 index 67786694bf..0000000000 --- a/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorExecutor.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.scr.impl; - - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - -import org.apache.felix.scr.impl.logger.InternalLogger.Level; -import org.apache.felix.scr.impl.logger.ScrLogger; - - -/** - * The ComponentActorExecutor is the thread used to act upon registered - * components of the service component runtime. - * This is also used by the ComponentRegistry to schedule service.changecount updates. - */ -class ComponentActorExecutor extends ScheduledThreadPoolExecutor -{ - - private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() - { - @Override - public Thread newThread(Runnable r) - { - Thread thread = new Thread(r, "SCR Component Actor"); - thread.setDaemon(true); - return thread; - } - }; - - private final ScrLogger logger; - - ComponentActorExecutor(final ScrLogger log ) - { - super( 1, THREAD_FACTORY ); - logger = log; - } - - @Override - protected void beforeExecute(Thread t, Runnable r) - { - logger.log(Level.DEBUG, "Running task: " + r, null); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) - { - if (t != null) - { - logger.log(Level.ERROR, "Unexpected problem executing task " + r, t); - } - } -} diff --git a/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorThread.java b/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorThread.java new file mode 100644 index 0000000000..d1aa3bafca --- /dev/null +++ b/scr/src/main/java/org/apache/felix/scr/impl/ComponentActorThread.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.felix.scr.impl; + + +import java.util.LinkedList; + +import org.apache.felix.scr.impl.logger.InternalLogger.Level; +import org.apache.felix.scr.impl.logger.ScrLogger; + + +/** + * The ComponentActorThread is the thread used to act upon registered + * components of the service component runtime. + */ +class ComponentActorThread implements Runnable +{ + + // sentinel task to terminate this thread + private static final Runnable TERMINATION_TASK = new Runnable() + { + @Override + public void run() + { + } + + + @Override + public String toString() + { + return "Component Actor Terminator"; + } + }; + + // the queue of Runnable instances to be run + private final LinkedList tasks = new LinkedList<>(); + + private final ScrLogger logger; + + + ComponentActorThread( final ScrLogger log ) + { + logger = log; + } + + + // waits on Runnable instances coming into the queue. As instances come + // in, this method calls the Runnable.run method, logs any exception + // happening and keeps on waiting for the next Runnable. If the Runnable + // taken from the queue is this thread instance itself, the thread + // terminates. + @Override + public void run() + { + logger.log(Level.DEBUG, "Starting ComponentActorThread", null); + + for ( ;; ) + { + final Runnable task; + synchronized ( tasks ) + { + while ( tasks.isEmpty() ) + { + boolean interrupted = Thread.interrupted(); + try + { + tasks.wait(); + } + catch ( InterruptedException ie ) + { + interrupted = true; + // don't care + } + finally + { + if (interrupted) + { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + } + + task = tasks.removeFirst(); + } + + try + { + // return if the task is this thread itself + if ( task == TERMINATION_TASK ) + { + logger.log(Level.DEBUG, "Shutting down ComponentActorThread", + null); + return; + } + + // otherwise execute the task, log any issues + logger.log(Level.DEBUG, "Running task: " + task, null); + task.run(); + } + catch ( Throwable t ) + { + logger.log(Level.ERROR, "Unexpected problem executing task " + task, + t); + } + finally + { + synchronized ( tasks ) + { + tasks.notifyAll(); + } + } + } + } + + + // cause this thread to terminate by adding this thread to the end + // of the queue + void terminate() + { + schedule( TERMINATION_TASK ); + synchronized ( tasks ) + { + while ( !tasks.isEmpty() ) + { + boolean interrupted = Thread.interrupted(); + try + { + tasks.wait(); + } + catch ( InterruptedException e ) + { + interrupted = true; + logger.log(Level.ERROR, + "Interrupted exception waiting for queue to empty", e); + } + finally + { + if (interrupted) + { // restore interrupt status + Thread.currentThread().interrupt(); + } + } + } + } + } + + + // queue the given runnable to be run as soon as possible + void schedule( Runnable task ) + { + synchronized ( tasks ) + { + // append to the task queue + tasks.add( task ); + + logger.log(Level.DEBUG, "Adding task [{0}] as #{1} in the queue", null, + task, tasks.size()); + + // notify the waiting thread + tasks.notifyAll(); + } + } +} diff --git a/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java b/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java index 718c18361c..a4cb54dd01 100644 --- a/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java +++ b/scr/src/main/java/org/apache/felix/scr/impl/ComponentRegistry.java @@ -28,10 +28,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.felix.scr.impl.inject.ComponentMethods; @@ -130,16 +130,14 @@ public class ComponentRegistry private final ScrConfiguration m_configuration; - private final ScheduledExecutorService m_componentActor; - - public ComponentRegistry(final ScrConfiguration scrConfiguration, final ScrLogger logger, final ScheduledExecutorService componentActor ) + public ComponentRegistry( final ScrConfiguration scrConfiguration, final ScrLogger logger ) { m_configuration = scrConfiguration; m_logger = logger; - m_componentActor = componentActor; m_componentHoldersByName = new HashMap<>(); m_componentHoldersByPid = new HashMap<>(); m_componentsById = new HashMap<>(); + } //---------- ComponentManager registration by component Id @@ -562,7 +560,7 @@ public void leaveCreate(final ServiceReference serviceReference) * @param serviceReference * @param actor */ - public synchronized void missingServicePresent( final ServiceReference serviceReference ) + public synchronized void missingServicePresent( final ServiceReference serviceReference, ComponentActorThread actor ) { final List> dependencyManagers = m_missingDependencies.remove( serviceReference ); if ( dependencyManagers != null ) @@ -592,7 +590,7 @@ public String toString() } ; m_logger.log(Level.DEBUG, "Scheduling runnable {0} asynchronously", null, runnable); - m_componentActor.submit( runnable ); + actor.schedule( runnable ); } } @@ -706,6 +704,10 @@ public void unregisterRegionConfigurationSupport( private final AtomicLong changeCount = new AtomicLong(); + private volatile Timer changeCountTimer; + + private final Object changeCountTimerLock = new Object(); + private volatile ServiceRegistration registration; public Dictionary getServiceRegistrationProperties() @@ -727,9 +729,16 @@ public void updateChangeCount() { final long count = this.changeCount.incrementAndGet(); + final Timer timer; + synchronized ( this.changeCountTimerLock ) { + if ( this.changeCountTimer == null ) { + this.changeCountTimer = new Timer("SCR Component Registry", true); + } + timer = this.changeCountTimer; + } try { - m_componentActor.schedule(new Runnable() + timer.schedule(new TimerTask() { @Override @@ -745,9 +754,18 @@ public void run() { // we ignore this as this might happen on shutdown } + synchronized ( changeCountTimerLock ) + { + if ( changeCount.get() == count ) + { + changeCountTimer.cancel(); + changeCountTimer = null; + } + } + } } - }, m_configuration.serviceChangecountTimeout(), TimeUnit.MILLISECONDS); + }, m_configuration.serviceChangecountTimeout()); } catch (Exception e) { m_logger.log(Level.WARN, @@ -756,4 +774,11 @@ public void run() } } } + + public void shutdown() { + final Timer timer = changeCountTimer; + if (timer != null) { + timer.cancel(); + } + } } diff --git a/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java b/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java index db3c5ce7fa..51137c94fd 100644 --- a/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java +++ b/scr/src/test/java/org/apache/felix/scr/integration/ComponentTestBase.java @@ -55,7 +55,6 @@ import javax.inject.Inject; import org.apache.felix.scr.impl.ComponentCommands; -import org.apache.felix.scr.impl.manager.ScrConfiguration; import org.apache.felix.scr.integration.components.SimpleComponent; import org.apache.felix.service.command.Converter; import org.junit.After; diff --git a/scr/src/test/java/org/apache/felix/scr/integration/Felix6778Test.java b/scr/src/test/java/org/apache/felix/scr/integration/Felix6778Test.java deleted file mode 100644 index dbc4079a7e..0000000000 --- a/scr/src/test/java/org/apache/felix/scr/integration/Felix6778Test.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.felix.scr.integration; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.ops4j.pax.exam.Configuration; -import org.ops4j.pax.exam.Option; -import org.ops4j.pax.exam.OptionUtils; -import org.ops4j.pax.exam.junit.PaxExam; -import org.osgi.framework.BundleException; -import org.osgi.framework.Constants; -import org.osgi.framework.InvalidSyntaxException; -import org.osgi.framework.ServiceEvent; -import org.osgi.framework.ServiceListener; -import org.osgi.service.component.runtime.ServiceComponentRuntime; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.ops4j.pax.exam.CoreOptions.systemProperty; - -@RunWith(PaxExam.class) -public class Felix6778Test extends ComponentTestBase implements ServiceListener -{ - - private static final long DS_SERVICE_CHANGECOUNT_TIMEOUT = 1000; - - static - { - descriptorFile = "/integration_test_simple_components.xml"; - } - - class RecordedScrChangeCount - { - private final Thread thread; - private final long changecount; - - RecordedScrChangeCount(ServiceEvent event) - { - this.thread = Thread.currentThread(); - this.changecount = (long) event.getServiceReference().getProperty(Constants.SERVICE_CHANGECOUNT); - } - } - - @Configuration - public static Option[] configuration() - { - return OptionUtils.combine(ComponentTestBase.configuration(), - systemProperty( "ds.service.changecount.timeout" ).value( Long.toString(DS_SERVICE_CHANGECOUNT_TIMEOUT) )); - } - - private List recordedEvents = new CopyOnWriteArrayList<>(); - - @Before - public void addServiceListener() throws InvalidSyntaxException - { - bundleContext.addServiceListener( - this, - "("+Constants.OBJECTCLASS + "=" + ServiceComponentRuntime.class.getName() + ")" - ); - } - - @After - public void removeServiceListener() - { - bundleContext.removeServiceListener(this); - } - - @Test - public void verify_changecount_updates() throws InterruptedException, BundleException - { - // Wait for 2x the changecount timeout`to account for the asynchronous service.changecount property update - Thread.sleep(DS_SERVICE_CHANGECOUNT_TIMEOUT * 2); - - // Check that the service.changecount update was recorded - assertEquals(1, recordedEvents.size()); - assertEquals(13L, recordedEvents.get(0).changecount); - assertEquals("SCR Component Actor", recordedEvents.get(0).thread.getName()); - - // Trigger a change by stopping the bundle with components - bundle.stop(); - - // Wait for 2x the changecount timeout`to account for the asynchronous service.changecount property update - Thread.sleep(DS_SERVICE_CHANGECOUNT_TIMEOUT * 2); - - // Check that another service.changecount update was recorded - assertEquals(2, recordedEvents.size()); - assertEquals(26L, recordedEvents.get(1).changecount); - assertEquals("SCR Component Actor", recordedEvents.get(1).thread.getName()); - - // Check if both events originate from the same thread - assertSame(recordedEvents.get(0).thread, recordedEvents.get(1).thread); - } - - @Override - public void serviceChanged(ServiceEvent event) - { - if (event.getType() == ServiceEvent.MODIFIED) - { - recordedEvents.add(new RecordedScrChangeCount(event)); - } - } - - -}