001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.notifications.Channel;
018 import com.liferay.portal.kernel.notifications.ChannelException;
019 import com.liferay.portal.kernel.notifications.ChannelHub;
020 import com.liferay.portal.kernel.notifications.ChannelListener;
021 import com.liferay.portal.kernel.notifications.NotificationEvent;
022 import com.liferay.portal.kernel.notifications.UnknownChannelException;
023 import com.liferay.portal.model.CompanyConstants;
024 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
025 import com.liferay.portal.util.PropsValues;
026
027 import java.util.ArrayList;
028 import java.util.Collection;
029 import java.util.Collections;
030 import java.util.Iterator;
031 import java.util.List;
032 import java.util.Map;
033 import java.util.Set;
034 import java.util.concurrent.ConcurrentHashMap;
035 import java.util.concurrent.ConcurrentMap;
036
037
042 public class ChannelHubImpl implements ChannelHub {
043
044 public void cleanUp() throws ChannelException {
045 for (Channel channel : _channels.values()) {
046 channel.cleanUp();
047 }
048 }
049
050 public void cleanUp(long userId) throws ChannelException {
051 Channel channel = getChannel(userId);
052
053 channel.cleanUp();
054 }
055
056 public ChannelHub clone(long companyId) {
057 ChannelHubImpl channelHubImpl = new ChannelHubImpl();
058
059 channelHubImpl.setChannelPrototype(_channel);
060 channelHubImpl.setCompanyId(companyId);
061
062 return channelHubImpl;
063 }
064
065 public void confirmDelivery(
066 long userId, Collection<String> notificationEventUuids)
067 throws ChannelException {
068
069 confirmDelivery(userId, notificationEventUuids, false);
070 }
071
072 public void confirmDelivery(
073 long userId, Collection<String> notificationEventUuids,
074 boolean archive)
075 throws ChannelException {
076
077 Channel channel = getChannel(userId);
078
079 channel.confirmDelivery(notificationEventUuids, archive);
080 }
081
082 public void confirmDelivery(long userId, String notificationEventUuid)
083 throws ChannelException {
084
085 confirmDelivery(userId, notificationEventUuid, false);
086 }
087
088 public void confirmDelivery(
089 long userId, String notificationEventUuid, boolean archive)
090 throws ChannelException {
091
092 Channel channel = getChannel(userId);
093
094 channel.confirmDelivery(notificationEventUuid, archive);
095 }
096
097 public Channel createChannel(long userId) throws ChannelException {
098 if (_channels.containsKey(userId)) {
099 return _channels.get(userId);
100 }
101
102 Channel channel = _channel.clone(_companyId, userId);
103
104 Channel oldChannel = _channels.putIfAbsent(userId, channel);
105
106 if (oldChannel != null) {
107 channel.sendNotificationEvents(oldChannel.getNotificationEvents());
108 }
109
110 channel.init();
111
112 return channel;
113 }
114
115 public void deleteUserNotificiationEvent(
116 long userId, String notificationEventUuid)
117 throws ChannelException {
118
119 Channel channel = getChannel(userId);
120
121 channel.deleteUserNotificiationEvent(notificationEventUuid);
122 }
123
124 public void deleteUserNotificiationEvents(
125 long userId, Collection<String> notificationEventUuids)
126 throws ChannelException {
127
128 Channel channel = getChannel(userId);
129
130 channel.deleteUserNotificiationEvents(notificationEventUuids);
131 }
132
133 public void destroy() throws ChannelException {
134 Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
135
136 Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
137
138 while (itr.hasNext()) {
139 Channel channel = itr.next().getValue();
140
141 channel.close();
142
143 itr.remove();
144 }
145 }
146
147 public Channel destroyChannel(long userId) throws ChannelException {
148 Channel channel = _channels.remove(userId);
149
150 if (channel != null) {
151 channel.close();
152 }
153
154 return channel;
155 }
156
157 public Channel fetchChannel(long userId) throws ChannelException {
158 return fetchChannel(userId, false);
159 }
160
161 public Channel fetchChannel(long userId, boolean createIfAbsent)
162 throws ChannelException {
163
164 Channel channel = _channels.get(userId);
165
166 if (channel == null) {
167 synchronized (_channels) {
168 channel = _channels.get(userId);
169
170 if (channel == null) {
171 if (createIfAbsent) {
172 channel = createChannel(userId);
173 }
174 }
175 }
176 }
177
178 return channel;
179 }
180
181 public void flush() throws ChannelException {
182 for (Channel channel : _channels.values()) {
183 channel.flush();
184 }
185 }
186
187 public void flush(long userId) throws ChannelException {
188 Channel channel = fetchChannel(userId);
189
190 if (channel != null) {
191 channel.flush();
192 }
193 }
194
195 public void flush(long userId, long timestamp) throws ChannelException {
196 Channel channel = fetchChannel(userId);
197
198 if (channel != null) {
199 channel.flush(timestamp);
200 }
201 }
202
203 public Channel getChannel(long userId) throws ChannelException {
204 return getChannel(userId, false);
205 }
206
207 public Channel getChannel(long userId, boolean createIfAbsent)
208 throws ChannelException {
209
210 Channel channel = fetchChannel(userId, createIfAbsent);
211
212 if (channel == null) {
213 throw new UnknownChannelException(
214 "No channel exists with user id " + userId);
215 }
216
217 return channel;
218 }
219
220 public long getCompanyId() {
221 return _companyId;
222 }
223
224 public List<NotificationEvent> getNotificationEvents(long userId)
225 throws ChannelException {
226
227 return getNotificationEvents(userId, false);
228 }
229
230 public List<NotificationEvent> getNotificationEvents(
231 long userId, boolean flush)
232 throws ChannelException {
233
234 Channel channel = getChannel(userId);
235
236 return channel.getNotificationEvents(flush);
237 }
238
239 public Collection<Long> getUserIds() {
240 return Collections.unmodifiableSet(_channels.keySet());
241 }
242
243 public void registerChannelListener(
244 long userId, ChannelListener channelListener)
245 throws ChannelException {
246
247 Channel channel = getChannel(userId);
248
249 channel.registerChannelListener(channelListener);
250 }
251
252 public void removeTransientNotificationEvents(
253 long userId, Collection<NotificationEvent> notificationEvents)
254 throws ChannelException {
255
256 Channel channel = fetchChannel(userId);
257
258 if (channel != null) {
259 channel.removeTransientNotificationEvents(notificationEvents);
260 }
261 }
262
263 public void removeTransientNotificationEventsByUuid(
264 long userId, Collection<String> notificationEventUuids)
265 throws ChannelException {
266
267 Channel channel = fetchChannel(userId);
268
269 if (channel != null) {
270 channel.removeTransientNotificationEventsByUuid(
271 notificationEventUuids);
272 }
273 }
274
275 public void sendNotificationEvent(
276 long userId, NotificationEvent notificationEvent)
277 throws ChannelException {
278
279 Channel channel = fetchChannel(userId);
280
281 if (channel != null) {
282 channel.sendNotificationEvent(notificationEvent);
283
284 return;
285 }
286
287 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
288 !notificationEvent.isDeliveryRequired()) {
289
290 return;
291 }
292
293 try {
294 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
295 userId, notificationEvent);
296 }
297 catch (Exception e) {
298 throw new ChannelException("Unable to send event", e);
299 }
300 }
301
302 public void sendNotificationEvents(
303 long userId, Collection<NotificationEvent> notificationEvents)
304 throws ChannelException {
305
306 Channel channel = fetchChannel(userId);
307
308 if (channel != null) {
309 channel.sendNotificationEvents(notificationEvents);
310
311 return;
312 }
313
314 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
315 return;
316 }
317
318 List<NotificationEvent> persistedNotificationEvents =
319 new ArrayList<NotificationEvent>(notificationEvents.size());
320
321 for (NotificationEvent notificationEvent : notificationEvents) {
322 if (notificationEvent.isDeliveryRequired()) {
323 persistedNotificationEvents.add(notificationEvent);
324 }
325 }
326
327 try {
328 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
329 userId, persistedNotificationEvents);
330 }
331 catch (Exception e) {
332 throw new ChannelException("Unable to send events", e);
333 }
334 }
335
336 public void setChannelPrototype(Channel channel) {
337 _channel = channel;
338 }
339
340 public void setCompanyId(long companyId) {
341 _companyId = companyId;
342 }
343
344 public void unregisterChannelListener(
345 long userId, ChannelListener channelListener)
346 throws ChannelException {
347
348 Channel channel = getChannel(userId);
349
350 channel.unregisterChannelListener(channelListener);
351 }
352
353 private Channel _channel;
354 private ConcurrentMap<Long, Channel> _channels =
355 new ConcurrentHashMap<Long, Channel>();
356 private long _companyId = CompanyConstants.SYSTEM;
357
358 }