1
14
15 package com.liferay.portal.cluster;
16
17 import com.liferay.portal.kernel.cluster.Address;
18 import com.liferay.portal.kernel.cluster.ClusterException;
19 import com.liferay.portal.kernel.cluster.ClusterExecutor;
20 import com.liferay.portal.kernel.cluster.ClusterRequest;
21 import com.liferay.portal.kernel.log.Log;
22 import com.liferay.portal.kernel.log.LogFactoryUtil;
23 import com.liferay.portal.kernel.util.MethodInvoker;
24 import com.liferay.portal.kernel.util.MethodWrapper;
25 import com.liferay.portal.kernel.util.PropsKeys;
26 import com.liferay.portal.kernel.util.PropsUtil;
27 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
28 import com.liferay.portal.util.PropsValues;
29
30 import java.io.Serializable;
31
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Properties;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.Future;
39
40 import org.jgroups.ChannelException;
41 import org.jgroups.JChannel;
42
43
48 public class ClusterExecutorImpl
49 extends ClusterBase implements ClusterExecutor {
50
51 public void destroy() {
52 if (!PropsValues.CLUSTER_LINK_ENABLED) {
53 return;
54 }
55
56 _controlChannel.close();
57 }
58
59 public Map<Address, Future<?>> executeMulticastCall(
60 MethodWrapper methodWrapper) {
61
62 if (!PropsValues.CLUSTER_LINK_ENABLED) {
63 return null;
64 }
65
66 ClusterRequest clusterRequest = new ClusterRequestImpl();
67
68 clusterRequest.setMulticast(true);
69 clusterRequest.setPayload(methodWrapper);
70 clusterRequest.setUuid(PortalUUIDUtil.generate());
71
72 Map<Address, Future<?>> results = new HashMap<Address, Future<?>>();
73
74 List<Address> addresses = getControlAddresses();
75 Address localControlAddress = getLocalControlAddress();
76
77 for (Address address : addresses) {
78 if (_shortcutLocalMethod && address.equals(localControlAddress)) {
79 results.put(address, runLocalMethod(methodWrapper));
80 }
81 else {
82 results.put(address, new FutureResult<Object>());
83 }
84 }
85
86 _multicastResultMap.put(clusterRequest.getUuid(), results);
87
88 try {
89 _controlChannel.send(null, null, clusterRequest);
90 }
91 catch (ChannelException ce) {
92 _log.error("Unable to send unicast message " + clusterRequest, ce);
93 }
94
95 return results;
96 }
97
98 public Future<?> executeUnicastCall(
99 Address address, MethodWrapper methodWrapper) {
100
101 if (!PropsValues.CLUSTER_LINK_ENABLED) {
102 return null;
103 }
104
105 org.jgroups.Address jGroupsAddress =
106 (org.jgroups.Address)address.getRealAddress();
107
108 ClusterRequest clusterRequest = new ClusterRequestImpl();
109
110 clusterRequest.setMulticast(false);
111 clusterRequest.setPayload(methodWrapper);
112 clusterRequest.setUuid(PortalUUIDUtil.generate());
113
114 if (_shortcutLocalMethod && address.equals(getLocalControlAddress())) {
115 return runLocalMethod(methodWrapper);
116 }
117
118 FutureResult<Object> futureResult = new FutureResult<Object>();
119
120 _unicastResultMap.put(clusterRequest.getUuid(), futureResult);
121
122 try {
123 _controlChannel.send(jGroupsAddress, null, clusterRequest);
124 }
125 catch (ChannelException ce) {
126 _log.error("Unable to send unicast message " + clusterRequest, ce);
127 }
128
129 return futureResult;
130 }
131
132 public List<Address> getControlAddresses() {
133 if (!PropsValues.CLUSTER_LINK_ENABLED) {
134 return Collections.EMPTY_LIST;
135 }
136
137 return getAddresses(_controlChannel);
138 }
139
140 public Address getLocalControlAddress() {
141 if (!PropsValues.CLUSTER_LINK_ENABLED) {
142 return null;
143 }
144
145 return new AddressImpl(_controlChannel.getLocalAddress());
146 }
147
148 public boolean isShortcutLocalMethod() {
149 return _shortcutLocalMethod;
150 }
151
152 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
153 _shortcutLocalMethod = shortcutLocalMethod;
154 }
155
156 protected void initChannels() throws ChannelException {
157 Properties controlProperties = PropsUtil.getProperties(
158 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
159
160 String controlProperty = controlProperties.getProperty(
161 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
162
163 ClusterInvokeReceiver clusterInvokeReceiver = new ClusterInvokeReceiver(
164 _multicastResultMap, _unicastResultMap);
165
166 _controlChannel = createChannel(
167 controlProperty, clusterInvokeReceiver, _LIFERAY_CONTROL_CHANNEL);
168
169 clusterInvokeReceiver.setChannel(_controlChannel);
170 }
171
172 protected FutureResult<Object> runLocalMethod(MethodWrapper methodWrapper) {
173 FutureResult<Object> futureResult = new FutureResult<Object>();
174
175 try {
176 Object returnValue = MethodInvoker.invoke(methodWrapper);
177
178 if (returnValue instanceof Serializable) {
179 futureResult.setResult(returnValue);
180 }
181 else if (returnValue != null) {
182 futureResult.setException(
183 new ClusterException("Return value is not serializable"));
184 }
185 else {
186 futureResult.setResult(null);
187 }
188 }
189 catch (Exception e) {
190 futureResult.setException(e);
191 }
192
193 return futureResult;
194 }
195
196 private static final String _LIFERAY_CONTROL_CHANNEL =
197 "LIFERAY-CONTROL-CHANNEL";
198
199 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
200
201 private JChannel _controlChannel;
202 private Map<String, Map<Address, Future<?>>> _multicastResultMap =
203 new ConcurrentHashMap<String, Map<Address, Future<?>>>();
204 private boolean _shortcutLocalMethod;
205 private Map<String, Future<?>> _unicastResultMap =
206 new ConcurrentHashMap<String, Future<?>>();
207
208 }