diff --git a/rcljava/src/main/java/org/ros2/rcljava/events/EventHandlerImpl.java b/rcljava/src/main/java/org/ros2/rcljava/events/EventHandlerImpl.java index 25545c7f..74e4e6dd 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/events/EventHandlerImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/events/EventHandlerImpl.java @@ -64,16 +64,19 @@ public class EventHandlerImpl< * @param eventStatusFactory Factory of an event status. * @param callback The callback function that will be called when the event * is triggered. + * @param disposeCallback Callback that will be called when this object is disposed. */ public EventHandlerImpl( final WeakReference parentReference, final long handle, final Supplier eventStatusFactory, - final Consumer callback) { + final Consumer callback, + final Consumer disposeCallback) { this.parentReference = parentReference; this.handle = handle; this.eventStatusFactory = eventStatusFactory; this.callback = callback; + this.disposeCallback = disposeCallback; } /** @@ -102,9 +105,11 @@ public final long getHandle() { * {@inheritDoc} */ public synchronized final void dispose() { - if (this.handle != 0) { - nativeDispose(this.handle); + if (this.handle == 0) { + return; } + this.disposeCallback.accept(this); + nativeDispose(this.handle); this.handle = 0; } @@ -126,11 +131,12 @@ public synchronized final void executeCallback() { nativeTake(this.handle, nativeEventStatusHandle); eventStatus.fromRCLEvent(nativeEventStatusHandle); eventStatus.deallocateRCLStatusEvent(nativeEventStatusHandle); - callback.accept(eventStatus); + this.callback.accept(eventStatus); } private final Supplier eventStatusFactory; private final WeakReference parentReference; private long handle; private final Consumer callback; + private final Consumer disposeCallback; } diff --git a/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java b/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java index 17018f87..0e93531e 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/publisher/PublisherImpl.java @@ -117,10 +117,20 @@ public final WeakReference getNodeReference() { public final EventHandler createEventHandler(Supplier factory, Consumer callback) { + final WeakReference> weakEventHandlers = new WeakReference( + this.eventHandlers); + Consumer disposeCallback = new Consumer() { + public void accept(EventHandler eventHandler) { + Collection eventHandlers = weakEventHandlers.get(); + if (eventHandlers != null) { + eventHandlers.remove(eventHandler); + } + } + }; T status = factory.get(); long eventHandle = nativeCreateEvent(this.handle, status.getPublisherEventType()); EventHandler eventHandler = new EventHandlerImpl( - new WeakReference(this), eventHandle, factory, callback); + new WeakReference(this), eventHandle, factory, callback, disposeCallback); this.eventHandlers.add(eventHandler); return eventHandler; } diff --git a/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java b/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java index 34b8b79c..8e20d81d 100644 --- a/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java +++ b/rcljava/src/main/java/org/ros2/rcljava/subscription/SubscriptionImpl.java @@ -22,6 +22,7 @@ import org.ros2.rcljava.RCLJava; import org.ros2.rcljava.common.JNIUtils; +import org.ros2.rcljava.consumers.BiConsumer; import org.ros2.rcljava.consumers.Consumer; import org.ros2.rcljava.events.EventHandler; import org.ros2.rcljava.events.EventHandlerImpl; @@ -128,10 +129,20 @@ public final WeakReference getNodeReference() { public final EventHandler createEventHandler(Supplier factory, Consumer callback) { + final WeakReference> weakEventHandlers = new WeakReference( + this.eventHandlers); + Consumer disposeCallback = new Consumer() { + public void accept(EventHandler eventHandler) { + Collection eventHandlers = weakEventHandlers.get(); + if (eventHandlers != null) { + eventHandlers.remove(eventHandler); + } + } + }; T status = factory.get(); long eventHandle = nativeCreateEvent(this.handle, status.getSubscriptionEventType()); EventHandler eventHandler = new EventHandlerImpl( - new WeakReference(this), eventHandle, factory, callback); + new WeakReference(this), eventHandle, factory, callback, disposeCallback); this.eventHandlers.add(eventHandler); return eventHandler; } diff --git a/rcljava/src/test/java/org/ros2/rcljava/publisher/PublisherTest.java b/rcljava/src/test/java/org/ros2/rcljava/publisher/PublisherTest.java index 7ee84de8..3890774a 100644 --- a/rcljava/src/test/java/org/ros2/rcljava/publisher/PublisherTest.java +++ b/rcljava/src/test/java/org/ros2/rcljava/publisher/PublisherTest.java @@ -90,9 +90,12 @@ public void accept(final OfferedQosIncompatible status) { } ); assertNotEquals(0, eventHandler.getHandle()); + assertEquals(1, publisher.getEventHandlers().size()); // force executing the callback, so we check that taking an event works eventHandler.executeCallback(); - RCLJava.shutdown(); + eventHandler.dispose(); assertEquals(0, eventHandler.getHandle()); + assertEquals(0, publisher.getEventHandlers().size()); + RCLJava.shutdown(); } }