001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.util.InetAddressUtil;
031    import com.liferay.portal.kernel.util.MethodHandler;
032    import com.liferay.portal.kernel.util.NamedThreadFactory;
033    import com.liferay.portal.kernel.util.ObjectValuePair;
034    import com.liferay.portal.kernel.util.PropsKeys;
035    import com.liferay.portal.kernel.util.PropsUtil;
036    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
037    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038    import com.liferay.portal.util.PortalPortEventListener;
039    import com.liferay.portal.util.PortalUtil;
040    import com.liferay.portal.util.PropsValues;
041    
042    import java.io.Serializable;
043    
044    import java.net.InetAddress;
045    
046    import java.util.ArrayList;
047    import java.util.Collection;
048    import java.util.Collections;
049    import java.util.Iterator;
050    import java.util.List;
051    import java.util.Map;
052    import java.util.Properties;
053    import java.util.Set;
054    import java.util.concurrent.ConcurrentHashMap;
055    import java.util.concurrent.CopyOnWriteArrayList;
056    import java.util.concurrent.Executors;
057    import java.util.concurrent.ScheduledExecutorService;
058    import java.util.concurrent.TimeUnit;
059    
060    import org.jgroups.ChannelException;
061    import org.jgroups.JChannel;
062    
063    /**
064     * @author Tina Tian
065     * @author Shuyang Zhou
066     */
067    public class ClusterExecutorImpl
068            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
069    
070            public void addClusterEventListener(
071                    ClusterEventListener clusterEventListener) {
072    
073                    if (!isEnabled()) {
074                            return;
075                    }
076    
077                    _clusterEventListeners.addIfAbsent(clusterEventListener);
078            }
079    
080            @Override
081            public void afterPropertiesSet() {
082                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
083                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
084                    }
085    
086                    if (PropsValues.LIVE_USERS_ENABLED) {
087                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
088                    }
089    
090                    super.afterPropertiesSet();
091            }
092    
093            @Override
094            public void destroy() {
095                    if (!isEnabled()) {
096                            return;
097                    }
098    
099                    _scheduledExecutorService.shutdownNow();
100    
101                    _clusterRequestReceiver.destroy();
102    
103                    _controlChannel.close();
104            }
105    
106            public FutureClusterResponses execute(ClusterRequest clusterRequest)
107                    throws SystemException {
108    
109                    if (!isEnabled()) {
110                            return null;
111                    }
112    
113                    List<Address> addresses = prepareAddresses(clusterRequest);
114    
115                    FutureClusterResponses futureClusterResponses =
116                            new FutureClusterResponses(addresses);
117    
118                    if (!clusterRequest.isFireAndForget()) {
119                            String uuid = clusterRequest.getUuid();
120    
121                            _futureClusterResponses.put(uuid, futureClusterResponses);
122                    }
123    
124                    if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
125                            addresses.remove(getLocalClusterNodeAddress())) {
126    
127                            ClusterNodeResponse clusterNodeResponse = runLocalMethod(
128                                    clusterRequest.getMethodHandler());
129    
130                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
131                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
132    
133                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
134                    }
135    
136                    if (clusterRequest.isMulticast()) {
137                            sendMulticastRequest(clusterRequest);
138                    }
139                    else {
140                            sendUnicastRequest(clusterRequest, addresses);
141                    }
142    
143                    return futureClusterResponses;
144            }
145    
146            public List<ClusterEventListener> getClusterEventListeners() {
147                    if (!isEnabled()) {
148                            return Collections.emptyList();
149                    }
150    
151                    return Collections.unmodifiableList(_clusterEventListeners);
152            }
153    
154            public List<Address> getClusterNodeAddresses() {
155                    if (!isEnabled()) {
156                            return Collections.emptyList();
157                    }
158    
159                    removeExpiredInstances();
160    
161                    return new ArrayList<Address>(_clusterNodeAddresses.values());
162            }
163    
164            public List<ClusterNode> getClusterNodes() {
165                    if (!isEnabled()) {
166                            return Collections.emptyList();
167                    }
168    
169                    removeExpiredInstances();
170    
171                    Set<ObjectValuePair<Address, ClusterNode>> liveInstances =
172                            _liveInstances.keySet();
173    
174                    List<ClusterNode> liveClusterNodes = new ArrayList<ClusterNode>(
175                            liveInstances.size());
176    
177                    for (ObjectValuePair<Address, ClusterNode> liveInstance :
178                                    liveInstances) {
179    
180                            liveClusterNodes.add(liveInstance.getValue());
181                    }
182    
183                    return liveClusterNodes;
184            }
185    
186            public ClusterNode getLocalClusterNode() {
187                    if (!isEnabled()) {
188                            return null;
189                    }
190    
191                    return _localClusterNode;
192            }
193    
194            public Address getLocalClusterNodeAddress() {
195                    if (!isEnabled()) {
196                            return null;
197                    }
198    
199                    return _localAddress;
200            }
201    
202            public void initialize() {
203                    if (!isEnabled()) {
204                            return;
205                    }
206    
207                    PortalUtil.addPortalPortEventListener(this);
208    
209                    _localAddress = new AddressImpl(_controlChannel.getLocalAddress());
210    
211                    try {
212                            initLocalClusterNode();
213                    }
214                    catch (SystemException se) {
215                            _log.error("Unable to determine local network address", se);
216                    }
217    
218                    ObjectValuePair<Address, ClusterNode> localInstance =
219                            new ObjectValuePair<Address, ClusterNode>(
220                                    _localAddress, _localClusterNode);
221    
222                    _liveInstances.put(localInstance, Long.MAX_VALUE);
223    
224                    _clusterNodeAddresses.put(
225                            _localClusterNode.getClusterNodeId(), _localAddress);
226    
227                    _clusterRequestReceiver.initialize();
228    
229                    _scheduledExecutorService = Executors.newScheduledThreadPool(
230                            1,
231                            new NamedThreadFactory(
232                                    ClusterExecutorImpl.class.getName(), Thread.NORM_PRIORITY,
233                                    Thread.currentThread().getContextClassLoader()));
234    
235                    _scheduledExecutorService.scheduleWithFixedDelay(
236                            new HeartbeatTask(), 0,
237                            PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
238                            TimeUnit.MILLISECONDS);
239            }
240    
241            public boolean isClusterNodeAlive(Address address) {
242                    if (!isEnabled()) {
243                            return false;
244                    }
245    
246                    removeExpiredInstances();
247    
248                    return _clusterNodeAddresses.containsValue(address);
249            }
250    
251            public boolean isClusterNodeAlive(String clusterNodeId) {
252                    if (!isEnabled()) {
253                            return false;
254                    }
255    
256                    removeExpiredInstances();
257    
258                    return _clusterNodeAddresses.containsKey(clusterNodeId);
259            }
260    
261            @Override
262            public boolean isEnabled() {
263                    return PropsValues.CLUSTER_LINK_ENABLED;
264            }
265    
266            public void portalPortConfigured(int port) {
267                    if (!isEnabled() ||
268                            _localClusterNode.getPort() ==
269                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT) {
270    
271                            return;
272                    }
273    
274                    _localClusterNode.setPort(port);
275            }
276    
277            public void removeClusterEventListener(
278                    ClusterEventListener clusterEventListener) {
279    
280                    if (!isEnabled()) {
281                            return;
282                    }
283    
284                    _clusterEventListeners.remove(clusterEventListener);
285            }
286    
287            public void setClusterEventListeners(
288                    List<ClusterEventListener> clusterEventListeners) {
289    
290                    if (!isEnabled()) {
291                            return;
292                    }
293    
294                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
295            }
296    
297            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
298                    if (!isEnabled()) {
299                            return;
300                    }
301    
302                    _shortcutLocalMethod = shortcutLocalMethod;
303            }
304    
305            protected void fireClusterEvent(ClusterEvent clusterEvent) {
306                    for (ClusterEventListener listener : _clusterEventListeners) {
307                            listener.processClusterEvent(clusterEvent);
308                    }
309            }
310    
311            protected JChannel getControlChannel() {
312                    return _controlChannel;
313            }
314    
315            protected FutureClusterResponses getExecutionResults(String uuid) {
316                    return _futureClusterResponses.get(uuid);
317            }
318    
319            @Override
320            protected void initChannels() {
321                    Properties controlProperties = PropsUtil.getProperties(
322                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
323    
324                    String controlProperty = controlProperties.getProperty(
325                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
326    
327                    _clusterRequestReceiver = new ClusterRequestReceiver(this);
328    
329                    try {
330                            _controlChannel = createJChannel(
331                                    controlProperty, _clusterRequestReceiver,
332                                    _DEFAULT_CLUSTER_NAME);
333                    }
334                    catch (ChannelException ce) {
335                            _log.error(ce, ce);
336                    }
337                    catch (Exception e) {
338                            _log.error(e, e);
339                    }
340            }
341    
342            protected void initLocalClusterNode() throws SystemException {
343                    _localClusterNode = new ClusterNode(PortalUUIDUtil.generate());
344    
345                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
346                            _localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
347                    }
348                    else {
349                            _localClusterNode.setPort(PortalUtil.getPortalPort(false));
350                    }
351    
352                    try {
353                            InetAddress inetAddress = bindInetAddress;
354    
355                            if (inetAddress == null) {
356                                    inetAddress = InetAddressUtil.getLocalInetAddress();
357                            }
358    
359                            _localClusterNode.setInetAddress(inetAddress);
360    
361                            _localClusterNode.setHostName(inetAddress.getHostName());
362                    }
363                    catch (Exception e) {
364                            throw new SystemException(
365                                    "Unable to determine local network address", e);
366                    }
367            }
368    
369            protected boolean isShortcutLocalMethod() {
370                    return _shortcutLocalMethod;
371            }
372    
373            protected void notify(
374                    Address address, ClusterNode clusterNode, long expirationTime) {
375    
376                    removeExpiredInstances();
377    
378                    if (System.currentTimeMillis() > expirationTime) {
379                            return;
380                    }
381    
382                    ObjectValuePair<Address, ClusterNode> liveInstance =
383                            new ObjectValuePair<Address, ClusterNode>(address, clusterNode);
384    
385                    // Go through the extra step of removing, and then putting the new
386                    // expiration time. See LPS-23463.
387    
388                    Long oldExpirationTime = _liveInstances.remove(liveInstance);
389    
390                    _liveInstances.put(liveInstance, expirationTime);
391    
392                    if ((oldExpirationTime != null) ||
393                            ((_localAddress != null) && _localAddress.equals(address))) {
394    
395                            return;
396                    }
397    
398                    _clusterNodeAddresses.put(clusterNode.getClusterNodeId(), address);
399    
400                    ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
401    
402                    fireClusterEvent(clusterEvent);
403            }
404    
405            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
406                    boolean isMulticast = clusterRequest.isMulticast();
407    
408                    List<Address> addresses = null;
409    
410                    if (isMulticast) {
411                            addresses = getAddresses(_controlChannel);
412                    }
413                    else {
414                            addresses = new ArrayList<Address>();
415    
416                            Collection<Address> clusterNodeAddresses =
417                                    clusterRequest.getTargetClusterNodeAddresses();
418    
419                            if (clusterNodeAddresses != null) {
420                                    addresses.addAll(clusterNodeAddresses);
421                            }
422    
423                            Collection<String> clusterNodeIds =
424                                    clusterRequest.getTargetClusterNodeIds();
425    
426                            if (clusterNodeIds != null) {
427                                    for (String clusterNodeId : clusterNodeIds) {
428                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
429    
430                                            addresses.add(address);
431                                    }
432                            }
433                    }
434    
435                    return addresses;
436            }
437    
438            protected void removeExpiredInstances() {
439                    if (_liveInstances.isEmpty()) {
440                            return;
441                    }
442    
443                    Set<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>>
444                            liveInstances = _liveInstances.entrySet();
445    
446                    Iterator<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>> itr =
447                            liveInstances.iterator();
448    
449                    long now = System.currentTimeMillis();
450    
451                    while (itr.hasNext()) {
452                            Map.Entry<ObjectValuePair<Address, ClusterNode>, Long> entry =
453                                    itr.next();
454    
455                            long expirationTime = entry.getValue();
456    
457                            if (now < expirationTime) {
458                                    continue;
459                            }
460    
461                            ObjectValuePair<Address, ClusterNode> liveInstance = entry.getKey();
462    
463                            ClusterNode clusterNode = liveInstance.getValue();
464    
465                            if (_localClusterNode.equals(clusterNode)) {
466                                    continue;
467                            }
468    
469                            _clusterNodeAddresses.remove(clusterNode.getClusterNodeId());
470    
471                            itr.remove();
472    
473                            ClusterEvent clusterEvent = ClusterEvent.depart(clusterNode);
474    
475                            fireClusterEvent(clusterEvent);
476                    }
477            }
478    
479            protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler) {
480                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
481    
482                    ClusterNode localClusterNode = getLocalClusterNode();
483    
484                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
485                    clusterNodeResponse.setClusterNode(localClusterNode);
486                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
487    
488                    if (methodHandler == null) {
489                            clusterNodeResponse.setException(
490                                    new ClusterException(
491                                            "Payload is not of type " + MethodHandler.class.getName()));
492    
493                            return clusterNodeResponse;
494                    }
495    
496                    try {
497                            Object returnValue = methodHandler.invoke(true);
498    
499                            if (returnValue instanceof Serializable) {
500                                    clusterNodeResponse.setResult(returnValue);
501                            }
502                            else if (returnValue != null) {
503                                    clusterNodeResponse.setException(
504                                            new ClusterException("Return value is not serializable"));
505                            }
506                    }
507                    catch (Exception e) {
508                            clusterNodeResponse.setException(e);
509                    }
510    
511                    return clusterNodeResponse;
512            }
513    
514            protected void sendMulticastRequest(ClusterRequest clusterRequest)
515                    throws SystemException {
516    
517                    try {
518                            _controlChannel.send(null, null, clusterRequest);
519                    }
520                    catch (ChannelException ce) {
521                            _log.error(
522                                    "Unable to send multicast message " + clusterRequest, ce);
523    
524                            throw new SystemException("Unable to send multicast request", ce);
525                    }
526            }
527    
528            protected void sendUnicastRequest(
529                            ClusterRequest clusterRequest, List<Address> addresses)
530                    throws SystemException {
531    
532                    for (Address address : addresses) {
533                            org.jgroups.Address jGroupsAddress =
534                                    (org.jgroups.Address)address.getRealAddress();
535    
536                            try {
537                                    _controlChannel.send(jGroupsAddress, null, clusterRequest);
538                            }
539                            catch (ChannelException ce) {
540                                    _log.error(
541                                            "Unable to send unicast message " + clusterRequest, ce);
542    
543                                    throw new SystemException("Unable to send unicast request", ce);
544                            }
545                    }
546            }
547    
548            private static final String _DEFAULT_CLUSTER_NAME =
549                    "LIFERAY-CONTROL-CHANNEL";
550    
551            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
552    
553            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
554                    new CopyOnWriteArrayList<ClusterEventListener>();
555            private Map<String, Address> _clusterNodeAddresses =
556                    new ConcurrentHashMap<String, Address>();
557            private ClusterRequestReceiver _clusterRequestReceiver;
558            private JChannel _controlChannel;
559            private Map<String, FutureClusterResponses> _futureClusterResponses =
560                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
561            private Map<ObjectValuePair<Address, ClusterNode>, Long> _liveInstances =
562                    new ConcurrentHashMap<ObjectValuePair<Address, ClusterNode>, Long>();
563            private Address _localAddress;
564            private ClusterNode _localClusterNode;
565            private ScheduledExecutorService _scheduledExecutorService;
566            private boolean _shortcutLocalMethod;
567    
568            private class HeartbeatTask implements Runnable {
569    
570                    public void run() {
571                            try {
572                                    ClusterRequest clusterNotifyRequest =
573                                            ClusterRequest.createClusterNotifyRequest(
574                                                    _localClusterNode);
575    
576                                    sendMulticastRequest(clusterNotifyRequest);
577                            }
578                            catch (Exception e) {
579                                    if (_log.isDebugEnabled()) {
580                                            _log.debug("Unable to send check in request", e);
581                                    }
582    
583                                    _scheduledExecutorService.scheduleWithFixedDelay(
584                                            new HeartbeatTask(), 0,
585                                            PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
586                                            TimeUnit.MILLISECONDS);
587                            }
588                    }
589    
590            }
591    
592    }