001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
023 import com.liferay.portal.kernel.util.Validator;
024 import com.liferay.portal.security.auth.CompanyThreadLocal;
025 import com.liferay.portal.security.auth.PrincipalThreadLocal;
026
027 import java.util.Set;
028
029
037 public class SerialDestination extends BaseAsyncDestination {
038
039 public SerialDestination() {
040 super();
041
042 setWorkersCoreSize(_WORKERS_CORE_SIZE);
043 setWorkersMaxSize(_WORKERS_MAX_SIZE);
044 }
045
046
049 public SerialDestination(String name) {
050 super(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
051 }
052
053 @Override
054 protected void dispatch(
055 final Set<MessageListener> messageListeners, final Message message) {
056
057 if (!message.contains("companyId")) {
058 message.put("companyId", CompanyThreadLocal.getCompanyId());
059 }
060
061 if (!message.contains("principalName")) {
062 message.put("principalName", PrincipalThreadLocal.getName());
063 }
064
065 if (!message.contains("principalPassword")) {
066 message.put(
067 "principalPassword", PrincipalThreadLocal.getPassword());
068 }
069
070 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
071
072 Runnable runnable = new MessageRunnable(message) {
073
074 public void run() {
075 long companyId = CompanyThreadLocal.getCompanyId();
076 String principalName = PrincipalThreadLocal.getName();
077 String principalPassword = PrincipalThreadLocal.getPassword();
078
079 try {
080 long messageCompanyId = message.getLong("companyId");
081
082 if (messageCompanyId > 0) {
083 CompanyThreadLocal.setCompanyId(messageCompanyId);
084 }
085
086 String messagePrincipalName = message.getString(
087 "principalName");
088
089 if (Validator.isNotNull(messagePrincipalName)) {
090 PrincipalThreadLocal.setName(messagePrincipalName);
091 }
092
093 String messagePrincipalPassword = message.getString(
094 "principalPassword");
095
096 if (Validator.isNotNull(messagePrincipalPassword)) {
097 PrincipalThreadLocal.setPassword(
098 messagePrincipalPassword);
099 }
100
101 for (MessageListener messageListener : messageListeners) {
102 try {
103 messageListener.receive(message);
104 }
105 catch (MessageListenerException mle) {
106 _log.error(
107 "Unable to process message " + message, mle);
108 }
109 }
110 }
111 finally {
112 CompanyThreadLocal.setCompanyId(companyId);
113 PrincipalThreadLocal.setName(principalName);
114 PrincipalThreadLocal.setPassword(principalPassword);
115 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
116
117 CentralizedThreadLocal.clearShortLivedThreadLocals();
118 }
119 }
120
121 };
122
123 threadPoolExecutor.execute(runnable);
124 }
125
126 private static final int _WORKERS_CORE_SIZE = 1;
127
128 private static final int _WORKERS_MAX_SIZE = 1;
129
130 private static Log _log = LogFactoryUtil.getLog(SerialDestination.class);
131
132 }