/**
 * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
 *
 * This library is free software; you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation; either version 2.1 of the License, or (at your option)
 * any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 * details.
 */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018    import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterMessageType;
022    import com.liferay.portal.kernel.cluster.ClusterNode;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
027    import com.liferay.portal.kernel.log.Log;
028    import com.liferay.portal.kernel.log.LogFactoryUtil;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    import com.liferay.portal.kernel.util.NamedThreadFactory;
031    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
032    import com.liferay.portal.kernel.util.Validator;
033    import com.liferay.portal.util.PropsValues;
034    
035    import java.io.Serializable;
036    
037    import java.util.concurrent.ExecutorService;
038    import java.util.concurrent.Executors;
039    
040    import org.jgroups.Channel;
041    import org.jgroups.ChannelException;
042    import org.jgroups.Message;
043    
044    /**
045     * @author Michael C. Han
046     * @author Tina Tian
047     */
048    public class ClusterRequestReceiver extends BaseReceiver {
049    
050            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
051                    _clusterExecutorImpl = clusterExecutorImpl;
052            }
053    
054            public void destroy() {
055                    _parallelExecutorService.shutdownNow();
056                    _serialExecutorService.shutdownNow();
057            }
058    
059            public void initialize() {
060                    _parallelExecutorService = PortalExecutorManagerUtil.getPortalExecutor(
061                            ClusterRequestReceiver.class.getName() + "_parallel");
062                    _serialExecutorService = Executors.newSingleThreadExecutor(
063                            new NamedThreadFactory(
064                                    ClusterRequestReceiver.class.getName() + "_serial",
065                                    Thread.NORM_PRIORITY, PortalClassLoaderUtil.getClassLoader()));
066            }
067    
068            @Override
069            public void receive(Message message) {
070                    org.jgroups.Address sourceAddress = message.getSrc();
071    
072                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
073    
074                    org.jgroups.Address localAddress = controlChannel.getAddress();
075    
076                    Object obj = message.getObject();
077    
078                    if (obj == null) {
079                            if (_log.isWarnEnabled()) {
080                                    _log.warn("Message content is null");
081                            }
082    
083                            return;
084                    }
085    
086                    if (localAddress.equals(sourceAddress)) {
087                            boolean isProcessed = processLocalMessage(obj, sourceAddress);
088    
089                            if (isProcessed) {
090                                    return;
091                            }
092                    }
093    
094                    if (obj instanceof ClusterRequest) {
095                            ClusterRequest clusterRequest = (ClusterRequest)obj;
096    
097                            RequestTask requestTask = new RequestTask(
098                                    clusterRequest, sourceAddress, localAddress);
099    
100                            if (clusterRequest.isParallelized()) {
101                                    _parallelExecutorService.execute(requestTask);
102                            }
103                            else {
104                                    _serialExecutorService.execute(requestTask);
105                            }
106                    }
107                    else if (obj instanceof ClusterNodeResponse) {
108                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
109    
110                            processClusterResponse(
111                                    clusterNodeResponse, sourceAddress, localAddress);
112                    }
113                    else if (_log.isWarnEnabled()) {
114                            _log.warn(
115                                    "Unable to process message content of type " + obj.getClass());
116                    }
117            }
118    
119            protected Object invoke(
120                            String servletContextName, String beanIdentifier,
121                            MethodHandler methodHandler)
122                    throws Exception {
123    
124                    if (servletContextName == null) {
125                            if (Validator.isNull(beanIdentifier)) {
126                                    return methodHandler.invoke(true);
127                            }
128                            else {
129                                    Object bean = PortalBeanLocatorUtil.locate(beanIdentifier);
130    
131                                    return methodHandler.invoke(bean);
132                            }
133                    }
134    
135                    Thread currentThread = Thread.currentThread();
136    
137                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
138    
139                    try {
140                            ClassLoader classLoader =
141                                    (ClassLoader)PortletBeanLocatorUtil.locate(
142                                            servletContextName, "portletClassLoader");
143    
144                            currentThread.setContextClassLoader(classLoader);
145    
146                            if (Validator.isNull(beanIdentifier)) {
147                                    return methodHandler.invoke(true);
148                            }
149                            else {
150                                    Object bean = PortletBeanLocatorUtil.locate(
151                                            servletContextName, beanIdentifier);
152    
153                                    return methodHandler.invoke(bean);
154                            }
155                    }
156                    catch (Exception e) {
157                            throw e;
158                    }
159                    finally {
160                            currentThread.setContextClassLoader(contextClassLoader);
161                    }
162            }
163    
164            protected void processClusterRequest(
165                    ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
166                    org.jgroups.Address localAddress) {
167    
168                    ClusterMessageType clusterMessageType =
169                            clusterRequest.getClusterMessageType();
170    
171                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
172                            ClusterNode originatingClusterNode =
173                                    clusterRequest.getOriginatingClusterNode();
174    
175                            if (originatingClusterNode != null) {
176                                    long expirationTime =
177                                            System.currentTimeMillis() +
178                                                    (PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL * 2);
179    
180                                    _clusterExecutorImpl.notify(
181                                            new AddressImpl(sourceAddress), originatingClusterNode,
182                                            expirationTime);
183                            }
184                            else {
185                                    if (_log.isWarnEnabled()) {
186                                            _log.warn(
187                                                    "Content of notify message does not contain cluster " +
188                                                            "node information");
189                                    }
190                            }
191    
192                            return;
193                    }
194    
195                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
196    
197                    Address address = new AddressImpl(localAddress);
198    
199                    clusterNodeResponse.setAddress(address);
200    
201                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
202    
203                    try {
204                            ClusterNode localClusterNode =
205                                    _clusterExecutorImpl.getLocalClusterNode();
206    
207                            clusterNodeResponse.setClusterNode(localClusterNode);
208                    }
209                    catch (Exception e) {
210                            clusterNodeResponse.setException(e);
211                    }
212    
213                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
214                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
215    
216                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
217    
218                    if (methodHandler != null) {
219                            try {
220                                    ClusterInvokeThreadLocal.setEnabled(false);
221    
222                                    Object returnValue = invoke(
223                                            clusterRequest.getServletContextName(),
224                                            clusterRequest.getBeanIdentifier(), methodHandler);
225    
226                                    if (returnValue instanceof Serializable) {
227                                            clusterNodeResponse.setResult(returnValue);
228                                    }
229                                    else if (returnValue != null) {
230                                            clusterNodeResponse.setException(
231                                                    new ClusterException(
232                                                            "Return value is not serializable"));
233                                    }
234                            }
235                            catch (Exception e) {
236                                    clusterNodeResponse.setException(e);
237    
238                                    _log.error("Failed to invoke method " + methodHandler, e);
239                            }
240                            finally {
241                                    ClusterInvokeThreadLocal.setEnabled(true);
242                            }
243                    }
244                    else {
245                            clusterNodeResponse.setException(
246                                    new ClusterException(
247                                            "Payload is not of type " + MethodHandler.class.getName()));
248                    }
249    
250                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
251    
252                    try {
253                            controlChannel.send(
254                                    sourceAddress, localAddress, clusterNodeResponse);
255                    }
256                    catch (ChannelException ce) {
257                            _log.error(
258                                    "Unable to send response message " + clusterNodeResponse, ce);
259                    }
260                    catch (Throwable t) {
261                            _log.error(t, t);
262                    }
263            }
264    
265            protected void processClusterResponse(
266                    ClusterNodeResponse clusterNodeResponse,
267                    org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
268    
269                    String uuid = clusterNodeResponse.getUuid();
270    
271                    FutureClusterResponses futureClusterResponses =
272                            _clusterExecutorImpl.getExecutionResults(uuid);
273    
274                    if (futureClusterResponses == null) {
275                            if (_log.isInfoEnabled()) {
276                                    _log.info("Unable to find response container for " + uuid);
277                            }
278    
279                            return;
280                    }
281    
282                    Address address = new AddressImpl(sourceAddress);
283    
284                    if (futureClusterResponses.expectsReply(address)) {
285                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
286                    }
287                    else {
288                            if (_log.isWarnEnabled()) {
289                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
290                            }
291                    }
292            }
293    
294            protected boolean processLocalMessage(
295                    Object message, org.jgroups.Address sourceAddress) {
296    
297                    if (message instanceof ClusterRequest) {
298                            ClusterRequest clusterRequest = (ClusterRequest)message;
299    
300                            if (clusterRequest.isSkipLocal()) {
301                                    return true;
302                            }
303                    }
304    
305                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
306                            return true;
307                    }
308    
309                    return false;
310            }
311    
312            private static Log _log = LogFactoryUtil.getLog(
313                    ClusterRequestReceiver.class);
314    
315            private ClusterExecutorImpl _clusterExecutorImpl;
316            private ExecutorService _parallelExecutorService;
317            private ExecutorService _serialExecutorService;
318    
319            private class RequestTask implements Runnable {
320    
321                    public RequestTask(
322                            ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
323                            org.jgroups.Address localAddress) {
324    
325                            _clusterRequest = clusterRequest;
326                            _sourceAddress = sourceAddress;
327                            _localAddress = localAddress;
328                    }
329    
330                    public void run() {
331                            processClusterRequest(
332                                    _clusterRequest, _sourceAddress, _localAddress);
333                    }
334    
335                    private ClusterRequest _clusterRequest;
336                    private org.jgroups.Address _localAddress;
337                    private org.jgroups.Address _sourceAddress;
338    
339            }
340    
341    }