001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
018 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.util.NamedThreadFactory;
023 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
024
025 import java.util.List;
026 import java.util.Set;
027 import java.util.concurrent.TimeUnit;
028
029
033 public abstract class BaseAsyncDestination extends BaseDestination {
034
035 public BaseAsyncDestination() {
036 }
037
038
041 public BaseAsyncDestination(String name) {
042 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
043 }
044
045
048 public BaseAsyncDestination(
049 String name, int workersCoreSize, int workersMaxSize) {
050
051 this.name = name;
052 _workersCoreSize = workersCoreSize;
053 _workersMaxSize = workersMaxSize;
054
055 open();
056 }
057
058 @Override
059 public void close(boolean force) {
060 if (!_threadPoolExecutor.isShutdown()
061 && !_threadPoolExecutor.isTerminating()) {
062
063 if (!force) {
064 _threadPoolExecutor.shutdown();
065 }
066 else {
067 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
068
069 if (_log.isInfoEnabled()) {
070 _log.info(
071 "The following " + pendingTasks.size() + " tasks " +
072 "were not executed due to shutown: " +
073 pendingTasks);
074 }
075 }
076 }
077 }
078
079 public DestinationStatistics getDestinationStatistics() {
080 DestinationStatistics destinationStatistics =
081 new DestinationStatistics();
082
083 destinationStatistics.setActiveThreadCount(
084 _threadPoolExecutor.getActiveCount());
085 destinationStatistics.setCurrentThreadCount(
086 _threadPoolExecutor.getPoolSize());
087 destinationStatistics.setLargestThreadCount(
088 _threadPoolExecutor.getLargestPoolSize());
089 destinationStatistics.setMaxThreadPoolSize(
090 _threadPoolExecutor.getMaxPoolSize());
091 destinationStatistics.setMinThreadPoolSize(
092 _threadPoolExecutor.getCorePoolSize());
093 destinationStatistics.setPendingMessageCount(
094 _threadPoolExecutor.getPendingTaskCount());
095 destinationStatistics.setSentMessageCount(
096 _threadPoolExecutor.getCompletedTaskCount());
097
098 return destinationStatistics;
099 }
100
101 public int getMaximumQueueSize() {
102 return _maximumQueueSize;
103 }
104
105 public int getWorkersCoreSize() {
106 return _workersCoreSize;
107 }
108
109 public int getWorkersMaxSize() {
110 return _workersMaxSize;
111 }
112
113 @Override
114 public void open() {
115 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
116 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
117
118 if (_rejectedExecutionHandler == null) {
119 _rejectedExecutionHandler = createRejectionExecutionHandler();
120 }
121
122 _threadPoolExecutor = new ThreadPoolExecutor(
123 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
124 _maximumQueueSize, _rejectedExecutionHandler,
125 new NamedThreadFactory(
126 getName(), Thread.NORM_PRIORITY, classLoader),
127 new ThreadPoolHandlerAdapter());
128 }
129 }
130
131 public void send(Message message) {
132 if (messageListeners.isEmpty()) {
133 if (_log.isDebugEnabled()) {
134 _log.debug("No message listeners for destination " + getName());
135 }
136
137 return;
138 }
139
140 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
141
142 if (threadPoolExecutor.isShutdown()) {
143 throw new IllegalStateException(
144 "Destination " + getName() + " is shutdown and cannot " +
145 "receive more messages");
146 }
147
148 dispatch(messageListeners, message);
149 }
150
151 public void setMaximumQueueSize(int maximumQueueSize) {
152 _maximumQueueSize = maximumQueueSize;
153 }
154
155 public void setRejectedExecutionHandler(
156 RejectedExecutionHandler rejectedExecutionHandler) {
157
158 _rejectedExecutionHandler = rejectedExecutionHandler;
159 }
160
161 public void setWorkersCoreSize(int workersCoreSize) {
162 _workersCoreSize = workersCoreSize;
163 }
164
165 public void setWorkersMaxSize(int workersMaxSize) {
166 _workersMaxSize = workersMaxSize;
167 }
168
169 protected RejectedExecutionHandler createRejectionExecutionHandler() {
170 return new RejectedExecutionHandler() {
171
172 public void rejectedExecution(
173 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
174
175 if (!_log.isWarnEnabled()) {
176 return;
177 }
178
179 MessageRunnable messageRunnable = (MessageRunnable) runnable;
180
181 _log.warn(
182 "Discarding message " + messageRunnable.getMessage() +
183 " because it exceeds the maximum queue size of " +
184 _maximumQueueSize);
185 }
186
187 };
188 }
189
190 protected abstract void dispatch(
191 Set<MessageListener> messageListeners, Message message);
192
193 protected ThreadPoolExecutor getThreadPoolExecutor() {
194 return _threadPoolExecutor;
195 }
196
197 private static final int _WORKERS_CORE_SIZE = 2;
198
199 private static final int _WORKERS_MAX_SIZE = 5;
200
201 private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
202
203 private int _maximumQueueSize = Integer.MAX_VALUE;
204 private RejectedExecutionHandler _rejectedExecutionHandler;
205 private ThreadPoolExecutor _threadPoolExecutor;
206 private int _workersCoreSize = _WORKERS_CORE_SIZE;
207 private int _workersMaxSize = _WORKERS_MAX_SIZE;
208
209 }