001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.notifications;
016    
017    import com.liferay.portal.kernel.notifications.Channel;
018    import com.liferay.portal.kernel.notifications.ChannelException;
019    import com.liferay.portal.kernel.notifications.ChannelHub;
020    import com.liferay.portal.kernel.notifications.ChannelListener;
021    import com.liferay.portal.kernel.notifications.NotificationEvent;
022    import com.liferay.portal.kernel.notifications.UnknownChannelException;
023    import com.liferay.portal.model.CompanyConstants;
024    import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
025    import com.liferay.portal.util.PropsValues;
026    
027    import java.util.ArrayList;
028    import java.util.Collection;
029    import java.util.Collections;
030    import java.util.Iterator;
031    import java.util.List;
032    import java.util.Map;
033    import java.util.Set;
034    import java.util.concurrent.ConcurrentHashMap;
035    import java.util.concurrent.ConcurrentMap;
036    
037    /**
038     * @author Edward Han
039     * @author Brian Wing Shun Chan
040     * @author Shuyang Zhou
041     */
042    public class ChannelHubImpl implements ChannelHub {
043    
044            public void cleanUp() throws ChannelException {
045                    for (Channel channel : _channels.values()) {
046                            channel.cleanUp();
047                    }
048            }
049    
050            public void cleanUp(long userId) throws ChannelException {
051                    Channel channel = getChannel(userId);
052    
053                    channel.cleanUp();
054            }
055    
056            public ChannelHub clone(long companyId) {
057                    ChannelHubImpl channelHubImpl = new ChannelHubImpl();
058    
059                    channelHubImpl.setChannelPrototype(_channel);
060                    channelHubImpl.setCompanyId(companyId);
061    
062                    return channelHubImpl;
063            }
064    
065            public void confirmDelivery(
066                            long userId, Collection<String> notificationEventUuids)
067                    throws ChannelException {
068    
069                    confirmDelivery(userId, notificationEventUuids, false);
070            }
071    
072            public void confirmDelivery(
073                            long userId, Collection<String> notificationEventUuids,
074                            boolean archive)
075                    throws ChannelException {
076    
077                    Channel channel = getChannel(userId);
078    
079                    channel.confirmDelivery(notificationEventUuids, archive);
080            }
081    
082            public void confirmDelivery(long userId, String notificationEventUuid)
083                    throws ChannelException {
084    
085                    confirmDelivery(userId, notificationEventUuid, false);
086            }
087    
088            public void confirmDelivery(
089                            long userId, String notificationEventUuid, boolean archive)
090                    throws ChannelException {
091    
092                    Channel channel = getChannel(userId);
093    
094                    channel.confirmDelivery(notificationEventUuid, archive);
095            }
096    
097            public Channel createChannel(long userId) throws ChannelException {
098                    if (_channels.containsKey(userId)) {
099                            return _channels.get(userId);
100                    }
101    
102                    Channel channel = _channel.clone(_companyId, userId);
103    
104                    Channel oldChannel = _channels.putIfAbsent(userId, channel);
105    
106                    if (oldChannel != null) {
107                            channel.sendNotificationEvents(oldChannel.getNotificationEvents());
108                    }
109    
110                    channel.init();
111    
112                    return channel;
113            }
114    
115            public void deleteUserNotificiationEvent(
116                            long userId, String notificationEventUuid)
117                    throws ChannelException {
118    
119                    Channel channel = getChannel(userId);
120    
121                    channel.deleteUserNotificiationEvent(notificationEventUuid);
122            }
123    
124            public void deleteUserNotificiationEvents(
125                            long userId, Collection<String> notificationEventUuids)
126                    throws ChannelException {
127    
128                    Channel channel = getChannel(userId);
129    
130                    channel.deleteUserNotificiationEvents(notificationEventUuids);
131            }
132    
133            public void destroy() throws ChannelException {
134                    Set<Map.Entry<Long, Channel>> channels = _channels.entrySet();
135    
136                    Iterator<Map.Entry<Long, Channel>> itr = channels.iterator();
137    
138                    while (itr.hasNext()) {
139                            Channel channel = itr.next().getValue();
140    
141                            channel.close();
142    
143                            itr.remove();
144                    }
145            }
146    
147            public Channel destroyChannel(long userId) throws ChannelException {
148                    Channel channel = _channels.remove(userId);
149    
150                    if (channel != null) {
151                            channel.close();
152                    }
153    
154                    return channel;
155            }
156    
157            public Channel fetchChannel(long userId) throws ChannelException {
158                    return fetchChannel(userId, false);
159            }
160    
161            public Channel fetchChannel(long userId, boolean createIfAbsent)
162                    throws ChannelException {
163    
164                    Channel channel = _channels.get(userId);
165    
166                    if (channel == null) {
167                            synchronized (_channels) {
168                                    channel = _channels.get(userId);
169    
170                                    if (channel == null) {
171                                            if (createIfAbsent) {
172                                                    channel = createChannel(userId);
173                                            }
174                                    }
175                            }
176                    }
177    
178                    return channel;
179            }
180    
181            public void flush() throws ChannelException {
182                    for (Channel channel : _channels.values()) {
183                            channel.flush();
184                    }
185            }
186    
187            public void flush(long userId) throws ChannelException {
188                    Channel channel = fetchChannel(userId);
189    
190                    if (channel != null) {
191                            channel.flush();
192                    }
193            }
194    
195            public void flush(long userId, long timestamp) throws ChannelException {
196                    Channel channel = fetchChannel(userId);
197    
198                    if (channel != null) {
199                            channel.flush(timestamp);
200                    }
201            }
202    
203            public Channel getChannel(long userId) throws ChannelException {
204                    return getChannel(userId, false);
205            }
206    
207            public Channel getChannel(long userId, boolean createIfAbsent)
208                    throws ChannelException {
209    
210                    Channel channel = fetchChannel(userId, createIfAbsent);
211    
212                    if (channel == null) {
213                            throw new UnknownChannelException(
214                                    "No channel exists with user id " + userId);
215                    }
216    
217                    return channel;
218            }
219    
220            public long getCompanyId() {
221                    return _companyId;
222            }
223    
224            public List<NotificationEvent> getNotificationEvents(long userId)
225                    throws ChannelException {
226    
227                    return getNotificationEvents(userId, false);
228            }
229    
230            public List<NotificationEvent> getNotificationEvents(
231                            long userId, boolean flush)
232                    throws ChannelException {
233    
234                    Channel channel = getChannel(userId);
235    
236                    return channel.getNotificationEvents(flush);
237            }
238    
239            public Collection<Long> getUserIds() {
240                    return Collections.unmodifiableSet(_channels.keySet());
241            }
242    
243            public void registerChannelListener(
244                            long userId, ChannelListener channelListener)
245                    throws ChannelException {
246    
247                    Channel channel = getChannel(userId);
248    
249                    channel.registerChannelListener(channelListener);
250            }
251    
252            public void removeTransientNotificationEvents(
253                            long userId, Collection<NotificationEvent> notificationEvents)
254                    throws ChannelException {
255    
256                    Channel channel = fetchChannel(userId);
257    
258                    if (channel != null) {
259                            channel.removeTransientNotificationEvents(notificationEvents);
260                    }
261            }
262    
263            public void removeTransientNotificationEventsByUuid(
264                            long userId, Collection<String> notificationEventUuids)
265                    throws ChannelException {
266    
267                    Channel channel = fetchChannel(userId);
268    
269                    if (channel != null) {
270                            channel.removeTransientNotificationEventsByUuid(
271                                    notificationEventUuids);
272                    }
273            }
274    
275            public void sendNotificationEvent(
276                            long userId, NotificationEvent notificationEvent)
277                    throws ChannelException {
278    
279                    Channel channel = fetchChannel(userId);
280    
281                    if (channel != null) {
282                            channel.sendNotificationEvent(notificationEvent);
283    
284                            return;
285                    }
286    
287                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED ||
288                            !notificationEvent.isDeliveryRequired()) {
289    
290                            return;
291                    }
292    
293                    try {
294                            UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
295                                    userId, notificationEvent);
296                    }
297                    catch (Exception e) {
298                            throw new ChannelException("Unable to send event", e);
299                    }
300            }
301    
302            public void sendNotificationEvents(
303                            long userId, Collection<NotificationEvent> notificationEvents)
304                    throws ChannelException {
305    
306                    Channel channel = fetchChannel(userId);
307    
308                    if (channel != null) {
309                            channel.sendNotificationEvents(notificationEvents);
310    
311                            return;
312                    }
313    
314                    if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
315                            return;
316                    }
317    
318                    List<NotificationEvent> persistedNotificationEvents =
319                                    new ArrayList<NotificationEvent>(notificationEvents.size());
320    
321                    for (NotificationEvent notificationEvent : notificationEvents) {
322                            if (notificationEvent.isDeliveryRequired()) {
323                                    persistedNotificationEvents.add(notificationEvent);
324                            }
325                    }
326    
327                    try {
328                            UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
329                                    userId, persistedNotificationEvents);
330                    }
331                    catch (Exception e) {
332                            throw new ChannelException("Unable to send events", e);
333                    }
334            }
335    
336            public void setChannelPrototype(Channel channel) {
337                    _channel = channel;
338            }
339    
340            public void setCompanyId(long companyId) {
341                    _companyId = companyId;
342            }
343    
344            public void unregisterChannelListener(
345                            long userId, ChannelListener channelListener)
346                    throws ChannelException {
347    
348                    Channel channel = getChannel(userId);
349    
350                    channel.unregisterChannelListener(channelListener);
351            }
352    
353            private Channel _channel;
354            private ConcurrentMap<Long, Channel> _channels =
355                    new ConcurrentHashMap<Long, Channel>();
356            private long _companyId = CompanyConstants.SYSTEM;
357    
358    }