001
014
015 package com.liferay.portal.kernel.cluster;
016
017 import java.util.HashSet;
018 import java.util.List;
019 import java.util.Set;
020 import java.util.concurrent.BlockingQueue;
021 import java.util.concurrent.CancellationException;
022 import java.util.concurrent.CountDownLatch;
023 import java.util.concurrent.Future;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.TimeoutException;
026
027
030 public class FutureClusterResponses implements Future<ClusterNodeResponses> {
031
032 public FutureClusterResponses(List<Address> addresses) {
033 _clusterNodeResponses = new ClusterNodeResponses();
034 _countDownLatch = new CountDownLatch(addresses.size());
035 _expectedReplyAddress = new HashSet<Address>(addresses);
036 }
037
038 public void addClusterNodeResponse(
039 ClusterNodeResponse clusterNodeResponse) {
040
041 _clusterNodeResponses.addClusterResponse(clusterNodeResponse);
042
043 _countDownLatch.countDown();
044 }
045
046 public void addExpectedReplyAddress(Address address) {
047 _expectedReplyAddress.add(address);
048 }
049
050 public boolean cancel(boolean mayInterruptIfRunning) {
051 if (_cancelled || isDone()) {
052 return false;
053 }
054
055 _cancelled = true;
056
057 return true;
058 }
059
060 public boolean expectsReply(Address address) {
061 return _expectedReplyAddress.contains(address);
062 }
063
064 public ClusterNodeResponses get() throws InterruptedException {
065 if (_cancelled) {
066 throw new CancellationException();
067 }
068
069 _countDownLatch.await();
070
071 return _clusterNodeResponses;
072 }
073
074 public ClusterNodeResponses get(long timeout, TimeUnit timeUnit)
075 throws InterruptedException, TimeoutException {
076
077 if (_cancelled) {
078 throw new CancellationException();
079 }
080
081 if (_countDownLatch.await(timeout, timeUnit)) {
082 return _clusterNodeResponses;
083 }
084 else {
085 throw new TimeoutException();
086 }
087 }
088
089 public BlockingQueue<ClusterNodeResponse> getPartialResults() {
090 return _clusterNodeResponses.getClusterResponses();
091 }
092
093 public boolean isCancelled() {
094 return _cancelled;
095 }
096
097 public boolean isDone() {
098 if ((_countDownLatch.getCount() == 0) || _cancelled) {
099 return true;
100 }
101 else {
102 return false;
103 }
104 }
105
106 private boolean _cancelled;
107 private ClusterNodeResponses _clusterNodeResponses;
108 private CountDownLatch _countDownLatch;
109 private Set<Address> _expectedReplyAddress;
110
111 }