// AsyncStateMachine.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
/**
* Manages the state transitions for async requests.
*
* <pre>
* The internal states that are used are:
* DISPATCHED - Standard request. Not in Async mode.
* STARTING - ServletRequest.startAsync() has been called from
* Servlet.service() but service() has not exited.
* STARTED - ServletRequest.startAsync() has been called from
* Servlet.service() and service() has exited.
* READ_WRITE_OP - Performing an asynchronous read or write.
* MUST_COMPLETE - ServletRequest.startAsync() followed by complete() have
* been called during a single Servlet.service() method. The
* complete() will be processed as soon as Servlet.service()
* exits.
* COMPLETE_PENDING - ServletRequest.startAsync() has been called from
* Servlet.service() but, before service() exited, complete()
* was called from another thread. The complete() will
* be processed as soon as Servlet.service() exits.
* COMPLETING - The call to complete() was made once the request was in
* the STARTED state.
* TIMING_OUT - The async request has timed out and is waiting for a call
* to complete() or dispatch(). If that isn't made, the error
* state will be entered.
* MUST_DISPATCH - ServletRequest.startAsync() followed by dispatch() have
* been called during a single Servlet.service() method. The
* dispatch() will be processed as soon as Servlet.service()
* exits.
* DISPATCH_PENDING - ServletRequest.startAsync() has been called from
* Servlet.service() but, before service() exited, dispatch()
* was called from another thread. The dispatch() will
* be processed as soon as Servlet.service() exits.
* DISPATCHING - The dispatch is being processed.
* MUST_ERROR - ServletRequest.startAsync() has been called from
* Servlet.service() but, before service() exited, an I/O
* error occurred on another thread. The container will
* perform the necessary error handling when
* Servlet.service() exits.
* ERROR - Something went wrong.
*
*
* The valid state transitions are:
*
* post() dispatched()
* |-------»------------------»---------| |-------«-----------------------«-----|
* | | | |
* | | | post() |
* | post() \|/ \|/ dispatched() |
* | |-----»----------------»DISPATCHED«-------------«-------------| |
* | | | /|\ | | |
* | | startAsync()| |--|timeout() | |
* ^ | | | |
* | | complete() | dispatch() ^ |
* | | |--«---------------«-- | ---«--MUST_ERROR--»-----| | |
* | | | | /|\ | | |
* | ^ | | | | | |
* | | | | /-----|error() | | |
* | | | | / | ^ |
* | | \|/ ST-complete() \|/ / ST-dispatch() \|/ | |
* | MUST_COMPLETE«--------«--------STARTING--------»---------»MUST_DISPATCH |
* | / | \ |
* | / | \ |
* | OT-complete() / | \ OT-dispatch() |
* | COMPLETE_PENDING«------«------/ | \-------»---------»DISPATCH_PENDING |
* | | /|\ | /|\ | |
* | | | | | |post() |
* | | |OT-complete() | OT-dispatch()| | |
* | | |---------«-------«---|---«--\ | | |
* | | | \ | | |
* | | /-------«-------«-- | --«---READ_WRITE--»----| | |
* | | / ST-complete() | / /|\ \ | |
* | | / | post()/ / \ | |
* | | / | / / \ | |
* | | / | / / \ | |
* | | / | / / \ | |
* | | / | | / \ | |
* | | / | | / ST-dispatch()\ | |
* | | | | | | \ | |
* | post()| | timeout() post()| | |asyncOperation() \ | timeout() |
* | | | |--| | | | | | |--| |
* | \|/\|/\|/ | complete() \|/ \|/| dispatch() \|/\|/ \|/ | |
* |--«-----COMPLETING«--------«----------STARTED--------»---------»DISPATCHING----|
* /|\ /|\ | /|\ | /|\ /|\
* | | | |--| | |
* | | timeout()| post() | |
* | | | | |
* | | complete() \|/ dispatch() | |
* | |------------«-------TIMING_OUT--------»----------------| |
* | |
* | complete() dispatch() |
* |---------------«-----------ERROR--------------»-----------------|
*
*
* Notes: * For clarity, the transitions to ERROR which are valid from every state apart from
* STARTING are not shown.
* * All transitions may happen on either the Servlet.service() thread (ST) or on any
* other thread (OT) unless explicitly marked.
* </pre>
*/
class AsyncStateMachine {
// Logger used for the trace-level messages emitted by updateState() and asyncError().
private static final Log log = LogFactory.getLog(AsyncStateMachine.class);
// Supplies, by key, the text of every trace message and IllegalStateException message in this class.
private static final StringManager sm = StringManager.getManager(AsyncStateMachine.class);
/**
 * The states an async request can be in.
 * <p>
 * Every constant carries four fixed boolean flags, given in the order
 * (isAsync, isStarted, isCompleting, isDispatching). The enclosing class
 * answers isAsync(), isAsyncStarted(), isCompleting() and
 * isAsyncDispatching() by reading the matching flag of the current state.
 */
private enum AsyncState {
    // @formatter:off
    //               isAsync isStarted isCompleting isDispatching
    DISPATCHED(false, false, false, false),
    STARTING(true, true, false, false),
    STARTED(true, true, false, false),
    MUST_COMPLETE(true, true, true, false),
    COMPLETE_PENDING(true, true, false, false),
    COMPLETING(true, false, true, false),
    TIMING_OUT(true, true, false, false),
    MUST_DISPATCH(true, true, false, true),
    DISPATCH_PENDING(true, true, false, false),
    DISPATCHING(true, false, false, true),
    READ_WRITE_OP(true, true, false, false),
    MUST_ERROR(true, true, false, false),
    ERROR(true, true, false, false);
    // @formatter:on

    private final boolean isAsync;
    private final boolean isStarted;
    private final boolean isCompleting;
    private final boolean isDispatching;

    /**
     * Records the four flags for a constant; the values never change afterwards.
     */
    AsyncState(boolean async, boolean started, boolean completing, boolean dispatching) {
        isAsync = async;
        isStarted = started;
        isCompleting = completing;
        isDispatching = dispatching;
    }

    /** @return the isAsync flag of this state (false only for DISPATCHED) */
    boolean isAsync() {
        return isAsync;
    }

    /** @return the isStarted flag of this state */
    boolean isStarted() {
        return isStarted;
    }

    /** @return the isCompleting flag of this state (true for MUST_COMPLETE and COMPLETING) */
    boolean isCompleting() {
        return isCompleting;
    }

    /** @return the isDispatching flag of this state (true for MUST_DISPATCH and DISPATCHING) */
    boolean isDispatching() {
        return isDispatching;
    }
}
/*
 * Current state. Starts as DISPATCHED, is changed through updateState() and is reset to DISPATCHED by recycle().
 * Declared volatile; the unsynchronized query methods (isAsync(), isAsyncStarted(), ...) read it directly.
 */
private volatile AsyncState state = AsyncState.DISPATCHED;
/*
 * Value of System.currentTimeMillis() recorded by asyncStart(). Zero means asyncStart() has not been called since
 * construction or since the last recycle(); recycle() uses that to skip redundant work.
 */
private volatile long lastAsyncStart = 0;
/*
 * Tracks the current generation of async processing for this state machine. The generation is incremented every
 * time async processing is started. The primary purpose of this is to enable Tomcat to detect and prevent attempts
 * to process an event for a previous generation with the current generation as processing such an event usually
 * ends badly: e.g. CVE-2018-8037.
 */
private final AtomicLong generation = new AtomicLong(0);
/*
 * Error processing should only be triggered once per async generation. This field tracks whether the async
 * processing has entered the error state during this async cycle. Set by asyncError(), cleared by recycle().
 *
 * Guarded by this
 */
private boolean hasProcessedError = false;
// Need this to fire listener on complete. Supplied to asyncStart() and cleared by recycle(); also used to adjust
// the in-progress async count and to answer isAvailable().
private AsyncContextCallback asyncCtxt = null;
// Processor this state machine was created for. Supplies the request, the error state and buffered-write flushing,
// and is handed the runnable passed to asyncRun().
private final AbstractProcessor processor;
/**
 * Creates a state machine, initially in the DISPATCHED state, bound to the given processor.
 *
 * @param processor the processor whose request, error state and execute() method this instance uses
 */
AsyncStateMachine(AbstractProcessor processor) {
this.processor = processor;
}
/**
 * @return the isAsync flag of the current state; {@code false} only while in DISPATCHED
 */
boolean isAsync() {
return state.isAsync();
}
/**
 * @return the isDispatching flag of the current state; {@code true} while in MUST_DISPATCH or DISPATCHING
 */
boolean isAsyncDispatching() {
return state.isDispatching();
}
/**
 * @return the isStarted flag of the current state; {@code false} while in DISPATCHED, COMPLETING or DISPATCHING
 *             and {@code true} in every other state
 */
boolean isAsyncStarted() {
return state.isStarted();
}
/**
 * @return {@code true} if the current state is exactly TIMING_OUT
 */
boolean isAsyncTimingOut() {
return state == AsyncState.TIMING_OUT;
}
/**
 * @return {@code true} if the current state is exactly ERROR (MUST_ERROR does not count)
 */
boolean isAsyncError() {
return state == AsyncState.ERROR;
}
/**
 * @return the isCompleting flag of the current state; {@code true} while in MUST_COMPLETE or COMPLETING
 */
boolean isCompleting() {
return state.isCompleting();
}
/**
 * Obtain the time that this connection last transitioned to async processing.
 *
 * @return The time (as returned by {@link System#currentTimeMillis()}) that this connection last transitioned to
 *             async, or 0 if asyncStart() has not been called since construction or the last recycle()
 */
long getLastAsyncStart() {
return lastAsyncStart;
}
/**
 * Obtain the current async generation. The counter is incremented by each successful asyncStart() and is not
 * reset by recycle().
 *
 * @return The current value of the generation counter
 */
long getCurrentGeneration() {
return generation.get();
}
/**
 * Begins a new async cycle: DISPATCHED to STARTING.
 * <p>
 * The generation counter is incremented, the state is changed, the callback is retained and the start time is
 * recorded, in that order.
 *
 * @param asyncCtxt the callback to retain for this async cycle
 *
 * @throws IllegalStateException if the current state is not DISPATCHED
 */
synchronized void asyncStart(AsyncContextCallback asyncCtxt) {
    if (state != AsyncState.DISPATCHED) {
        throw new IllegalStateException(sm.getString("asyncStateMachine.invalidAsyncState", "asyncStart()", state));
    }

    generation.incrementAndGet();
    updateState(AsyncState.STARTING);
    // Note: the caller, not this method, is responsible for calling
    // asyncCtxt.incrementInProgressAsyncCount() as that allows simpler
    // error handling.
    this.asyncCtxt = asyncCtxt;
    lastAsyncStart = System.currentTimeMillis();
}
/**
 * Marks the start of an asynchronous read or write: STARTED to READ_WRITE_OP.
 *
 * @throws IllegalStateException if the current state is not STARTED
 */
synchronized void asyncOperation() {
    if (state != AsyncState.STARTED) {
        throw new IllegalStateException(
                sm.getString("asyncStateMachine.invalidAsyncState", "asyncOperation()", state));
    }
    updateState(AsyncState.READ_WRITE_OP);
}
/*
 * Async has been processed. Whether or not to enter a long poll depends on current state. For example, as per
 * SRV.2.3.3.3 can now process calls to complete() or dispatch().
 *
 * Outcome by current state:
 *   COMPLETE_PENDING            -> COMPLETING,  returns ASYNC_END (listeners cleared first)
 *   DISPATCH_PENDING            -> DISPATCHING, returns ASYNC_END (listeners cleared first)
 *   STARTING, READ_WRITE_OP     -> STARTED,     returns LONG
 *   MUST_DISPATCH               -> DISPATCHING, returns ASYNC_END
 *   MUST_COMPLETE, COMPLETING,
 *   DISPATCHING                 -> DISPATCHED,  returns ASYNC_END, unless I/O is still allowed and
 *                                  flushBufferedWrite() returns true, in which case the state is left
 *                                  unchanged and LONG is returned
 *   STARTED                     -> unchanged,   returns LONG
 *   anything else               -> IllegalStateException
 */
synchronized SocketState asyncPostProcess() throws IOException {
    switch (state) {
        case COMPLETE_PENDING:
            clearNonBlockingListeners();
            updateState(AsyncState.COMPLETING);
            return SocketState.ASYNC_END;

        case DISPATCH_PENDING:
            clearNonBlockingListeners();
            updateState(AsyncState.DISPATCHING);
            return SocketState.ASYNC_END;

        case STARTING:
        case READ_WRITE_OP:
            updateState(AsyncState.STARTED);
            return SocketState.LONG;

        case MUST_DISPATCH:
            updateState(AsyncState.DISPATCHING);
            return SocketState.ASYNC_END;

        case MUST_COMPLETE:
        case COMPLETING:
        case DISPATCHING:
            // NOTE(review): a true result from flushBufferedWrite() presumably
            // means buffered data is still pending - confirm against
            // AbstractProcessor. Either way the async cycle is not ended yet.
            if (processor.getErrorState().isIoAllowed() && processor.flushBufferedWrite()) {
                return SocketState.LONG;
            }
            // The listener is notified while the state is still the old one;
            // the in-progress count is only decremented once back in DISPATCHED.
            asyncCtxt.fireOnComplete();
            updateState(AsyncState.DISPATCHED);
            asyncCtxt.decrementInProgressAsyncCount();
            return SocketState.ASYNC_END;

        case STARTED:
            // This can occur if an async listener does a dispatch to an async
            // servlet during onTimeout
            return SocketState.LONG;

        default:
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState", "asyncPostProcess()", state));
    }
}
/**
 * Handles a call to complete().
 * <p>
 * If the call does not come from the request thread (or there is no request) while the state is STARTING or
 * READ_WRITE_OP, the completion is deferred by moving to COMPLETE_PENDING. Otherwise the non-blocking listeners
 * are cleared and the state moves to MUST_COMPLETE (from STARTING or MUST_ERROR) or COMPLETING (from STARTED,
 * READ_WRITE_OP, TIMING_OUT or ERROR).
 *
 * @return {@code true} only for the STARTED to COMPLETING transition, where the caller must trigger a dispatch
 *             to a container thread; {@code false} in every other case
 *
 * @throws IllegalStateException if complete() is not valid in the current state
 */
synchronized boolean asyncComplete() {
    Request request = processor.getRequest();
    boolean offRequestThread = (request == null || !request.isRequestThread());

    if (offRequestThread && (state == AsyncState.STARTING || state == AsyncState.READ_WRITE_OP)) {
        updateState(AsyncState.COMPLETE_PENDING);
        return false;
    }

    clearNonBlockingListeners();

    switch (state) {
        case STARTING:
        case MUST_ERROR:
            // Already running on a container thread, so processing does not
            // need to be handed over to a new one.
            updateState(AsyncState.MUST_COMPLETE);
            return false;

        case STARTED:
            updateState(AsyncState.COMPLETING);
            // A dispatch to a container thread is always required here.
            // Off a container thread: processing has to get back onto one to
            // finish. On a container thread: the current request/response are
            // not the ones associated with the AsyncContext, so a new
            // container thread is needed to process the different pair.
            return true;

        case READ_WRITE_OP:
        case TIMING_OUT:
        case ERROR:
            // Read/write operations can happen on or off a container thread,
            // but in this state the listener call that triggers the read/write
            // is in progress on a container thread. Timeout and error
            // processing can likewise happen on or off a container thread (on
            // is much more likely), but the triggering call is in progress on
            // a container thread. The socket is added to the poller when that
            // thread leaves AbstractConnectionHandler.process(), so no dispatch
            // here - it would add the socket to the poller a second time.
            updateState(AsyncState.COMPLETING);
            return false;

        default:
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState", "asyncComplete()", state));
    }
}
/**
 * Handles an async timeout.
 *
 * @return {@code true} if the state moved from STARTED to TIMING_OUT; {@code false} if the state was COMPLETING,
 *             DISPATCHING or DISPATCHED, in which case nothing is changed
 *
 * @throws IllegalStateException if the current state is any other state
 */
synchronized boolean asyncTimeout() {
    switch (state) {
        case STARTED:
            updateState(AsyncState.TIMING_OUT);
            return true;

        case COMPLETING:
        case DISPATCHING:
        case DISPATCHED:
            // NOOP - the application called complete() or dispatch() between
            // the timeout firing and execution reaching this point
            return false;

        default:
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState", "asyncTimeout()", state));
    }
}
/**
 * Handles a call to dispatch().
 * <p>
 * If the call does not come from the request thread (or there is no request) while the state is STARTING or
 * READ_WRITE_OP, the dispatch is deferred by moving to DISPATCH_PENDING. Otherwise the non-blocking listeners are
 * cleared and the state moves to MUST_DISPATCH (from STARTING or MUST_ERROR) or DISPATCHING (from STARTED,
 * READ_WRITE_OP, TIMING_OUT or ERROR).
 *
 * @return {@code true} only for the STARTED to DISPATCHING transition, where the caller must trigger a dispatch
 *             to a container thread; {@code false} in every other case
 *
 * @throws IllegalStateException if dispatch() is not valid in the current state
 */
synchronized boolean asyncDispatch() {
    Request request = processor.getRequest();
    boolean offRequestThread = (request == null || !request.isRequestThread());

    if (offRequestThread && (state == AsyncState.STARTING || state == AsyncState.READ_WRITE_OP)) {
        updateState(AsyncState.DISPATCH_PENDING);
        return false;
    }

    clearNonBlockingListeners();

    switch (state) {
        case STARTING:
        case MUST_ERROR:
            // Already running on a container thread, so processing does not
            // need to be handed over to a new one.
            updateState(AsyncState.MUST_DISPATCH);
            return false;

        case STARTED:
            updateState(AsyncState.DISPATCHING);
            // A dispatch to a container thread is always required here.
            // Off a container thread: processing has to get back onto one to
            // finish. On a container thread: the current request/response are
            // not the ones associated with the AsyncContext, so a new
            // container thread is needed to process the different pair.
            return true;

        case READ_WRITE_OP:
        case TIMING_OUT:
        case ERROR:
            // Read/write operations can happen on or off a container thread,
            // but in this state the listener call that triggers the read/write
            // is in progress on a container thread. Timeout and error
            // processing can likewise happen on or off a container thread (on
            // is much more likely), but the triggering call is in progress on
            // a container thread. The socket is added to the poller when that
            // thread leaves AbstractConnectionHandler.process(), so no dispatch
            // here - it would add the socket to the poller a second time.
            updateState(AsyncState.DISPATCHING);
            return false;

        default:
            throw new IllegalStateException(
                    sm.getString("asyncStateMachine.invalidAsyncState", "asyncDispatch()", state));
    }
}
/**
 * Records that a dispatch has been carried out: DISPATCHING or MUST_DISPATCH to DISPATCHED. The in-progress
 * async count on the callback is decremented after the state change.
 *
 * @throws IllegalStateException if the current state is neither DISPATCHING nor MUST_DISPATCH
 */
synchronized void asyncDispatched() {
    boolean dispatchInProgress = (state == AsyncState.DISPATCHING || state == AsyncState.MUST_DISPATCH);
    if (!dispatchInProgress) {
        throw new IllegalStateException(
                sm.getString("asyncStateMachine.invalidAsyncState", "asyncDispatched()", state));
    }
    updateState(AsyncState.DISPATCHED);
    asyncCtxt.decrementInProgressAsyncCount();
}
/**
 * Handles an error during async processing.
 * <p>
 * The non-blocking listeners are always cleared. From STARTING the state moves to MUST_ERROR. From any other
 * state the move to ERROR happens at most once per async cycle: if an error has already been processed since the
 * last recycle() nothing further is done and {@code false} is returned.
 *
 * @return {@code false} if the error was skipped as already processed; otherwise {@code true} when the call was
 *             not made on the request thread (or there is no request), so that the caller triggers a dispatch,
 *             and {@code false} when it was made on the request thread
 */
synchronized boolean asyncError() {
    Request request = processor.getRequest();
    boolean onContainerThread = (request != null && request.isRequestThread());

    if (log.isTraceEnabled()) {
        log.trace(sm.getString("asyncStateMachine.asyncError.start"));
    }

    clearNonBlockingListeners();

    if (state == AsyncState.STARTING) {
        updateState(AsyncState.MUST_ERROR);
        // Non-container threads need a dispatch to be triggered
        return !onContainerThread;
    }

    if (hasProcessedError) {
        if (log.isTraceEnabled()) {
            log.trace(sm.getString("asyncStateMachine.asyncError.skip"));
        }
        return false;
    }
    hasProcessedError = true;

    if (state == AsyncState.DISPATCHED) {
        // Async error handling has moved processing back into an async
        // state. Need to increment in progress count as it will decrement
        // when the async state is exited again.
        asyncCtxt.incrementInProgressAsyncCount();
    }
    updateState(AsyncState.ERROR);

    // Non-container threads need a dispatch to be triggered
    return !onContainerThread;
}
/**
 * Hands the runnable to the processor for execution. Only permitted while in STARTING, STARTED or READ_WRITE_OP.
 * <p>
 * For the duration of the {@code processor.execute()} call the calling thread's context class loader is replaced
 * by the class loader of this class; the previous loader is always restored afterwards.
 *
 * @param runnable the task to pass to the processor
 *
 * @throws IllegalStateException if the current state is not STARTING, STARTED or READ_WRITE_OP
 */
synchronized void asyncRun(Runnable runnable) {
    boolean runPermitted = (state == AsyncState.STARTING || state == AsyncState.STARTED ||
            state == AsyncState.READ_WRITE_OP);
    if (!runPermitted) {
        throw new IllegalStateException(sm.getString("asyncStateMachine.invalidAsyncState", "asyncRun()", state));
    }

    // Execute the runnable using a container thread from the Connector's
    // thread pool. The context class loader is swapped for the duration of
    // the hand-off to prevent a memory leak.
    Thread caller = Thread.currentThread();
    ClassLoader callerLoader = caller.getContextClassLoader();
    caller.setContextClassLoader(this.getClass().getClassLoader());
    try {
        processor.execute(runnable);
    } finally {
        caller.setContextClassLoader(callerLoader);
    }
}
/**
 * Reports whether the retained callback is available.
 *
 * @return {@code false} if no callback is currently held, otherwise the result of the callback's own
 *             {@code isAvailable()}
 */
synchronized boolean isAvailable() {
    // A missing callback means async processing has probably been completed
    // in another thread. Answering false triggers a timeout, which makes
    // sure the Processor is cleaned up.
    return asyncCtxt != null && asyncCtxt.isAvailable();
}
/**
 * Returns this instance to its initial condition so that it can be reused: waiting threads are notified, the
 * callback is dropped, the state is set back to DISPATCHED, and the start time and error flag are cleared. The
 * generation counter is left as it is.
 * <p>
 * Does nothing if asyncStart() has not been called since construction or the previous recycle().
 */
synchronized void recycle() {
    // lastAsyncStart tells us whether this instance has been used since it
    // was last recycled. If it has not, there is nothing to reset and the
    // relatively expensive call to notifyAll() can be skipped.
    if (lastAsyncStart != 0) {
        // Ensure in case of error that any non-container threads that have
        // been paused are unpaused.
        notifyAll();
        asyncCtxt = null;
        // Assigned directly rather than via updateState(), so this reset is
        // not trace logged.
        state = AsyncState.DISPATCHED;
        lastAsyncStart = 0;
        hasProcessedError = false;
    }
}
/**
 * Sets the {@code listener} field of the processor's request, and of that request's response, to {@code null}.
 * <p>
 * Does nothing when the processor has no request. asyncComplete(), asyncDispatch() and asyncError() each allow
 * for {@code processor.getRequest()} returning {@code null} before calling this method, so it must not
 * dereference the request unconditionally.
 */
private void clearNonBlockingListeners() {
    Request request = processor.getRequest();
    if (request == null) {
        // No request, hence no listeners to clear
        return;
    }
    request.listener = null;
    request.getResponse().listener = null;
}
/*
 * Changes the current state, first logging the old and new state when trace logging is enabled. All state
 * transitions in this class go through here except the reset in recycle(), which assigns the field directly.
 * Every caller is itself a synchronized method, so the monitor is already held on entry.
 */
private synchronized void updateState(AsyncState newState) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("asyncStateMachine.stateChange", state, newState));
}
state = newState;
}
}