001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterMessageType;
022 import com.liferay.portal.kernel.cluster.ClusterNode;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
027 import com.liferay.portal.kernel.log.Log;
028 import com.liferay.portal.kernel.log.LogFactoryUtil;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.NamedThreadFactory;
031 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
032 import com.liferay.portal.kernel.util.Validator;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.io.Serializable;
036
037 import java.util.concurrent.ExecutorService;
038 import java.util.concurrent.Executors;
039
040 import org.jgroups.Channel;
041 import org.jgroups.ChannelException;
042 import org.jgroups.Message;
043
044
048 public class ClusterRequestReceiver extends BaseReceiver {
049
050 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
051 _clusterExecutorImpl = clusterExecutorImpl;
052 }
053
054 public void destroy() {
055 _parallelExecutorService.shutdownNow();
056 _serialExecutorService.shutdownNow();
057 }
058
059 public void initialize() {
060 _parallelExecutorService = PortalExecutorManagerUtil.getPortalExecutor(
061 ClusterRequestReceiver.class.getName() + "_parallel");
062 _serialExecutorService = Executors.newSingleThreadExecutor(
063 new NamedThreadFactory(
064 ClusterRequestReceiver.class.getName() + "_serial",
065 Thread.NORM_PRIORITY, PortalClassLoaderUtil.getClassLoader()));
066 }
067
068 @Override
069 public void receive(Message message) {
070 org.jgroups.Address sourceAddress = message.getSrc();
071
072 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
073
074 org.jgroups.Address localAddress = controlChannel.getAddress();
075
076 Object obj = message.getObject();
077
078 if (obj == null) {
079 if (_log.isWarnEnabled()) {
080 _log.warn("Message content is null");
081 }
082
083 return;
084 }
085
086 if (localAddress.equals(sourceAddress)) {
087 boolean isProcessed = processLocalMessage(obj, sourceAddress);
088
089 if (isProcessed) {
090 return;
091 }
092 }
093
094 if (obj instanceof ClusterRequest) {
095 ClusterRequest clusterRequest = (ClusterRequest)obj;
096
097 RequestTask requestTask = new RequestTask(
098 clusterRequest, sourceAddress, localAddress);
099
100 if (clusterRequest.isParallelized()) {
101 _parallelExecutorService.execute(requestTask);
102 }
103 else {
104 _serialExecutorService.execute(requestTask);
105 }
106 }
107 else if (obj instanceof ClusterNodeResponse) {
108 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
109
110 processClusterResponse(
111 clusterNodeResponse, sourceAddress, localAddress);
112 }
113 else if (_log.isWarnEnabled()) {
114 _log.warn(
115 "Unable to process message content of type " + obj.getClass());
116 }
117 }
118
119 protected Object invoke(
120 String servletContextName, String beanIdentifier,
121 MethodHandler methodHandler)
122 throws Exception {
123
124 if (servletContextName == null) {
125 if (Validator.isNull(beanIdentifier)) {
126 return methodHandler.invoke(true);
127 }
128 else {
129 Object bean = PortalBeanLocatorUtil.locate(beanIdentifier);
130
131 return methodHandler.invoke(bean);
132 }
133 }
134
135 Thread currentThread = Thread.currentThread();
136
137 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
138
139 try {
140 ClassLoader classLoader =
141 (ClassLoader)PortletBeanLocatorUtil.locate(
142 servletContextName, "portletClassLoader");
143
144 currentThread.setContextClassLoader(classLoader);
145
146 if (Validator.isNull(beanIdentifier)) {
147 return methodHandler.invoke(true);
148 }
149 else {
150 Object bean = PortletBeanLocatorUtil.locate(
151 servletContextName, beanIdentifier);
152
153 return methodHandler.invoke(bean);
154 }
155 }
156 catch (Exception e) {
157 throw e;
158 }
159 finally {
160 currentThread.setContextClassLoader(contextClassLoader);
161 }
162 }
163
164 protected void processClusterRequest(
165 ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
166 org.jgroups.Address localAddress) {
167
168 ClusterMessageType clusterMessageType =
169 clusterRequest.getClusterMessageType();
170
171 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
172 ClusterNode originatingClusterNode =
173 clusterRequest.getOriginatingClusterNode();
174
175 if (originatingClusterNode != null) {
176 long expirationTime =
177 System.currentTimeMillis() +
178 (PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL * 2);
179
180 _clusterExecutorImpl.notify(
181 new AddressImpl(sourceAddress), originatingClusterNode,
182 expirationTime);
183 }
184 else {
185 if (_log.isWarnEnabled()) {
186 _log.warn(
187 "Content of notify message does not contain cluster " +
188 "node information");
189 }
190 }
191
192 return;
193 }
194
195 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
196
197 Address address = new AddressImpl(localAddress);
198
199 clusterNodeResponse.setAddress(address);
200
201 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
202
203 try {
204 ClusterNode localClusterNode =
205 _clusterExecutorImpl.getLocalClusterNode();
206
207 clusterNodeResponse.setClusterNode(localClusterNode);
208 }
209 catch (Exception e) {
210 clusterNodeResponse.setException(e);
211 }
212
213 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
214 clusterNodeResponse.setUuid(clusterRequest.getUuid());
215
216 MethodHandler methodHandler = clusterRequest.getMethodHandler();
217
218 if (methodHandler != null) {
219 try {
220 ClusterInvokeThreadLocal.setEnabled(false);
221
222 Object returnValue = invoke(
223 clusterRequest.getServletContextName(),
224 clusterRequest.getBeanIdentifier(), methodHandler);
225
226 if (returnValue instanceof Serializable) {
227 clusterNodeResponse.setResult(returnValue);
228 }
229 else if (returnValue != null) {
230 clusterNodeResponse.setException(
231 new ClusterException(
232 "Return value is not serializable"));
233 }
234 }
235 catch (Exception e) {
236 clusterNodeResponse.setException(e);
237
238 _log.error("Failed to invoke method " + methodHandler, e);
239 }
240 finally {
241 ClusterInvokeThreadLocal.setEnabled(true);
242 }
243 }
244 else {
245 clusterNodeResponse.setException(
246 new ClusterException(
247 "Payload is not of type " + MethodHandler.class.getName()));
248 }
249
250 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
251
252 try {
253 controlChannel.send(
254 sourceAddress, localAddress, clusterNodeResponse);
255 }
256 catch (ChannelException ce) {
257 _log.error(
258 "Unable to send response message " + clusterNodeResponse, ce);
259 }
260 catch (Throwable t) {
261 _log.error(t, t);
262 }
263 }
264
265 protected void processClusterResponse(
266 ClusterNodeResponse clusterNodeResponse,
267 org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
268
269 String uuid = clusterNodeResponse.getUuid();
270
271 FutureClusterResponses futureClusterResponses =
272 _clusterExecutorImpl.getExecutionResults(uuid);
273
274 if (futureClusterResponses == null) {
275 if (_log.isInfoEnabled()) {
276 _log.info("Unable to find response container for " + uuid);
277 }
278
279 return;
280 }
281
282 Address address = new AddressImpl(sourceAddress);
283
284 if (futureClusterResponses.expectsReply(address)) {
285 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
286 }
287 else {
288 if (_log.isWarnEnabled()) {
289 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
290 }
291 }
292 }
293
294 protected boolean processLocalMessage(
295 Object message, org.jgroups.Address sourceAddress) {
296
297 if (message instanceof ClusterRequest) {
298 ClusterRequest clusterRequest = (ClusterRequest)message;
299
300 if (clusterRequest.isSkipLocal()) {
301 return true;
302 }
303 }
304
305 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
306 return true;
307 }
308
309 return false;
310 }
311
312 private static Log _log = LogFactoryUtil.getLog(
313 ClusterRequestReceiver.class);
314
315 private ClusterExecutorImpl _clusterExecutorImpl;
316 private ExecutorService _parallelExecutorService;
317 private ExecutorService _serialExecutorService;
318
319 private class RequestTask implements Runnable {
320
321 public RequestTask(
322 ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
323 org.jgroups.Address localAddress) {
324
325 _clusterRequest = clusterRequest;
326 _sourceAddress = sourceAddress;
327 _localAddress = localAddress;
328 }
329
330 public void run() {
331 processClusterRequest(
332 _clusterRequest, _sourceAddress, _localAddress);
333 }
334
335 private ClusterRequest _clusterRequest;
336 private org.jgroups.Address _localAddress;
337 private org.jgroups.Address _sourceAddress;
338
339 }
340
341 }