001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.ArrayList;
018 import java.util.HashSet;
019 import java.util.List;
020 import java.util.Set;
021 import java.util.concurrent.AbstractExecutorService;
022 import java.util.concurrent.Executors;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
036 public class ThreadPoolExecutor extends AbstractExecutorService {
037
038 public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
039 this(
040 corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
041 Integer.MAX_VALUE, new AbortPolicy(),
042 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
043 }
044
045 public ThreadPoolExecutor(
046 int corePoolSize, int maxPoolSize, long keepAliveTime,
047 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
048
049 this(
050 corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
051 allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
052 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
053 }
054
055 public ThreadPoolExecutor(
056 int corePoolSize, int maxPoolSize, long keepAliveTime,
057 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
058 RejectedExecutionHandler rejectedExecutionHandler,
059 ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
060
061 if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
062 (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
063 (maxQueueSize <= 0)) {
064
065 throw new IllegalArgumentException();
066 }
067
068 if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
069 (threadPoolHandler == null)) {
070
071 throw new NullPointerException();
072 }
073
074 _corePoolSize = corePoolSize;
075 _maxPoolSize = maxPoolSize;
076 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
077 _allowCoreThreadTimeout = allowCoreThreadTimeout;
078 _rejectedExecutionHandler = rejectedExecutionHandler;
079 _threadFactory = threadFactory;
080 _threadPoolHandler = threadPoolHandler;
081 _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
082 _workerTasks = new HashSet<WorkerTask>();
083 }
084
085 public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086 if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087 (newMaxPoolSize < newCorePoolSize)) {
088
089 throw new IllegalArgumentException();
090 }
091
092 _mainLock.lock();
093
094 try {
095 int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096 int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097
098 _corePoolSize = newCorePoolSize;
099 _maxPoolSize = newMaxPoolSize;
100
101 if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102 ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103
104 int interruptCount = Math.max(
105 surplusCoreThreads, surplusMaxPoolSize);
106
107 for (WorkerTask workerTask : _workerTasks) {
108 if (interruptCount > 0) {
109 if (workerTask._interruptIfWaiting()) {
110 interruptCount--;
111 }
112 }
113 else {
114 break;
115 }
116 }
117 }
118 else {
119 Runnable runnable = null;
120
121 while ((surplusCoreThreads++ < 0) &&
122 (_poolSize < _corePoolSize) &&
123 ((runnable = _taskQueue.poll()) != null)) {
124
125 _doAddWorkerThread(runnable);
126 }
127 }
128 }
129 finally {
130 _mainLock.unlock();
131 }
132 }
133
134 public boolean awaitTermination(long timeout, TimeUnit timeUnit)
135 throws InterruptedException {
136
137 long nanos = timeUnit.toNanos(timeout);
138
139 _mainLock.lock();
140
141 try {
142 while (true) {
143 if (_runState == _TERMINATED) {
144 return true;
145 }
146
147 if (nanos <= 0) {
148 return false;
149 }
150
151 nanos = _terminationCondition.awaitNanos(nanos);
152 }
153 }
154 finally {
155 _mainLock.unlock();
156 }
157 }
158
159 public void execute(Runnable runnable) {
160 if (runnable == null) {
161 throw new NullPointerException();
162 }
163
164 boolean[] hasWaiterMarker = new boolean[1];
165
166 if (_runState == _RUNNING &&
167 _taskQueue.offer(runnable, hasWaiterMarker)) {
168
169 if (_runState != _RUNNING) {
170 if (_taskQueue.remove(runnable)) {
171 _rejectedExecutionHandler.rejectedExecution(runnable, this);
172 }
173
174 return;
175 }
176
177 if (!hasWaiterMarker[0]) {
178 _addWorkerThread();
179 }
180
181 return;
182 }
183
184 _rejectedExecutionHandler.rejectedExecution(runnable, this);
185 }
186
187 public int getActiveCount() {
188 _mainLock.lock();
189
190 try {
191 int count = 0;
192
193 for (WorkerTask workerTask : _workerTasks) {
194 if (workerTask._isLocked()) {
195 count++;
196 }
197 }
198
199 return count;
200 }
201 finally {
202 _mainLock.unlock();
203 }
204 }
205
206 public long getCompletedTaskCount() {
207 _mainLock.lock();
208
209 try {
210 long count = _completedTaskCount;
211
212 for (WorkerTask workerTask : _workerTasks) {
213 count += workerTask._localCompletedTaskCount;
214 }
215
216 return count;
217 }
218 finally {
219 _mainLock.unlock();
220 }
221 }
222
223 public int getCorePoolSize() {
224 return _corePoolSize;
225 }
226
227 public long getKeepAliveTime(TimeUnit timeUnit) {
228 return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
229 }
230
231 public int getLargestPoolSize() {
232 return _largestPoolSize;
233 }
234
235 public int getMaxPoolSize() {
236 return _maxPoolSize;
237 }
238
239 public int getPendingTaskCount() {
240 return _taskQueue.size();
241 }
242
243 public int getPoolSize() {
244 return _poolSize;
245 }
246
247 public RejectedExecutionHandler getRejectedExecutionHandler() {
248 return _rejectedExecutionHandler;
249 }
250
251 public int getRemainingTaskQueueCapacity() {
252 return _taskQueue.remainingCapacity();
253 }
254
255 public long getTaskCount() {
256 _mainLock.lock();
257
258 try {
259 long count = _completedTaskCount;
260
261 for (WorkerTask workerTask : _workerTasks) {
262 count += workerTask._localCompletedTaskCount;
263
264 if (workerTask._isLocked()) {
265 count++;
266 }
267 }
268
269 return count + _taskQueue.size();
270 }
271 finally {
272 _mainLock.unlock();
273 }
274 }
275
276 public ThreadFactory getThreadFactory() {
277 return _threadFactory;
278 }
279
280 public ThreadPoolHandler getThreadPoolHandler() {
281 return _threadPoolHandler;
282 }
283
284 public boolean isAllowCoreThreadTimeout() {
285 return _allowCoreThreadTimeout;
286 }
287
288 public boolean isShutdown() {
289 if (_runState != _RUNNING) {
290 return true;
291 }
292 else {
293 return false;
294 }
295 }
296
297 public boolean isTerminated() {
298 if (_runState == _TERMINATED) {
299 return true;
300 }
301 else {
302 return false;
303 }
304 }
305
306 public boolean isTerminating() {
307 if (_runState == _STOP) {
308 return true;
309 }
310 else {
311 return false;
312 }
313 }
314
315 public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
316 _allowCoreThreadTimeout = allowCoreThreadTimeout;
317 }
318
319 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
320 if (keepAliveTime < 0) {
321 throw new IllegalArgumentException();
322 }
323
324 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
325 }
326
327 public void setRejectedExecutionHandler(
328 RejectedExecutionHandler rejectedExecutionHandler) {
329
330 if (rejectedExecutionHandler == null) {
331 throw new NullPointerException();
332 }
333
334 _rejectedExecutionHandler = rejectedExecutionHandler;
335 }
336
337 public void setThreadFactory(ThreadFactory threadFactory) {
338 if (threadFactory == null) {
339 throw new NullPointerException();
340 }
341
342 _threadFactory = threadFactory;
343 }
344
345 public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
346 if (threadPoolHandler == null) {
347 throw new NullPointerException();
348 }
349
350 _threadPoolHandler = threadPoolHandler;
351 }
352
353 public void shutdown() {
354 _mainLock.lock();
355
356 try {
357 int state = _runState;
358
359 if (state < _SHUTDOWN) {
360 _runState = _SHUTDOWN;
361 }
362
363 for (WorkerTask workerTask : _workerTasks) {
364 workerTask._interruptIfWaiting();
365 }
366
367 _tryTerminate();
368 }
369 finally {
370 _mainLock.unlock();
371 }
372 }
373
374 public List<Runnable> shutdownNow() {
375 _mainLock.lock();
376
377 try {
378 int state = _runState;
379
380 if (state < _STOP) {
381 _runState = _STOP;
382 }
383
384 for (WorkerTask workerTask : _workerTasks) {
385 workerTask._thread.interrupt();
386 }
387
388 List<Runnable> runnables = new ArrayList<Runnable>();
389
390 _taskQueue.drainTo(runnables);
391
392 _tryTerminate();
393
394 return runnables;
395 }
396 finally {
397 _mainLock.unlock();
398 }
399 }
400
401 @Override
402 protected void finalize() {
403 shutdown();
404 }
405
406 protected ReentrantLock getMainLock() {
407 return _mainLock;
408 }
409
410 protected TaskQueue<Runnable> getTaskQueue() {
411 return _taskQueue;
412 }
413
414 protected Set<WorkerTask> getWorkerTasks() {
415 return _workerTasks;
416 }
417
418 private void _addWorkerThread() {
419 int runState = _runState;
420 int poolSize = _poolSize;
421
422 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
423 ((runState == _SHUTDOWN) && (poolSize == 0) &&
424 !_taskQueue.isEmpty())) {
425
426 _mainLock.lock();
427
428 try {
429 runState = _runState;
430 poolSize = _poolSize;
431
432 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
433 ((runState == _SHUTDOWN) && (poolSize == 0) &&
434 !_taskQueue.isEmpty())) {
435
436 _doAddWorkerThread(_taskQueue.poll());
437 }
438 }
439 finally {
440 _mainLock.unlock();
441 }
442 }
443 }
444
445 private void _doAddWorkerThread(Runnable runnable) {
446 WorkerTask workerTask = new WorkerTask(runnable);
447
448 _workerTasks.add(workerTask);
449
450 int poolSize = ++_poolSize;
451
452 if (poolSize > _largestPoolSize) {
453 _largestPoolSize = poolSize;
454 }
455
456 workerTask._startWork();
457 }
458
459 private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
460 while (true) {
461 try {
462 int state = _runState;
463
464 if (state >= _STOP) {
465 return null;
466 }
467
468 Runnable runnable = null;
469
470 if (state == _SHUTDOWN) {
471 runnable = _taskQueue.poll();
472 }
473 else if ((_poolSize > _corePoolSize) ||
474 _allowCoreThreadTimeout) {
475
476 runnable = _taskQueue.poll(
477 _keepAliveTime, TimeUnit.NANOSECONDS);
478 }
479 else {
480 runnable = _taskQueue.take();
481 }
482
483 if (runnable != null) {
484 return runnable;
485 }
486
487 _mainLock.lock();
488
489 try {
490 if ((_runState >= _STOP) ||
491 ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
492 (_allowCoreThreadTimeout &&
493 ((_poolSize > 1) || _taskQueue.isEmpty())) ||
494 (!_allowCoreThreadTimeout &&
495 (_poolSize > _corePoolSize))) {
496
497 _completedTaskCount +=
498 workerTask._localCompletedTaskCount;
499
500 _workerTasks.remove(workerTask);
501
502 if (--_poolSize == 0) {
503 _tryTerminate();
504 }
505
506 cleanUpMarker[0] = true;
507
508 return null;
509 }
510 }
511 finally {
512 _mainLock.unlock();
513 }
514 }
515 catch (InterruptedException ie) {
516 }
517 }
518 }
519
520 private void _tryTerminate() {
521 if (_poolSize == 0) {
522 int state = _runState;
523
524 if ((state == _STOP) ||
525 ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
526
527 _runState = _TERMINATED;
528
529 _terminationCondition.signalAll();
530 _threadPoolHandler.terminated();
531
532 return;
533 }
534
535 if (!_taskQueue.isEmpty()) {
536 _doAddWorkerThread(_taskQueue.poll());
537 }
538 }
539 }
540
541 private static final int _RUNNING = 0;
542
543 private static final int _SHUTDOWN = 1;
544
545 private static final int _STOP = 2;
546
547 private static final int _TERMINATED = 3;
548
549 private volatile boolean _allowCoreThreadTimeout;
550 private long _completedTaskCount;
551 private volatile int _corePoolSize;
552 private volatile long _keepAliveTime;
553 private volatile int _largestPoolSize;
554 private final ReentrantLock _mainLock = new ReentrantLock();
555 private volatile int _maxPoolSize;
556 private volatile int _poolSize;
557 private volatile RejectedExecutionHandler _rejectedExecutionHandler;
558 private volatile int _runState;
559 private final TaskQueue<Runnable> _taskQueue;
560 private final Condition _terminationCondition = _mainLock.newCondition();
561 private volatile ThreadFactory _threadFactory;
562 private volatile ThreadPoolHandler _threadPoolHandler;
563 private final Set<WorkerTask> _workerTasks;
564
565 private class WorkerTask
566 extends AbstractQueuedSynchronizer implements Runnable {
567
568 public WorkerTask(Runnable runnable) {
569 _runnable = runnable;
570 }
571
572 public void run() {
573 boolean[] cleanUpMarker = new boolean[1];
574
575 try {
576 Runnable runnable = _runnable;
577
578 do {
579 if (runnable != null) {
580 _runTask(runnable);
581
582 runnable = null;
583 }
584 }
585 while ((runnable = _getTask(this, cleanUpMarker)) != null);
586 }
587 finally {
588 if (!cleanUpMarker[0]) {
589 _mainLock.lock();
590
591 try {
592 _completedTaskCount += _localCompletedTaskCount;
593
594 _workerTasks.remove(this);
595
596 if (--_poolSize == 0) {
597 _tryTerminate();
598 }
599 }
600 finally {
601 _mainLock.unlock();
602 }
603 }
604
605 _threadPoolHandler.beforeThreadEnd(_thread);
606 }
607 }
608
609 @Override
610 protected boolean isHeldExclusively() {
611 if (getState() == 1) {
612 return true;
613 }
614 else {
615 return false;
616 }
617 }
618
619 @Override
620 protected boolean tryAcquire(int unused) {
621 return compareAndSetState(0, 1);
622 }
623
624 @Override
625 protected boolean tryRelease(int unused) {
626 setState(0);
627
628 return true;
629 }
630
631 private boolean _interruptIfWaiting() {
632 if (!_thread.isInterrupted() && tryAcquire(1)) {
633 try {
634 _thread.interrupt();
635
636 return true;
637 }
638 finally {
639 _unlock();
640 }
641 }
642
643 return false;
644 }
645
646 private boolean _isLocked() {
647 return isHeldExclusively();
648 }
649
650 private void _lock() {
651 acquire(1);
652 }
653
654 private void _runTask(Runnable task) {
655 _lock();
656
657 try {
658 if ((_runState < _STOP) && Thread.interrupted() &&
659 (_runState >= _STOP)) {
660
661 _thread.interrupt();
662 }
663
664 Throwable throwable = null;
665
666 _threadPoolHandler.beforeExecute(_thread, task);
667
668 try {
669 task.run();
670
671 _localCompletedTaskCount++;
672 }
673 catch (RuntimeException re) {
674 throwable = re;
675
676 throw re;
677 }
678 finally {
679 _threadPoolHandler.afterExecute(task, throwable);
680 }
681 }
682 finally {
683 _unlock();
684 }
685 }
686
687 private void _startWork() {
688 _thread = _threadFactory.newThread(this);
689
690 _threadPoolHandler.beforeThreadStart(_thread);
691
692 _thread.start();
693 }
694
695 private void _unlock() {
696 release(1);
697 }
698
699 private volatile long _localCompletedTaskCount;
700 private final Runnable _runnable;
701 private Thread _thread;
702
703 }
704
705 }