001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040
041
048 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
049
050 public JGroupsManager(
051 CacheManager cacheManager, String clusterName,
052 String channelProperties) {
053
054 try {
055 _jChannel = new JChannel(channelProperties);
056
057 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
058
059 _jChannel.connect(clusterName);
060
061 if (_log.isInfoEnabled()) {
062 _log.info(
063 "Create a new channel with properties " +
064 _jChannel.getProperties());
065 }
066 }
067 catch (Exception e) {
068 if (_log.isErrorEnabled()) {
069 _log.error("Unable to initialize channels", e);
070 }
071 }
072
073 _cacheManager = cacheManager;
074 }
075
076 public void dispose() throws CacheException {
077 if (_jChannel != null) {
078 _jChannel.close();
079 }
080 }
081
082 public Address getBusLocalAddress() {
083 return _jChannel.getAddress();
084 }
085
086 public List<Address> getBusMembership() {
087 return _jChannel.getView().getMembers();
088 }
089
090 @SuppressWarnings("rawtypes")
091 public List getElements(List list) {
092 return null;
093 }
094
095 public String getGuid() {
096 return null;
097 }
098
099 @SuppressWarnings("rawtypes")
100 public List getKeys() {
101 return null;
102 }
103
104 public String getName() {
105 return null;
106 }
107
108 public Element getQuiet(Serializable serializable) {
109 return null;
110 }
111
112 public String getScheme() {
113 return _SCHEME;
114 }
115
116 public long getTimeForClusterToForm() {
117 return 0;
118 }
119
120 public String getUrl() {
121 return null;
122 }
123
124 public String getUrlBase() {
125 return null;
126 }
127
128 public void handleNotification(Serializable serializable) {
129 if (serializable instanceof JGroupEventMessage) {
130 handleJGroupsNotification((JGroupEventMessage)serializable);
131 }
132 else if (serializable instanceof List<?>) {
133 List<?> valueList = (List<?>)serializable;
134
135 for (Object object : valueList) {
136 if (object instanceof JGroupEventMessage) {
137 handleJGroupsNotification((JGroupEventMessage)object);
138 }
139 }
140 }
141 }
142
143 public void init() {
144 }
145
146 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
147 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
148
149 cachePeers.add(this);
150
151 return cachePeers;
152 }
153
154 public void put(Element element) {
155 }
156
157 public void registerPeer(String string) {
158 }
159
160 public boolean remove(Serializable serializable) {
161 return false;
162 }
163
164 public void removeAll() {
165 }
166
167 @SuppressWarnings("rawtypes")
168 public void send(Address address, List eventMessages)
169 throws RemoteException {
170
171 ArrayList<JGroupEventMessage> jGroupEventMessages =
172 new ArrayList<JGroupEventMessage>();
173
174 for (Object eventMessage : eventMessages) {
175 if (eventMessage instanceof JGroupEventMessage) {
176 JGroupEventMessage jGroupEventMessage =
177 (JGroupEventMessage)eventMessage;
178
179 jGroupEventMessages.add(jGroupEventMessage);
180 }
181 else {
182 if (_log.isDebugEnabled()) {
183 _log.debug(
184 eventMessage + "is not a JGroupEventMessage type");
185 }
186 }
187 }
188
189 try {
190 _jChannel.send(address, null, jGroupEventMessages);
191 }
192 catch (Throwable t) {
193 throw new RemoteException(t.getMessage());
194 }
195 }
196
197 @SuppressWarnings("rawtypes")
198 public void send(List eventMessages) throws RemoteException {
199 send(null, eventMessages);
200 }
201
202 public void unregisterPeer(String string) {
203 }
204
205 protected void handleJGroupsNotification(
206 JGroupEventMessage jGroupEventMessage) {
207
208 Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
209
210 if (cache == null) {
211 return;
212 }
213
214 int event = jGroupEventMessage.getEvent();
215 Serializable key = jGroupEventMessage.getSerializableKey();
216
217 if ((event == JGroupEventMessage.REMOVE) &&
218 (cache.getQuiet(key) != null)) {
219
220 cache.remove(key, true);
221 }
222 else if (event == JGroupEventMessage.REMOVE_ALL) {
223 cache.removeAll(true);
224 }
225 else if (event == JGroupEventMessage.PUT) {
226 Element element = jGroupEventMessage.getElement();
227
228 cache.put(new Element(key, element.getValue()), true);
229 }
230 }
231
232 private static final String _SCHEME = "JGroups";
233
234 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
235
236 private CacheManager _cacheManager;
237 private JChannel _jChannel;
238
239 private class EhcacheJGroupsReceiver extends BaseReceiver {
240
241 @Override
242 public void receive(Message message) {
243 Object object = message.getObject();
244
245 if (object == null) {
246 if (_log.isWarnEnabled()) {
247 _log.warn("Message content is null");
248 }
249
250 return;
251 }
252
253 if (object instanceof Serializable) {
254 handleNotification((Serializable)object);
255 }
256 else {
257 if (_log.isWarnEnabled()) {
258 _log.warn(
259 "Unable to process message content of type " +
260 object.getClass().getName());
261 }
262 }
263 }
264
265 }
266
267 }