-
Notifications
You must be signed in to change notification settings - Fork 198
Fix thread safety issues in Communicator, Session, and AsyncPromiseFulfillerDecorator #433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| package eu.chargetime.ocpp; | ||
|
|
||
| /* | ||
| ChargeTime.eu - Java-OCA-OCPP | ||
| Copyright (C) 2015-2016 Thomas Volden <tv@chargetime.eu> | ||
|
|
@@ -29,7 +30,8 @@ of this software and associated documentation files (the "Software"), to deal | |
|
|
||
| import eu.chargetime.ocpp.feature.Feature; | ||
| import eu.chargetime.ocpp.model.*; | ||
| import java.util.ArrayDeque; | ||
| import java.util.concurrent.ConcurrentLinkedDeque; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -42,11 +44,11 @@ of this software and associated documentation files (the "Software"), to deal | |
| public abstract class Communicator { | ||
| private static final Logger logger = LoggerFactory.getLogger(Communicator.class); | ||
|
|
||
| private final ArrayDeque<Object> transactionQueue; | ||
| private final ConcurrentLinkedDeque<Object> transactionQueue; | ||
| private RetryRunner retryRunner; | ||
| protected Radio radio; | ||
| private CommunicatorEvents events; | ||
| private boolean failedFlag; | ||
| private final AtomicBoolean failedFlag = new AtomicBoolean(false); | ||
|
|
||
| /** | ||
| * Convert a formatted string into a {@link Request}/{@link Confirmation}. This is useful for call | ||
|
|
@@ -149,9 +151,8 @@ public Communicator(Radio transmitter) { | |
| */ | ||
| public Communicator(Radio transmitter, boolean enableTransactionQueue) { | ||
| this.radio = transmitter; | ||
| this.transactionQueue = enableTransactionQueue ? new ArrayDeque<>() : null; | ||
| this.transactionQueue = enableTransactionQueue ? new ConcurrentLinkedDeque<>() : null; | ||
| this.retryRunner = enableTransactionQueue ? new RetryRunner() : null; | ||
| this.failedFlag = false; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -267,7 +268,8 @@ public void sendCallResult(String uniqueId, String action, Confirmation confirma | |
| public void sendCallError( | ||
| String uniqueId, String action, String errorCode, String errorDescription) { | ||
| logger.error( | ||
| "An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}, errorDescription: {}", | ||
| "An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}," | ||
| + " errorDescription: {}", | ||
|
Comment on lines
+271
to
+272
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this unnecessary formatting change. |
||
| uniqueId, | ||
| action, | ||
| errorCode, | ||
|
|
@@ -381,7 +383,7 @@ public void receivedMessage(Object input) { | |
| events.onCallResultError( | ||
| call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload()); | ||
| } else if (message instanceof CallErrorMessage) { | ||
| failedFlag = true; | ||
| failedFlag.set(true); | ||
| CallErrorMessage call = (CallErrorMessage) message; | ||
| events.onError( | ||
| call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload()); | ||
|
|
@@ -417,11 +419,11 @@ private Object getRetryMessage() { | |
| * @return whether a fail flag has been raised. | ||
| */ | ||
| private boolean hasFailed() { | ||
| return failedFlag; | ||
| return failedFlag.get(); | ||
| } | ||
|
|
||
| private void popRetryMessage() { | ||
| if (transactionQueue != null && !transactionQueue.isEmpty()) transactionQueue.pop(); | ||
| if (transactionQueue != null) transactionQueue.pollFirst(); | ||
| } | ||
|
|
||
| /** Will resend transaction related requests. */ | ||
|
|
@@ -433,7 +435,7 @@ public void run() { | |
| Object call; | ||
| try { | ||
| while ((call = getRetryMessage()) != null) { | ||
| failedFlag = false; | ||
| failedFlag.set(false); | ||
| radio.send(call); | ||
| Thread.sleep(DELAY_IN_MILLISECONDS); | ||
| if (!hasFailed()) popRetryMessage(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| package eu.chargetime.ocpp; | ||
|
|
||
| /* | ||
| ChargeTime.eu - Java-OCA-OCPP | ||
| Copyright (C) 2015-2016 Thomas Volden <tv@chargetime.eu> | ||
|
|
@@ -204,11 +205,13 @@ public void accept(SessionEvents eventHandler) { | |
|
|
||
| private class CommunicatorEventHandler implements CommunicatorEvents { | ||
| private static final String OCCURRENCE_CONSTRAINT_VIOLATION = | ||
| "Payload for Action is syntactically correct but at least one of the fields violates occurrence constraints"; | ||
| "Payload for Action is syntactically correct but at least one of the fields violates" | ||
| + " occurrence constraints"; | ||
|
Comment on lines
+208
to
+209
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this unnecessary formatting change. |
||
| private static final String PROPERTY_CONSTRAINT_VIOLATION = | ||
| "Payload is syntactically correct but at least one field contains an invalid value"; | ||
| private static final String INTERNAL_ERROR = | ||
| "An internal error occurred and the receiver was not able to process the requested Action successfully"; | ||
| "An internal error occurred and the receiver was not able to process the requested Action" | ||
| + " successfully"; | ||
|
Comment on lines
+213
to
+214
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert this unnecessary formatting change. |
||
| private static final String UNABLE_TO_PROCESS = "Unable to process action"; | ||
|
|
||
| @Override | ||
|
|
@@ -266,7 +269,7 @@ public void onCallResult(String id, String action, Object payload) { | |
| } | ||
|
|
||
| @Override | ||
| public synchronized void onCall(String id, String action, Object payload) { | ||
| public void onCall(String id, String action, Object payload) { | ||
| Optional<Feature> featureOptional = featureRepository.findFeature(action); | ||
| if (!featureOptional.isPresent() || featureOptional.get().getConfirmationType() == null) { | ||
| communicator.sendCallError( | ||
|
|
@@ -279,6 +282,7 @@ public synchronized void onCall(String id, String action, Object payload) { | |
| if (request.validate()) { | ||
| CompletableFuture<Confirmation> promise = new CompletableFuture<>(); | ||
| promise.whenComplete(new ConfirmationHandler(id, action, communicator)); | ||
| promise.whenComplete((result, error) -> pendingPromises.remove(id)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert and make this change in a different PR. |
||
| addPendingPromise(id, action, promise); | ||
| dispatcher.handleRequest(promise, request); | ||
| } else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| package eu.chargetime.ocpp.test; | ||
|
|
||
| /* | ||
| ChargeTime.eu - Java-OCA-OCPP | ||
|
|
||
| MIT License | ||
|
|
||
| Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| of this software and associated documentation files (the "Software"), to deal | ||
| in the Software without restriction, including without limitation the rights | ||
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| copies of the Software, and to permit persons to whom the Software is | ||
| furnished to do so, subject to the following conditions: | ||
|
|
||
| The above copyright notice and this permission notice shall be included in all | ||
| copies or substantial portions of the Software. | ||
|
|
||
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| SOFTWARE. | ||
| */ | ||
|
|
||
| import static org.junit.Assert.assertTrue; | ||
| import static org.mockito.Mockito.*; | ||
|
|
||
| import eu.chargetime.ocpp.AsyncPromiseFulfillerDecorator; | ||
| import eu.chargetime.ocpp.PromiseFulfiller; | ||
| import eu.chargetime.ocpp.SessionEvents; | ||
| import eu.chargetime.ocpp.model.Confirmation; | ||
| import eu.chargetime.ocpp.model.Request; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.junit.Before; | ||
| import org.junit.Test; | ||
| import org.junit.runner.RunWith; | ||
| import org.mockito.Mock; | ||
| import org.mockito.junit.MockitoJUnitRunner; | ||
|
|
||
| @RunWith(MockitoJUnitRunner.class) | ||
| public class AsyncPromiseFulfillerDecoratorTest { | ||
|
|
||
| @Mock private PromiseFulfiller innerFulfiller; | ||
| @Mock private SessionEvents sessionEvents; | ||
| @Mock private Request request; | ||
|
|
||
| private AsyncPromiseFulfillerDecorator decorator; | ||
|
|
||
| @Before | ||
| public void setup() { | ||
| // Reset to a fresh default executor before each test to avoid cross-test pollution | ||
| AsyncPromiseFulfillerDecorator.setExecutor(createFreshExecutor()); | ||
| decorator = new AsyncPromiseFulfillerDecorator(innerFulfiller); | ||
| } | ||
|
|
||
| private static ExecutorService createFreshExecutor() { | ||
| int coreSize = Runtime.getRuntime().availableProcessors(); | ||
| int maxSize = coreSize * 2; | ||
| return new java.util.concurrent.ThreadPoolExecutor( | ||
| coreSize, | ||
| maxSize, | ||
| 60L, | ||
| TimeUnit.SECONDS, | ||
| new java.util.concurrent.LinkedBlockingQueue<>(1000), | ||
| new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); | ||
| } | ||
|
|
||
| @Test | ||
| public void fulfill_delegatesToInnerFulfiller() throws Exception { | ||
| CompletableFuture<Confirmation> promise = new CompletableFuture<>(); | ||
| CountDownLatch latch = new CountDownLatch(1); | ||
|
|
||
| doAnswer( | ||
| invocation -> { | ||
| latch.countDown(); | ||
| return null; | ||
| }) | ||
| .when(innerFulfiller) | ||
| .fulfill(any(), any(), any()); | ||
|
|
||
| decorator.fulfill(promise, sessionEvents, request); | ||
|
|
||
| assertTrue("Inner fulfiller was not called within timeout", latch.await(5, TimeUnit.SECONDS)); | ||
| verify(innerFulfiller).fulfill(promise, sessionEvents, request); | ||
| } | ||
|
|
||
| @Test | ||
| public void fulfill_executesAsynchronously_doesNotBlockCaller() throws Exception { | ||
| CountDownLatch blockingLatch = new CountDownLatch(1); | ||
| CountDownLatch callerReturned = new CountDownLatch(1); | ||
|
|
||
| doAnswer( | ||
| invocation -> { | ||
| // Block the inner fulfiller until we signal | ||
| blockingLatch.await(5, TimeUnit.SECONDS); | ||
| return null; | ||
| }) | ||
| .when(innerFulfiller) | ||
| .fulfill(any(), any(), any()); | ||
|
|
||
| // fulfill() should return immediately without blocking | ||
| decorator.fulfill(null, sessionEvents, request); | ||
| callerReturned.countDown(); | ||
|
|
||
| assertTrue("fulfill() blocked the caller thread", callerReturned.getCount() == 0); | ||
|
|
||
| // Release the blocked task | ||
| blockingLatch.countDown(); | ||
| } | ||
|
|
||
| @Test | ||
| public void fulfill_withCustomExecutor_usesCustomExecutor() throws Exception { | ||
| ExecutorService customExecutor = Executors.newSingleThreadExecutor(); | ||
| AsyncPromiseFulfillerDecorator.setExecutor(customExecutor); | ||
|
|
||
| CountDownLatch latch = new CountDownLatch(1); | ||
| doAnswer( | ||
| invocation -> { | ||
| latch.countDown(); | ||
| return null; | ||
| }) | ||
| .when(innerFulfiller) | ||
| .fulfill(any(), any(), any()); | ||
|
|
||
| decorator.fulfill(null, sessionEvents, request); | ||
|
|
||
| assertTrue("Task was not executed on custom executor", latch.await(5, TimeUnit.SECONDS)); | ||
| verify(innerFulfiller).fulfill(null, sessionEvents, request); | ||
|
|
||
| customExecutor.shutdown(); | ||
| } | ||
|
|
||
| @Test | ||
| public void fulfill_multipleConcurrentCalls_allExecuted() throws Exception { | ||
| int callCount = 20; | ||
| CountDownLatch latch = new CountDownLatch(callCount); | ||
|
|
||
| doAnswer( | ||
| invocation -> { | ||
| latch.countDown(); | ||
| return null; | ||
| }) | ||
| .when(innerFulfiller) | ||
| .fulfill(any(), any(), any()); | ||
|
|
||
| for (int i = 0; i < callCount; i++) { | ||
| decorator.fulfill(null, sessionEvents, request); | ||
| } | ||
|
|
||
| assertTrue("Not all concurrent fulfill calls were executed", latch.await(10, TimeUnit.SECONDS)); | ||
| verify(innerFulfiller, times(callCount)).fulfill(any(), any(), any()); | ||
| } | ||
|
|
||
| @Test | ||
| public void fulfill_whenInnerFulfillerThrows_doesNotCrashExecutor() throws Exception { | ||
| doThrow(new RuntimeException("handler error")) | ||
| .when(innerFulfiller) | ||
| .fulfill(any(), any(), any()); | ||
|
|
||
| // First call triggers exception | ||
| decorator.fulfill(null, sessionEvents, request); | ||
| Thread.sleep(200); | ||
|
|
||
| // Second call should still work (executor not dead) | ||
| CountDownLatch latch = new CountDownLatch(1); | ||
| doAnswer( | ||
| invocation -> { | ||
| latch.countDown(); | ||
| return null; | ||
| }) | ||
| .when(innerFulfiller) | ||
| .fulfill(any(), any(), any()); | ||
|
|
||
| decorator.fulfill(null, sessionEvents, request); | ||
|
|
||
| assertTrue("Executor died after exception in fulfiller", latch.await(5, TimeUnit.SECONDS)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about the effects of this. Note that this also affects the request handling on the charging station side, and I've seen CSMS "bomb" charging stations with parallel ChangeConfigurationRequests for all read-write configuration settings. Would this still work the same as before?
Also, I gather that once the threads are exhausted (which may be only 2 on a single-core charging station), the executor switches to executing the request handler within the onCall() caller's thread. But if then an exception is thrown in the request handler, it would not result in the same response as when it was thrown in a separate thread, would it?