001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.concurrent.BatchablePipe;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Message;
023 import com.liferay.portal.kernel.messaging.MessageRunnable;
024
025
028 public class BufferedIncrementDiscardPolicy
029 implements RejectedExecutionHandler {
030
031 @SuppressWarnings("rawtypes")
032 public void rejectedExecution(
033 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
034
035 MessageRunnable messageRunnable = (MessageRunnable)runnable;
036
037 Message message = messageRunnable.getMessage();
038
039 BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
040 (BatchablePipe<String, BufferedIncreasableEntry>)
041 message.getPayload();
042
043 for (int i = 0; i < _discardNumber; i++) {
044 BufferedIncreasableEntry bufferedIncreasableEntry =
045 (BufferedIncreasableEntry)batchablePipe.take();
046
047 if (bufferedIncreasableEntry == null) {
048 break;
049 }
050 else if (_log.isInfoEnabled()) {
051 _log.info(
052 "Discarding BufferedIncreasableEntry " +
053 bufferedIncreasableEntry);
054 }
055 }
056 }
057
058 public void setDiscardNumber(int discardNumber) {
059 _discardNumber = discardNumber;
060 }
061
062 private static Log _log = LogFactoryUtil.getLog(
063 BufferedIncrementDiscardPolicy.class);
064
065 private int _discardNumber = 1;
066
067 }