001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.ArrayList;
018    import java.util.HashSet;
019    import java.util.List;
020    import java.util.Set;
021    import java.util.concurrent.AbstractExecutorService;
022    import java.util.concurrent.Executors;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026    import java.util.concurrent.locks.Condition;
027    import java.util.concurrent.locks.ReentrantLock;
028    
029    /**
030     * <p>
031     * See http://issues.liferay.com/browse/LPS-14986.
032     * </p>
033     *
034     * @author Shuyang Zhou
035     */
036    public class ThreadPoolExecutor extends AbstractExecutorService {
037    
038            public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
039                    this(
040                            corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
041                            Integer.MAX_VALUE, new AbortPolicy(),
042                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
043            }
044    
045            public ThreadPoolExecutor(
046                    int corePoolSize, int maxPoolSize, long keepAliveTime,
047                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
048    
049                    this(
050                            corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
051                            allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
052                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
053            }
054    
055            public ThreadPoolExecutor(
056                    int corePoolSize, int maxPoolSize, long keepAliveTime,
057                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
058                    RejectedExecutionHandler rejectedExecutionHandler,
059                    ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
060    
061                    if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
062                            (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
063                            (maxQueueSize <= 0)) {
064    
065                            throw new IllegalArgumentException();
066                    }
067    
068                    if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
069                            (threadPoolHandler == null)) {
070    
071                            throw new NullPointerException();
072                    }
073    
074                    _corePoolSize = corePoolSize;
075                    _maxPoolSize = maxPoolSize;
076                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
077                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
078                    _rejectedExecutionHandler = rejectedExecutionHandler;
079                    _threadFactory = threadFactory;
080                    _threadPoolHandler = threadPoolHandler;
081                    _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
082                    _workerTasks = new HashSet<WorkerTask>();
083            }
084    
085            public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086                    if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087                            (newMaxPoolSize < newCorePoolSize)) {
088    
089                            throw new IllegalArgumentException();
090                    }
091    
092                    _mainLock.lock();
093    
094                    try {
095                            int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096                            int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097    
098                            _corePoolSize = newCorePoolSize;
099                            _maxPoolSize = newMaxPoolSize;
100    
101                            if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102                                    ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103    
104                                    int interruptCount = Math.max(
105                                            surplusCoreThreads, surplusMaxPoolSize);
106    
107                                    for (WorkerTask workerTask : _workerTasks) {
108                                            if (interruptCount > 0) {
109                                                    if (workerTask._interruptIfWaiting()) {
110                                                            interruptCount--;
111                                                    }
112                                            }
113                                            else {
114                                                    break;
115                                            }
116                                    }
117                            }
118                            else {
119                                    Runnable runnable = null;
120    
121                                    while ((surplusCoreThreads++ < 0) &&
122                                               (_poolSize < _corePoolSize) &&
123                                               ((runnable = _taskQueue.poll()) != null)) {
124    
125                                            _doAddWorkerThread(runnable);
126                                    }
127                            }
128                    }
129                    finally {
130                            _mainLock.unlock();
131                    }
132            }
133    
134            public boolean awaitTermination(long timeout, TimeUnit timeUnit)
135                    throws InterruptedException {
136    
137                    long nanos = timeUnit.toNanos(timeout);
138    
139                    _mainLock.lock();
140    
141                    try {
142                            while (true) {
143                                    if (_runState == _TERMINATED) {
144                                            return true;
145                                    }
146    
147                                    if (nanos <= 0) {
148                                            return false;
149                                    }
150    
151                                    nanos = _terminationCondition.awaitNanos(nanos);
152                            }
153                    }
154                    finally {
155                            _mainLock.unlock();
156                    }
157            }
158    
159            public void execute(Runnable runnable) {
160                    if (runnable == null) {
161                            throw new NullPointerException();
162                    }
163    
164                    boolean[] hasWaiterMarker = new boolean[1];
165    
166                    if (_runState == _RUNNING &&
167                            _taskQueue.offer(runnable, hasWaiterMarker)) {
168    
169                            if (_runState != _RUNNING) {
170                                    if (_taskQueue.remove(runnable)) {
171                                            _rejectedExecutionHandler.rejectedExecution(runnable, this);
172                                    }
173    
174                                    return;
175                            }
176    
177                            if (!hasWaiterMarker[0]) {
178                                    _addWorkerThread();
179                            }
180    
181                            return;
182                    }
183    
184                    _rejectedExecutionHandler.rejectedExecution(runnable, this);
185            }
186    
187            public int getActiveCount() {
188                    _mainLock.lock();
189    
190                    try {
191                            int count = 0;
192    
193                            for (WorkerTask workerTask : _workerTasks) {
194                                    if (workerTask._isLocked()) {
195                                            count++;
196                                    }
197                            }
198    
199                            return count;
200                    }
201                    finally {
202                            _mainLock.unlock();
203                    }
204            }
205    
206            public long getCompletedTaskCount() {
207                    _mainLock.lock();
208    
209                    try {
210                            long count = _completedTaskCount;
211    
212                            for (WorkerTask workerTask : _workerTasks) {
213                                    count += workerTask._localCompletedTaskCount;
214                            }
215    
216                            return count;
217                    }
218                    finally {
219                            _mainLock.unlock();
220                    }
221            }
222    
223            public int getCorePoolSize() {
224                    return _corePoolSize;
225            }
226    
227            public long getKeepAliveTime(TimeUnit timeUnit) {
228                    return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
229            }
230    
231            public int getLargestPoolSize() {
232                    return _largestPoolSize;
233            }
234    
235            public int getMaxPoolSize() {
236                    return _maxPoolSize;
237            }
238    
239            public int getPendingTaskCount() {
240                    return _taskQueue.size();
241            }
242    
243            public int getPoolSize() {
244                    return _poolSize;
245            }
246    
247            public RejectedExecutionHandler getRejectedExecutionHandler() {
248                    return _rejectedExecutionHandler;
249            }
250    
251            public int getRemainingTaskQueueCapacity() {
252                    return _taskQueue.remainingCapacity();
253            }
254    
255            public long getTaskCount() {
256                    _mainLock.lock();
257    
258                    try {
259                            long count = _completedTaskCount;
260    
261                            for (WorkerTask workerTask : _workerTasks) {
262                                    count += workerTask._localCompletedTaskCount;
263    
264                                    if (workerTask._isLocked()) {
265                                            count++;
266                                    }
267                            }
268    
269                            return count + _taskQueue.size();
270                    }
271                    finally {
272                            _mainLock.unlock();
273                    }
274            }
275    
276            public ThreadFactory getThreadFactory() {
277                    return _threadFactory;
278            }
279    
280            public ThreadPoolHandler getThreadPoolHandler() {
281                    return _threadPoolHandler;
282            }
283    
284            public boolean isAllowCoreThreadTimeout() {
285                    return _allowCoreThreadTimeout;
286            }
287    
288            public boolean isShutdown() {
289                    if (_runState != _RUNNING) {
290                            return true;
291                    }
292                    else {
293                            return false;
294                    }
295            }
296    
297            public boolean isTerminated() {
298                    if (_runState == _TERMINATED) {
299                            return true;
300                    }
301                    else {
302                            return false;
303                    }
304            }
305    
306            public boolean isTerminating() {
307                    if (_runState == _STOP) {
308                            return true;
309                    }
310                    else {
311                            return false;
312                    }
313            }
314    
315            public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
316                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
317            }
318    
319            public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
320                    if (keepAliveTime < 0) {
321                            throw new IllegalArgumentException();
322                    }
323    
324                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
325            }
326    
327            public void setRejectedExecutionHandler(
328                    RejectedExecutionHandler rejectedExecutionHandler) {
329    
330                    if (rejectedExecutionHandler == null) {
331                            throw new NullPointerException();
332                    }
333    
334                    _rejectedExecutionHandler = rejectedExecutionHandler;
335            }
336    
337            public void setThreadFactory(ThreadFactory threadFactory) {
338                    if (threadFactory == null) {
339                            throw new NullPointerException();
340                    }
341    
342                    _threadFactory = threadFactory;
343            }
344    
345            public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
346                    if (threadPoolHandler == null) {
347                            throw new NullPointerException();
348                    }
349    
350                    _threadPoolHandler = threadPoolHandler;
351            }
352    
353            public void shutdown() {
354                    _mainLock.lock();
355    
356                    try {
357                            int state = _runState;
358    
359                            if (state < _SHUTDOWN) {
360                                    _runState = _SHUTDOWN;
361                            }
362    
363                            for (WorkerTask workerTask : _workerTasks) {
364                                    workerTask._interruptIfWaiting();
365                            }
366    
367                            _tryTerminate();
368                    }
369                    finally {
370                            _mainLock.unlock();
371                    }
372            }
373    
374            public List<Runnable> shutdownNow() {
375                    _mainLock.lock();
376    
377                    try {
378                            int state = _runState;
379    
380                            if (state < _STOP) {
381                                    _runState = _STOP;
382                            }
383    
384                            for (WorkerTask workerTask : _workerTasks) {
385                                    workerTask._thread.interrupt();
386                            }
387    
388                            List<Runnable> runnables = new ArrayList<Runnable>();
389    
390                            _taskQueue.drainTo(runnables);
391    
392                            _tryTerminate();
393    
394                            return runnables;
395                    }
396                    finally {
397                            _mainLock.unlock();
398                    }
399            }
400    
401            @Override
402            protected void finalize() {
403                    shutdown();
404            }
405    
406            protected ReentrantLock getMainLock() {
407                    return _mainLock;
408            }
409    
410            protected TaskQueue<Runnable> getTaskQueue() {
411                    return _taskQueue;
412            }
413    
414            protected Set<WorkerTask> getWorkerTasks() {
415                    return _workerTasks;
416            }
417    
418            private void _addWorkerThread() {
419                    int runState = _runState;
420                    int poolSize = _poolSize;
421    
422                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
423                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
424                             !_taskQueue.isEmpty())) {
425    
426                            _mainLock.lock();
427    
428                            try {
429                                    runState = _runState;
430                                    poolSize = _poolSize;
431    
432                                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
433                                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
434                                             !_taskQueue.isEmpty())) {
435    
436                                            _doAddWorkerThread(_taskQueue.poll());
437                                    }
438                            }
439                            finally {
440                                    _mainLock.unlock();
441                            }
442                    }
443            }
444    
445            private void _doAddWorkerThread(Runnable runnable) {
446                    WorkerTask workerTask = new WorkerTask(runnable);
447    
448                    _workerTasks.add(workerTask);
449    
450                    int poolSize = ++_poolSize;
451    
452                    if (poolSize > _largestPoolSize) {
453                            _largestPoolSize = poolSize;
454                    }
455    
456                    workerTask._startWork();
457            }
458    
459            private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
460                    while (true) {
461                            try {
462                                    int state = _runState;
463    
464                                    if (state >= _STOP) {
465                                            return null;
466                                    }
467    
468                                    Runnable runnable = null;
469    
470                                    if (state == _SHUTDOWN) {
471                                            runnable = _taskQueue.poll();
472                                    }
473                                    else if ((_poolSize > _corePoolSize) ||
474                                                     _allowCoreThreadTimeout) {
475    
476                                            runnable = _taskQueue.poll(
477                                                    _keepAliveTime, TimeUnit.NANOSECONDS);
478                                    }
479                                    else {
480                                            runnable = _taskQueue.take();
481                                    }
482    
483                                    if (runnable != null) {
484                                            return runnable;
485                                    }
486    
487                                    _mainLock.lock();
488    
489                                    try {
490                                            if ((_runState >= _STOP) ||
491                                                    ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
492                                                    (_allowCoreThreadTimeout &&
493                                                     ((_poolSize > 1) || _taskQueue.isEmpty())) ||
494                                                    (!_allowCoreThreadTimeout &&
495                                                     (_poolSize > _corePoolSize))) {
496    
497                                                    _completedTaskCount +=
498                                                            workerTask._localCompletedTaskCount;
499    
500                                                    _workerTasks.remove(workerTask);
501    
502                                                    if (--_poolSize == 0) {
503                                                            _tryTerminate();
504                                                    }
505    
506                                                    cleanUpMarker[0] = true;
507    
508                                                    return null;
509                                            }
510                                    }
511                                    finally {
512                                            _mainLock.unlock();
513                                    }
514                            }
515                            catch (InterruptedException ie) {
516                            }
517                    }
518            }
519    
520            private void _tryTerminate() {
521                    if (_poolSize == 0) {
522                            int state = _runState;
523    
524                            if ((state == _STOP) ||
525                                    ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
526    
527                                    _runState = _TERMINATED;
528    
529                                    _terminationCondition.signalAll();
530                                    _threadPoolHandler.terminated();
531    
532                                    return;
533                            }
534    
535                            if (!_taskQueue.isEmpty()) {
536                                    _doAddWorkerThread(_taskQueue.poll());
537                            }
538                    }
539            }
540    
541            private static final int _RUNNING = 0;
542    
543            private static final int _SHUTDOWN = 1;
544    
545            private static final int _STOP = 2;
546    
547            private static final int _TERMINATED = 3;
548    
549            private volatile boolean _allowCoreThreadTimeout;
550            private long _completedTaskCount;
551            private volatile int _corePoolSize;
552            private volatile long _keepAliveTime;
553            private volatile int _largestPoolSize;
554            private final ReentrantLock _mainLock = new ReentrantLock();
555            private volatile int _maxPoolSize;
556            private volatile int _poolSize;
557            private volatile RejectedExecutionHandler _rejectedExecutionHandler;
558            private volatile int _runState;
559            private final TaskQueue<Runnable> _taskQueue;
560            private final Condition _terminationCondition = _mainLock.newCondition();
561            private volatile ThreadFactory _threadFactory;
562            private volatile ThreadPoolHandler _threadPoolHandler;
563            private final Set<WorkerTask> _workerTasks;
564    
565            private class WorkerTask
566                    extends AbstractQueuedSynchronizer implements Runnable {
567    
568                    public WorkerTask(Runnable runnable) {
569                            _runnable = runnable;
570                    }
571    
572                    public void run() {
573                            boolean[] cleanUpMarker = new boolean[1];
574    
575                            try {
576                                    Runnable runnable = _runnable;
577    
578                                    do {
579                                            if (runnable != null) {
580                                                    _runTask(runnable);
581    
582                                                    runnable = null;
583                                            }
584                                    }
585                                    while ((runnable = _getTask(this, cleanUpMarker)) != null);
586                            }
587                            finally {
588                                    if (!cleanUpMarker[0]) {
589                                            _mainLock.lock();
590    
591                                            try {
592                                                    _completedTaskCount += _localCompletedTaskCount;
593    
594                                                    _workerTasks.remove(this);
595    
596                                                    if (--_poolSize == 0) {
597                                                            _tryTerminate();
598                                                    }
599                                            }
600                                            finally {
601                                                    _mainLock.unlock();
602                                            }
603                                    }
604    
605                                    _threadPoolHandler.beforeThreadEnd(_thread);
606                            }
607                    }
608    
609                    @Override
610                    protected boolean isHeldExclusively() {
611                            if (getState() == 1) {
612                                    return true;
613                            }
614                            else {
615                                    return false;
616                            }
617                    }
618    
619                    @Override
620                    protected boolean tryAcquire(int unused) {
621                            return compareAndSetState(0, 1);
622                    }
623    
624                    @Override
625                    protected boolean tryRelease(int unused) {
626                            setState(0);
627    
628                            return true;
629                    }
630    
631                    private boolean _interruptIfWaiting() {
632                            if (!_thread.isInterrupted() && tryAcquire(1)) {
633                                    try {
634                                            _thread.interrupt();
635    
636                                            return true;
637                                    }
638                                    finally {
639                                            _unlock();
640                                    }
641                            }
642    
643                            return false;
644                    }
645    
646                    private boolean _isLocked() {
647                            return isHeldExclusively();
648                    }
649    
650                    private void _lock() {
651                            acquire(1);
652                    }
653    
654                    private void _runTask(Runnable task) {
655                            _lock();
656    
657                            try {
658                                    if ((_runState < _STOP) && Thread.interrupted() &&
659                                            (_runState >= _STOP)) {
660    
661                                            _thread.interrupt();
662                                    }
663    
664                                    Throwable throwable = null;
665    
666                                    _threadPoolHandler.beforeExecute(_thread, task);
667    
668                                    try {
669                                            task.run();
670    
671                                            _localCompletedTaskCount++;
672                                    }
673                                    catch (RuntimeException re) {
674                                            throwable = re;
675    
676                                            throw re;
677                                    }
678                                    finally {
679                                            _threadPoolHandler.afterExecute(task, throwable);
680                                    }
681                            }
682                            finally {
683                                    _unlock();
684                            }
685                    }
686    
687                    private void _startWork() {
688                            _thread = _threadFactory.newThread(this);
689    
690                            _threadPoolHandler.beforeThreadStart(_thread);
691    
692                            _thread.start();
693                    }
694    
695                    private void _unlock() {
696                            release(1);
697                    }
698    
699                    private volatile long _localCompletedTaskCount;
700                    private final Runnable _runnable;
701                    private Thread _thread;
702    
703            }
704    
705    }