001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
018    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.util.NamedThreadFactory;
023    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
024    
025    import java.util.List;
026    import java.util.Set;
027    import java.util.concurrent.TimeUnit;
028    
029    /**
030     * @author Michael C. Han
031     * @author Shuyang Zhou
032     */
033    public abstract class BaseAsyncDestination extends BaseDestination {
034    
035            public BaseAsyncDestination() {
036            }
037    
038            /**
039             * @deprecated
040             */
041            public BaseAsyncDestination(String name) {
042                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
043            }
044    
045            /**
046             * @deprecated
047             */
048            public BaseAsyncDestination(
049                    String name, int workersCoreSize, int workersMaxSize) {
050    
051                    this.name = name;
052                    _workersCoreSize = workersCoreSize;
053                    _workersMaxSize = workersMaxSize;
054    
055                    open();
056            }
057    
058            @Override
059            public void close(boolean force) {
060                    if (!_threadPoolExecutor.isShutdown()
061                            && !_threadPoolExecutor.isTerminating()) {
062    
063                            if (!force) {
064                                    _threadPoolExecutor.shutdown();
065                            }
066                            else {
067                                    List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
068    
069                                    if (_log.isInfoEnabled()) {
070                                            _log.info(
071                                                    "The following " + pendingTasks.size() + " tasks " +
072                                                            "were not executed due to shutown: " +
073                                                                    pendingTasks);
074                                    }
075                            }
076                    }
077            }
078    
079            public DestinationStatistics getDestinationStatistics() {
080                    DestinationStatistics destinationStatistics =
081                            new DestinationStatistics();
082    
083                    destinationStatistics.setActiveThreadCount(
084                            _threadPoolExecutor.getActiveCount());
085                    destinationStatistics.setCurrentThreadCount(
086                            _threadPoolExecutor.getPoolSize());
087                    destinationStatistics.setLargestThreadCount(
088                            _threadPoolExecutor.getLargestPoolSize());
089                    destinationStatistics.setMaxThreadPoolSize(
090                            _threadPoolExecutor.getMaxPoolSize());
091                    destinationStatistics.setMinThreadPoolSize(
092                            _threadPoolExecutor.getCorePoolSize());
093                    destinationStatistics.setPendingMessageCount(
094                            _threadPoolExecutor.getPendingTaskCount());
095                    destinationStatistics.setSentMessageCount(
096                            _threadPoolExecutor.getCompletedTaskCount());
097    
098                    return destinationStatistics;
099            }
100    
101            public int getMaximumQueueSize() {
102                    return _maximumQueueSize;
103            }
104    
105            public int getWorkersCoreSize() {
106                    return _workersCoreSize;
107            }
108    
109            public int getWorkersMaxSize() {
110                    return _workersMaxSize;
111            }
112    
113            @Override
114            public void open() {
115                    if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
116                            ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
117    
118                            if (_rejectedExecutionHandler == null) {
119                                    _rejectedExecutionHandler = createRejectionExecutionHandler();
120                            }
121    
122                            _threadPoolExecutor = new ThreadPoolExecutor(
123                                    _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
124                                    _maximumQueueSize, _rejectedExecutionHandler,
125                                    new NamedThreadFactory(
126                                            getName(), Thread.NORM_PRIORITY, classLoader),
127                                    new ThreadPoolHandlerAdapter());
128                    }
129            }
130    
131            public void send(Message message) {
132                    if (messageListeners.isEmpty()) {
133                            if (_log.isDebugEnabled()) {
134                                    _log.debug("No message listeners for destination " + getName());
135                            }
136    
137                            return;
138                    }
139    
140                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
141    
142                    if (threadPoolExecutor.isShutdown()) {
143                            throw new IllegalStateException(
144                                    "Destination " + getName() + " is shutdown and cannot " +
145                                            "receive more messages");
146                    }
147    
148                    dispatch(messageListeners, message);
149            }
150    
151            public void setMaximumQueueSize(int maximumQueueSize) {
152                    _maximumQueueSize = maximumQueueSize;
153            }
154    
155            public void setRejectedExecutionHandler(
156                    RejectedExecutionHandler rejectedExecutionHandler) {
157    
158                    _rejectedExecutionHandler = rejectedExecutionHandler;
159            }
160    
161            public void setWorkersCoreSize(int workersCoreSize) {
162                    _workersCoreSize = workersCoreSize;
163            }
164    
165            public void setWorkersMaxSize(int workersMaxSize) {
166                    _workersMaxSize = workersMaxSize;
167            }
168    
169            protected RejectedExecutionHandler createRejectionExecutionHandler() {
170                    return new RejectedExecutionHandler() {
171    
172                            public void rejectedExecution(
173                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
174    
175                                    if (!_log.isWarnEnabled()) {
176                                            return;
177                                    }
178    
179                                    MessageRunnable messageRunnable = (MessageRunnable) runnable;
180    
181                                    _log.warn(
182                                            "Discarding message " + messageRunnable.getMessage() +
183                                                    " because it exceeds the maximum queue size of " +
184                                                            _maximumQueueSize);
185                            }
186    
187                    };
188            }
189    
190            protected abstract void dispatch(
191                    Set<MessageListener> messageListeners, Message message);
192    
193            protected ThreadPoolExecutor getThreadPoolExecutor() {
194                    return _threadPoolExecutor;
195            }
196    
197            private static final int _WORKERS_CORE_SIZE = 2;
198    
199            private static final int _WORKERS_MAX_SIZE = 5;
200    
201            private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
202    
203            private int _maximumQueueSize = Integer.MAX_VALUE;
204            private RejectedExecutionHandler _rejectedExecutionHandler;
205            private ThreadPoolExecutor _threadPoolExecutor;
206            private int _workersCoreSize = _WORKERS_CORE_SIZE;
207            private int _workersMaxSize = _WORKERS_MAX_SIZE;
208    
209    }