1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.kernel.cluster.Address;
18  import com.liferay.portal.kernel.cluster.ClusterException;
19  import com.liferay.portal.kernel.cluster.ClusterExecutor;
20  import com.liferay.portal.kernel.cluster.ClusterRequest;
21  import com.liferay.portal.kernel.log.Log;
22  import com.liferay.portal.kernel.log.LogFactoryUtil;
23  import com.liferay.portal.kernel.util.MethodInvoker;
24  import com.liferay.portal.kernel.util.MethodWrapper;
25  import com.liferay.portal.kernel.util.PropsKeys;
26  import com.liferay.portal.kernel.util.PropsUtil;
27  import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
28  import com.liferay.portal.util.PropsValues;
29  
30  import java.io.Serializable;
31  
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Properties;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.Future;
39  
40  import org.jgroups.ChannelException;
41  import org.jgroups.JChannel;
42  
43  /**
44   * <a href="ClusterExecutorImpl.java.html"><b><i>View Source</i></b></a>
45   *
46   * @author Tina Tian
47   */
48  public class ClusterExecutorImpl
49      extends ClusterBase implements ClusterExecutor {
50  
51      public void destroy() {
52          if (!PropsValues.CLUSTER_LINK_ENABLED) {
53              return;
54          }
55  
56          _controlChannel.close();
57      }
58  
59      public Map<Address, Future<?>> executeMulticastCall(
60          MethodWrapper methodWrapper) {
61  
62          if (!PropsValues.CLUSTER_LINK_ENABLED) {
63              return null;
64          }
65  
66          ClusterRequest clusterRequest = new ClusterRequestImpl();
67  
68          clusterRequest.setMulticast(true);
69          clusterRequest.setPayload(methodWrapper);
70          clusterRequest.setUuid(PortalUUIDUtil.generate());
71  
72          Map<Address, Future<?>> results = new HashMap<Address, Future<?>>();
73  
74          List<Address> addresses = getControlAddresses();
75          Address localControlAddress = getLocalControlAddress();
76  
77          for (Address address : addresses) {
78              if (_shortcutLocalMethod && address.equals(localControlAddress)) {
79                  results.put(address, runLocalMethod(methodWrapper));
80              }
81              else {
82                  results.put(address, new FutureResult<Object>());
83              }
84          }
85  
86          _multicastResultMap.put(clusterRequest.getUuid(), results);
87  
88          try {
89              _controlChannel.send(null, null, clusterRequest);
90          }
91          catch (ChannelException ce) {
92              _log.error("Unable to send unicast message " + clusterRequest, ce);
93          }
94  
95          return results;
96      }
97  
98      public Future<?> executeUnicastCall(
99          Address address, MethodWrapper methodWrapper) {
100 
101         if (!PropsValues.CLUSTER_LINK_ENABLED) {
102             return null;
103         }
104 
105         org.jgroups.Address jGroupsAddress =
106             (org.jgroups.Address)address.getRealAddress();
107 
108         ClusterRequest clusterRequest = new ClusterRequestImpl();
109 
110         clusterRequest.setMulticast(false);
111         clusterRequest.setPayload(methodWrapper);
112         clusterRequest.setUuid(PortalUUIDUtil.generate());
113 
114         if (_shortcutLocalMethod && address.equals(getLocalControlAddress())) {
115             return runLocalMethod(methodWrapper);
116         }
117 
118         FutureResult<Object> futureResult = new FutureResult<Object>();
119 
120         _unicastResultMap.put(clusterRequest.getUuid(), futureResult);
121 
122         try {
123             _controlChannel.send(jGroupsAddress, null, clusterRequest);
124         }
125         catch (ChannelException ce) {
126             _log.error("Unable to send unicast message " + clusterRequest, ce);
127         }
128 
129         return futureResult;
130     }
131 
132     public List<Address> getControlAddresses() {
133         if (!PropsValues.CLUSTER_LINK_ENABLED) {
134             return Collections.EMPTY_LIST;
135         }
136 
137         return getAddresses(_controlChannel);
138     }
139 
140     public Address getLocalControlAddress() {
141         if (!PropsValues.CLUSTER_LINK_ENABLED) {
142             return null;
143         }
144 
145         return new AddressImpl(_controlChannel.getLocalAddress());
146     }
147 
148     public boolean isShortcutLocalMethod() {
149         return _shortcutLocalMethod;
150     }
151 
152     public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
153         _shortcutLocalMethod = shortcutLocalMethod;
154     }
155 
156     protected void initChannels() throws ChannelException {
157         Properties controlProperties = PropsUtil.getProperties(
158             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
159 
160         String controlProperty = controlProperties.getProperty(
161             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
162 
163         ClusterInvokeReceiver clusterInvokeReceiver = new ClusterInvokeReceiver(
164             _multicastResultMap, _unicastResultMap);
165 
166         _controlChannel = createChannel(
167             controlProperty, clusterInvokeReceiver, _LIFERAY_CONTROL_CHANNEL);
168 
169         clusterInvokeReceiver.setChannel(_controlChannel);
170     }
171 
172     protected FutureResult<Object> runLocalMethod(MethodWrapper methodWrapper) {
173         FutureResult<Object> futureResult = new FutureResult<Object>();
174 
175         try {
176             Object returnValue = MethodInvoker.invoke(methodWrapper);
177 
178             if (returnValue instanceof Serializable) {
179                 futureResult.setResult(returnValue);
180             }
181             else if (returnValue != null) {
182                 futureResult.setException(
183                     new ClusterException("Return value is not serializable"));
184             }
185             else {
186                 futureResult.setResult(null);
187             }
188         }
189         catch (Exception e) {
190             futureResult.setException(e);
191         }
192 
193         return futureResult;
194     }
195 
196     private static final String _LIFERAY_CONTROL_CHANNEL =
197         "LIFERAY-CONTROL-CHANNEL";
198 
199     private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
200 
201     private JChannel _controlChannel;
202     private Map<String, Map<Address, Future<?>>> _multicastResultMap =
203         new ConcurrentHashMap<String, Map<Address, Future<?>>>();
204     private boolean _shortcutLocalMethod;
205     private Map<String, Future<?>> _unicastResultMap =
206         new ConcurrentHashMap<String, Future<?>>();
207 
208 }