001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030 import com.liferay.portal.kernel.util.InetAddressUtil;
031 import com.liferay.portal.kernel.util.MethodHandler;
032 import com.liferay.portal.kernel.util.NamedThreadFactory;
033 import com.liferay.portal.kernel.util.ObjectValuePair;
034 import com.liferay.portal.kernel.util.PropsKeys;
035 import com.liferay.portal.kernel.util.PropsUtil;
036 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
037 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
038 import com.liferay.portal.util.PortalPortEventListener;
039 import com.liferay.portal.util.PortalUtil;
040 import com.liferay.portal.util.PropsValues;
041
042 import java.io.Serializable;
043
044 import java.net.InetAddress;
045
046 import java.util.ArrayList;
047 import java.util.Collection;
048 import java.util.Collections;
049 import java.util.Iterator;
050 import java.util.List;
051 import java.util.Map;
052 import java.util.Properties;
053 import java.util.Set;
054 import java.util.concurrent.ConcurrentHashMap;
055 import java.util.concurrent.CopyOnWriteArrayList;
056 import java.util.concurrent.Executors;
057 import java.util.concurrent.ScheduledExecutorService;
058 import java.util.concurrent.TimeUnit;
059
060 import org.jgroups.ChannelException;
061 import org.jgroups.JChannel;
062
063
067 public class ClusterExecutorImpl
068 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
069
070 public void addClusterEventListener(
071 ClusterEventListener clusterEventListener) {
072
073 if (!isEnabled()) {
074 return;
075 }
076
077 _clusterEventListeners.addIfAbsent(clusterEventListener);
078 }
079
080 @Override
081 public void afterPropertiesSet() {
082 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
083 addClusterEventListener(new DebuggingClusterEventListenerImpl());
084 }
085
086 if (PropsValues.LIVE_USERS_ENABLED) {
087 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
088 }
089
090 super.afterPropertiesSet();
091 }
092
093 @Override
094 public void destroy() {
095 if (!isEnabled()) {
096 return;
097 }
098
099 _scheduledExecutorService.shutdownNow();
100
101 _clusterRequestReceiver.destroy();
102
103 _controlChannel.close();
104 }
105
106 public FutureClusterResponses execute(ClusterRequest clusterRequest)
107 throws SystemException {
108
109 if (!isEnabled()) {
110 return null;
111 }
112
113 List<Address> addresses = prepareAddresses(clusterRequest);
114
115 FutureClusterResponses futureClusterResponses =
116 new FutureClusterResponses(addresses);
117
118 if (!clusterRequest.isFireAndForget()) {
119 String uuid = clusterRequest.getUuid();
120
121 _futureClusterResponses.put(uuid, futureClusterResponses);
122 }
123
124 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
125 addresses.remove(getLocalClusterNodeAddress())) {
126
127 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
128 clusterRequest.getMethodHandler());
129
130 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
131 clusterNodeResponse.setUuid(clusterRequest.getUuid());
132
133 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
134 }
135
136 if (clusterRequest.isMulticast()) {
137 sendMulticastRequest(clusterRequest);
138 }
139 else {
140 sendUnicastRequest(clusterRequest, addresses);
141 }
142
143 return futureClusterResponses;
144 }
145
146 public List<ClusterEventListener> getClusterEventListeners() {
147 if (!isEnabled()) {
148 return Collections.emptyList();
149 }
150
151 return Collections.unmodifiableList(_clusterEventListeners);
152 }
153
154 public List<Address> getClusterNodeAddresses() {
155 if (!isEnabled()) {
156 return Collections.emptyList();
157 }
158
159 removeExpiredInstances();
160
161 return new ArrayList<Address>(_clusterNodeAddresses.values());
162 }
163
164 public List<ClusterNode> getClusterNodes() {
165 if (!isEnabled()) {
166 return Collections.emptyList();
167 }
168
169 removeExpiredInstances();
170
171 Set<ObjectValuePair<Address, ClusterNode>> liveInstances =
172 _liveInstances.keySet();
173
174 List<ClusterNode> liveClusterNodes = new ArrayList<ClusterNode>(
175 liveInstances.size());
176
177 for (ObjectValuePair<Address, ClusterNode> liveInstance :
178 liveInstances) {
179
180 liveClusterNodes.add(liveInstance.getValue());
181 }
182
183 return liveClusterNodes;
184 }
185
186 public ClusterNode getLocalClusterNode() {
187 if (!isEnabled()) {
188 return null;
189 }
190
191 return _localClusterNode;
192 }
193
194 public Address getLocalClusterNodeAddress() {
195 if (!isEnabled()) {
196 return null;
197 }
198
199 return _localAddress;
200 }
201
202 public void initialize() {
203 if (!isEnabled()) {
204 return;
205 }
206
207 PortalUtil.addPortalPortEventListener(this);
208
209 _localAddress = new AddressImpl(_controlChannel.getLocalAddress());
210
211 try {
212 initLocalClusterNode();
213 }
214 catch (SystemException se) {
215 _log.error("Unable to determine local network address", se);
216 }
217
218 ObjectValuePair<Address, ClusterNode> localInstance =
219 new ObjectValuePair<Address, ClusterNode>(
220 _localAddress, _localClusterNode);
221
222 _liveInstances.put(localInstance, Long.MAX_VALUE);
223
224 _clusterNodeAddresses.put(
225 _localClusterNode.getClusterNodeId(), _localAddress);
226
227 _clusterRequestReceiver.initialize();
228
229 _scheduledExecutorService = Executors.newScheduledThreadPool(
230 1,
231 new NamedThreadFactory(
232 ClusterExecutorImpl.class.getName(), Thread.NORM_PRIORITY,
233 Thread.currentThread().getContextClassLoader()));
234
235 _scheduledExecutorService.scheduleWithFixedDelay(
236 new HeartbeatTask(), 0,
237 PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
238 TimeUnit.MILLISECONDS);
239 }
240
241 public boolean isClusterNodeAlive(Address address) {
242 if (!isEnabled()) {
243 return false;
244 }
245
246 removeExpiredInstances();
247
248 return _clusterNodeAddresses.containsValue(address);
249 }
250
251 public boolean isClusterNodeAlive(String clusterNodeId) {
252 if (!isEnabled()) {
253 return false;
254 }
255
256 removeExpiredInstances();
257
258 return _clusterNodeAddresses.containsKey(clusterNodeId);
259 }
260
261 @Override
262 public boolean isEnabled() {
263 return PropsValues.CLUSTER_LINK_ENABLED;
264 }
265
266 public void portalPortConfigured(int port) {
267 if (!isEnabled() ||
268 _localClusterNode.getPort() ==
269 PropsValues.PORTAL_INSTANCE_HTTP_PORT) {
270
271 return;
272 }
273
274 _localClusterNode.setPort(port);
275 }
276
277 public void removeClusterEventListener(
278 ClusterEventListener clusterEventListener) {
279
280 if (!isEnabled()) {
281 return;
282 }
283
284 _clusterEventListeners.remove(clusterEventListener);
285 }
286
287 public void setClusterEventListeners(
288 List<ClusterEventListener> clusterEventListeners) {
289
290 if (!isEnabled()) {
291 return;
292 }
293
294 _clusterEventListeners.addAllAbsent(clusterEventListeners);
295 }
296
297 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
298 if (!isEnabled()) {
299 return;
300 }
301
302 _shortcutLocalMethod = shortcutLocalMethod;
303 }
304
305 protected void fireClusterEvent(ClusterEvent clusterEvent) {
306 for (ClusterEventListener listener : _clusterEventListeners) {
307 listener.processClusterEvent(clusterEvent);
308 }
309 }
310
311 protected JChannel getControlChannel() {
312 return _controlChannel;
313 }
314
315 protected FutureClusterResponses getExecutionResults(String uuid) {
316 return _futureClusterResponses.get(uuid);
317 }
318
319 @Override
320 protected void initChannels() {
321 Properties controlProperties = PropsUtil.getProperties(
322 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
323
324 String controlProperty = controlProperties.getProperty(
325 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
326
327 _clusterRequestReceiver = new ClusterRequestReceiver(this);
328
329 try {
330 _controlChannel = createJChannel(
331 controlProperty, _clusterRequestReceiver,
332 _DEFAULT_CLUSTER_NAME);
333 }
334 catch (ChannelException ce) {
335 _log.error(ce, ce);
336 }
337 catch (Exception e) {
338 _log.error(e, e);
339 }
340 }
341
342 protected void initLocalClusterNode() throws SystemException {
343 _localClusterNode = new ClusterNode(PortalUUIDUtil.generate());
344
345 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
346 _localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
347 }
348 else {
349 _localClusterNode.setPort(PortalUtil.getPortalPort(false));
350 }
351
352 try {
353 InetAddress inetAddress = bindInetAddress;
354
355 if (inetAddress == null) {
356 inetAddress = InetAddressUtil.getLocalInetAddress();
357 }
358
359 _localClusterNode.setInetAddress(inetAddress);
360
361 _localClusterNode.setHostName(inetAddress.getHostName());
362 }
363 catch (Exception e) {
364 throw new SystemException(
365 "Unable to determine local network address", e);
366 }
367 }
368
369 protected boolean isShortcutLocalMethod() {
370 return _shortcutLocalMethod;
371 }
372
373 protected void notify(
374 Address address, ClusterNode clusterNode, long expirationTime) {
375
376 removeExpiredInstances();
377
378 if (System.currentTimeMillis() > expirationTime) {
379 return;
380 }
381
382 ObjectValuePair<Address, ClusterNode> liveInstance =
383 new ObjectValuePair<Address, ClusterNode>(address, clusterNode);
384
385
386
387
388 Long oldExpirationTime = _liveInstances.remove(liveInstance);
389
390 _liveInstances.put(liveInstance, expirationTime);
391
392 if ((oldExpirationTime != null) ||
393 ((_localAddress != null) && _localAddress.equals(address))) {
394
395 return;
396 }
397
398 _clusterNodeAddresses.put(clusterNode.getClusterNodeId(), address);
399
400 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
401
402 fireClusterEvent(clusterEvent);
403 }
404
405 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
406 boolean isMulticast = clusterRequest.isMulticast();
407
408 List<Address> addresses = null;
409
410 if (isMulticast) {
411 addresses = getAddresses(_controlChannel);
412 }
413 else {
414 addresses = new ArrayList<Address>();
415
416 Collection<Address> clusterNodeAddresses =
417 clusterRequest.getTargetClusterNodeAddresses();
418
419 if (clusterNodeAddresses != null) {
420 addresses.addAll(clusterNodeAddresses);
421 }
422
423 Collection<String> clusterNodeIds =
424 clusterRequest.getTargetClusterNodeIds();
425
426 if (clusterNodeIds != null) {
427 for (String clusterNodeId : clusterNodeIds) {
428 Address address = _clusterNodeAddresses.get(clusterNodeId);
429
430 addresses.add(address);
431 }
432 }
433 }
434
435 return addresses;
436 }
437
438 protected void removeExpiredInstances() {
439 if (_liveInstances.isEmpty()) {
440 return;
441 }
442
443 Set<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>>
444 liveInstances = _liveInstances.entrySet();
445
446 Iterator<Map.Entry<ObjectValuePair<Address, ClusterNode>, Long>> itr =
447 liveInstances.iterator();
448
449 long now = System.currentTimeMillis();
450
451 while (itr.hasNext()) {
452 Map.Entry<ObjectValuePair<Address, ClusterNode>, Long> entry =
453 itr.next();
454
455 long expirationTime = entry.getValue();
456
457 if (now < expirationTime) {
458 continue;
459 }
460
461 ObjectValuePair<Address, ClusterNode> liveInstance = entry.getKey();
462
463 ClusterNode clusterNode = liveInstance.getValue();
464
465 if (_localClusterNode.equals(clusterNode)) {
466 continue;
467 }
468
469 _clusterNodeAddresses.remove(clusterNode.getClusterNodeId());
470
471 itr.remove();
472
473 ClusterEvent clusterEvent = ClusterEvent.depart(clusterNode);
474
475 fireClusterEvent(clusterEvent);
476 }
477 }
478
479 protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler) {
480 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
481
482 ClusterNode localClusterNode = getLocalClusterNode();
483
484 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
485 clusterNodeResponse.setClusterNode(localClusterNode);
486 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
487
488 if (methodHandler == null) {
489 clusterNodeResponse.setException(
490 new ClusterException(
491 "Payload is not of type " + MethodHandler.class.getName()));
492
493 return clusterNodeResponse;
494 }
495
496 try {
497 Object returnValue = methodHandler.invoke(true);
498
499 if (returnValue instanceof Serializable) {
500 clusterNodeResponse.setResult(returnValue);
501 }
502 else if (returnValue != null) {
503 clusterNodeResponse.setException(
504 new ClusterException("Return value is not serializable"));
505 }
506 }
507 catch (Exception e) {
508 clusterNodeResponse.setException(e);
509 }
510
511 return clusterNodeResponse;
512 }
513
514 protected void sendMulticastRequest(ClusterRequest clusterRequest)
515 throws SystemException {
516
517 try {
518 _controlChannel.send(null, null, clusterRequest);
519 }
520 catch (ChannelException ce) {
521 _log.error(
522 "Unable to send multicast message " + clusterRequest, ce);
523
524 throw new SystemException("Unable to send multicast request", ce);
525 }
526 }
527
528 protected void sendUnicastRequest(
529 ClusterRequest clusterRequest, List<Address> addresses)
530 throws SystemException {
531
532 for (Address address : addresses) {
533 org.jgroups.Address jGroupsAddress =
534 (org.jgroups.Address)address.getRealAddress();
535
536 try {
537 _controlChannel.send(jGroupsAddress, null, clusterRequest);
538 }
539 catch (ChannelException ce) {
540 _log.error(
541 "Unable to send unicast message " + clusterRequest, ce);
542
543 throw new SystemException("Unable to send unicast request", ce);
544 }
545 }
546 }
547
548 private static final String _DEFAULT_CLUSTER_NAME =
549 "LIFERAY-CONTROL-CHANNEL";
550
551 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
552
553 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
554 new CopyOnWriteArrayList<ClusterEventListener>();
555 private Map<String, Address> _clusterNodeAddresses =
556 new ConcurrentHashMap<String, Address>();
557 private ClusterRequestReceiver _clusterRequestReceiver;
558 private JChannel _controlChannel;
559 private Map<String, FutureClusterResponses> _futureClusterResponses =
560 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
561 private Map<ObjectValuePair<Address, ClusterNode>, Long> _liveInstances =
562 new ConcurrentHashMap<ObjectValuePair<Address, ClusterNode>, Long>();
563 private Address _localAddress;
564 private ClusterNode _localClusterNode;
565 private ScheduledExecutorService _scheduledExecutorService;
566 private boolean _shortcutLocalMethod;
567
568 private class HeartbeatTask implements Runnable {
569
570 public void run() {
571 try {
572 ClusterRequest clusterNotifyRequest =
573 ClusterRequest.createClusterNotifyRequest(
574 _localClusterNode);
575
576 sendMulticastRequest(clusterNotifyRequest);
577 }
578 catch (Exception e) {
579 if (_log.isDebugEnabled()) {
580 _log.debug("Unable to send check in request", e);
581 }
582
583 _scheduledExecutorService.scheduleWithFixedDelay(
584 new HeartbeatTask(), 0,
585 PropsValues.CLUSTER_EXECUTOR_HEARTBEAT_INTERVAL,
586 TimeUnit.MILLISECONDS);
587 }
588 }
589
590 }
591
592 }