001
014
015 package com.liferay.portal.notifications;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONException;
019 import com.liferay.portal.kernel.json.JSONFactoryUtil;
020 import com.liferay.portal.kernel.json.JSONObject;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.notifications.BaseChannelImpl;
024 import com.liferay.portal.kernel.notifications.Channel;
025 import com.liferay.portal.kernel.notifications.ChannelException;
026 import com.liferay.portal.kernel.notifications.NotificationEvent;
027 import com.liferay.portal.kernel.notifications.NotificationEventComparator;
028 import com.liferay.portal.kernel.notifications.NotificationEventFactoryUtil;
029 import com.liferay.portal.model.CompanyConstants;
030 import com.liferay.portal.model.UserNotificationEvent;
031 import com.liferay.portal.service.UserNotificationEventLocalServiceUtil;
032 import com.liferay.portal.util.PropsValues;
033
034 import java.util.ArrayList;
035 import java.util.Collection;
036 import java.util.Comparator;
037 import java.util.HashMap;
038 import java.util.HashSet;
039 import java.util.Iterator;
040 import java.util.List;
041 import java.util.Map;
042 import java.util.Set;
043 import java.util.TreeSet;
044 import java.util.concurrent.locks.ReentrantLock;
045
046
051 public class ChannelImpl extends BaseChannelImpl {
052
053 public ChannelImpl() {
054 this(CompanyConstants.SYSTEM, 0);
055 }
056
057 public ChannelImpl(long companyId, long usedId) {
058 super(companyId, usedId);
059 }
060
061 public Channel clone(long companyId, long userId) {
062 return new ChannelImpl(companyId, userId);
063 }
064
065 public void confirmDelivery(Collection<String> notificationEventUuids)
066 throws ChannelException {
067
068 confirmDelivery(notificationEventUuids, false);
069 }
070
071 public void confirmDelivery(
072 Collection<String> notificationEventUuids, boolean archive)
073 throws ChannelException {
074
075 _reentrantLock.lock();
076
077 try {
078 for (String notificationEventUuid : notificationEventUuids) {
079 Map<String, NotificationEvent> unconfirmedNotificationEvents =
080 _getUnconfirmedNotificationEvents();
081
082 unconfirmedNotificationEvents.remove(notificationEventUuid);
083 }
084
085 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
086 if (archive) {
087 UserNotificationEventLocalServiceUtil.
088 updateUserNotificationEvents(
089 notificationEventUuids, archive);
090 }
091 else {
092 UserNotificationEventLocalServiceUtil.
093 deleteUserNotificationEvents(notificationEventUuids);
094 }
095 }
096 }
097 catch (Exception e) {
098 throw new ChannelException(
099 "Unable to confirm delivery for user " + getUserId() , e);
100 }
101 finally {
102 _reentrantLock.unlock();
103 }
104 }
105
106 public void confirmDelivery(String notificationEventUuid)
107 throws ChannelException {
108
109 confirmDelivery(notificationEventUuid, false);
110 }
111
112 public void confirmDelivery(String notificationEventUuid, boolean archive)
113 throws ChannelException {
114
115 _reentrantLock.lock();
116
117 try {
118 Map<String, NotificationEvent> unconfirmedNotificationEvents =
119 _getUnconfirmedNotificationEvents();
120
121 unconfirmedNotificationEvents.remove(notificationEventUuid);
122
123 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
124 if (archive) {
125 UserNotificationEventLocalServiceUtil.
126 updateUserNotificationEvent(
127 notificationEventUuid, archive);
128 }
129 else {
130 UserNotificationEventLocalServiceUtil.
131 deleteUserNotificationEvent(notificationEventUuid);
132 }
133 }
134 }
135 catch (Exception e) {
136 throw new ChannelException(
137 "Uanble to confirm delivery for " + notificationEventUuid , e);
138 }
139 finally {
140 _reentrantLock.unlock();
141 }
142 }
143
144 public void deleteUserNotificiationEvent(String notificationEventUuid)
145 throws ChannelException {
146
147 try {
148 UserNotificationEventLocalServiceUtil.
149 deleteUserNotificationEvent(notificationEventUuid);
150 }
151 catch (Exception e) {
152 throw new ChannelException(
153 "Uanble to delete event " + notificationEventUuid , e);
154 }
155 }
156
157 public void deleteUserNotificiationEvents(
158 Collection<String> notificationEventUuids)
159 throws ChannelException {
160
161 try {
162 UserNotificationEventLocalServiceUtil.
163 deleteUserNotificationEvents(notificationEventUuids);
164 }
165 catch (Exception e) {
166 throw new ChannelException(
167 "Uanble to delete events for user " + getUserId() , e);
168 }
169 }
170
171 public void flush() {
172 _reentrantLock.lock();
173
174 try {
175 if (_notificationEvents != null) {
176 _notificationEvents.clear();
177 }
178 }
179 finally {
180 _reentrantLock.unlock();
181 }
182 }
183
184 public void flush(long timestamp) {
185 _reentrantLock.lock();
186
187 try {
188 if (_notificationEvents == null) {
189 return;
190 }
191
192 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
193
194 while (itr.hasNext()) {
195 NotificationEvent notificationEvent = itr.next();
196
197 if (notificationEvent.getTimestamp() < timestamp) {
198 itr.remove();
199 }
200 }
201 }
202 finally {
203 _reentrantLock.unlock();
204 }
205 }
206
207 public List<NotificationEvent> getNotificationEvents(boolean flush)
208 throws ChannelException {
209
210 _reentrantLock.lock();
211
212 try {
213 return doGetNotificationEvents(flush);
214 }
215 catch (ChannelException ce) {
216 throw ce;
217 }
218 catch (Exception e) {
219 throw new ChannelException(e);
220 }
221 finally {
222 _reentrantLock.unlock();
223 }
224 }
225
226 public void init() throws ChannelException {
227 _reentrantLock.lock();
228
229 try {
230 doInit();
231 }
232 catch (SystemException se) {
233 throw new ChannelException(
234 "Unable to init channel " + getUserId(), se);
235 }
236 finally {
237 _reentrantLock.unlock();
238 }
239 }
240
241 public void removeTransientNotificationEvents(
242 Collection<NotificationEvent> notificationEvents) {
243
244 _reentrantLock.lock();
245
246 try {
247 if (_notificationEvents != null) {
248 _notificationEvents.removeAll(notificationEvents);
249 }
250 }
251 finally {
252 _reentrantLock.unlock();
253 }
254 }
255
256 public void removeTransientNotificationEventsByUuid(
257 Collection<String> notificationEventUuids) {
258
259 Set<String> notificationEventUuidsSet = new HashSet<String>(
260 notificationEventUuids);
261
262 _reentrantLock.lock();
263
264 try {
265 if (_notificationEvents == null) {
266 return;
267 }
268
269 Iterator<NotificationEvent> itr = _notificationEvents.iterator();
270
271 while (itr.hasNext()) {
272 NotificationEvent notificationEvent = itr.next();
273
274 if (notificationEventUuidsSet.contains(
275 notificationEvent.getUuid())) {
276
277 itr.remove();
278 }
279 }
280 }
281 finally {
282 _reentrantLock.unlock();
283 }
284 }
285
286 public void sendNotificationEvent(NotificationEvent notificationEvent)
287 throws ChannelException {
288
289 _reentrantLock.lock();
290
291 try {
292 long currentTime = System.currentTimeMillis();
293
294 storeNotificationEvent(notificationEvent, currentTime);
295
296 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
297 notificationEvent.isDeliveryRequired()) {
298
299 UserNotificationEventLocalServiceUtil.addUserNotificationEvent(
300 getUserId(), notificationEvent);
301 }
302 }
303 catch (Exception e) {
304 throw new ChannelException("Unable to send event", e);
305 }
306 finally {
307 _reentrantLock.unlock();
308 }
309
310 notifyChannelListeners();
311 }
312
313 public void sendNotificationEvents(
314 Collection<NotificationEvent> notificationEvents)
315 throws ChannelException {
316
317 _reentrantLock.lock();
318
319 try {
320 long currentTime = System.currentTimeMillis();
321
322 List<NotificationEvent> persistedNotificationEvents =
323 new ArrayList<NotificationEvent>(notificationEvents.size());
324
325 for (NotificationEvent notificationEvent : notificationEvents) {
326 storeNotificationEvent(notificationEvent, currentTime);
327
328 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
329 notificationEvent.isDeliveryRequired()) {
330
331 persistedNotificationEvents.add(notificationEvent);
332 }
333 }
334
335 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
336 !persistedNotificationEvents.isEmpty()) {
337
338 UserNotificationEventLocalServiceUtil.addUserNotificationEvents(
339 getUserId(), persistedNotificationEvents);
340 }
341 }
342 catch (Exception e) {
343 throw new ChannelException("Unable to send event", e);
344 }
345 finally {
346 _reentrantLock.unlock();
347 }
348
349 notifyChannelListeners();
350 }
351
352 @Override
353 protected void doCleanUp() throws Exception {
354 _reentrantLock.lock();
355
356 try {
357 long currentTime = System.currentTimeMillis();
358
359 TreeSet<NotificationEvent> notificationEvents =
360 _getNotificationEvents();
361
362 Iterator<NotificationEvent> itr1 = notificationEvents.iterator();
363
364 while (itr1.hasNext()) {
365 NotificationEvent notificationEvent = itr1.next();
366
367 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
368 itr1.remove();
369 }
370 }
371
372 Map<String, NotificationEvent> unconfirmedNotificationEvents =
373 _getUnconfirmedNotificationEvents();
374
375 List<String> invalidNotificationEventUuids = new ArrayList<String>(
376 unconfirmedNotificationEvents.size());
377
378 Set<Map.Entry<String, NotificationEvent>>
379 unconfirmedNotificationEventsSet =
380 unconfirmedNotificationEvents.entrySet();
381
382 Iterator<Map.Entry<String, NotificationEvent>> itr2 =
383 unconfirmedNotificationEventsSet.iterator();
384
385 while (itr2.hasNext()) {
386 Map.Entry<String, NotificationEvent> entry = itr2.next();
387
388 NotificationEvent notificationEvent = entry.getValue();
389
390 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
391 invalidNotificationEventUuids.add(entry.getKey());
392
393 itr2.remove();
394 }
395 }
396
397 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
398 !invalidNotificationEventUuids.isEmpty()) {
399
400 UserNotificationEventLocalServiceUtil.
401 deleteUserNotificationEvents(invalidNotificationEventUuids);
402 }
403 }
404 catch (Exception e) {
405 throw new ChannelException(
406 "Unable to clean up channel " + getUserId(), e);
407 }
408 finally {
409 _reentrantLock.unlock();
410 }
411 }
412
413 protected List<NotificationEvent> doGetNotificationEvents(boolean flush)
414 throws Exception {
415
416 long currentTime = System.currentTimeMillis();
417
418 TreeSet<NotificationEvent> notificationEventsSet =
419 _getNotificationEvents();
420
421 Map<String, NotificationEvent> unconfirmedNotificationEvents =
422 _getUnconfirmedNotificationEvents();
423
424 List<NotificationEvent> notificationEvents =
425 new ArrayList<NotificationEvent>(
426 notificationEventsSet.size() +
427 unconfirmedNotificationEvents.size());
428
429 for (NotificationEvent notificationEvent : notificationEventsSet) {
430 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
431 break;
432 }
433 else {
434 notificationEvents.add(notificationEvent);
435 }
436 }
437
438 if (flush) {
439 notificationEventsSet.clear();
440 }
441 else if (notificationEventsSet.size() != notificationEvents.size()) {
442 notificationEventsSet.retainAll(notificationEvents);
443 }
444
445 List<String> invalidNotificationEventUuids = new ArrayList<String>(
446 unconfirmedNotificationEvents.size());
447
448 Set<Map.Entry<String, NotificationEvent>>
449 unconfirmedNotificationEventsSet =
450 unconfirmedNotificationEvents.entrySet();
451
452 Iterator<Map.Entry<String, NotificationEvent>> itr =
453 unconfirmedNotificationEventsSet.iterator();
454
455 while (itr.hasNext()) {
456 Map.Entry<String, NotificationEvent> entry = itr.next();
457
458 NotificationEvent notificationEvent = entry.getValue();
459
460 if (isRemoveNotificationEvent(notificationEvent, currentTime) &&
461 !notificationEvent.isArchived()) {
462
463 invalidNotificationEventUuids.add(notificationEvent.getUuid());
464
465 itr.remove();
466 }
467 else {
468 notificationEvents.add(entry.getValue());
469 }
470 }
471
472 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
473 !invalidNotificationEventUuids.isEmpty()) {
474
475 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
476 invalidNotificationEventUuids);
477 }
478
479 return notificationEvents;
480 }
481
482 protected void doInit() throws SystemException {
483 if (!PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED) {
484 return;
485 }
486
487 List<UserNotificationEvent> userNotificationEvents =
488 UserNotificationEventLocalServiceUtil.getUserNotificationEvents(
489 getUserId(), false);
490
491 Map<String, NotificationEvent> unconfirmedNotificationEvents =
492 _getUnconfirmedNotificationEvents();
493
494 List<String> invalidNotificationEventUuids = new ArrayList<String>(
495 unconfirmedNotificationEvents.size());
496
497 long currentTime = System.currentTimeMillis();
498
499 for (UserNotificationEvent persistedNotificationEvent :
500 userNotificationEvents) {
501
502 try {
503 JSONObject payloadJSONObject = JSONFactoryUtil.createJSONObject(
504 persistedNotificationEvent.getPayload());
505
506 NotificationEvent notificationEvent =
507 NotificationEventFactoryUtil.createNotificationEvent(
508 persistedNotificationEvent.getTimestamp(),
509 persistedNotificationEvent.getType(),
510 payloadJSONObject);
511
512 notificationEvent.setDeliveryRequired(
513 persistedNotificationEvent.getDeliverBy());
514
515 notificationEvent.setUuid(persistedNotificationEvent.getUuid());
516
517 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
518 invalidNotificationEventUuids.add(
519 notificationEvent.getUuid());
520 }
521 else {
522 unconfirmedNotificationEvents.put(
523 notificationEvent.getUuid(), notificationEvent);
524 }
525 }
526 catch (JSONException jsone) {
527 _log.error(jsone, jsone);
528
529 invalidNotificationEventUuids.add(
530 persistedNotificationEvent.getUuid());
531 }
532 }
533
534 if (!invalidNotificationEventUuids.isEmpty()) {
535 UserNotificationEventLocalServiceUtil.deleteUserNotificationEvents(
536 invalidNotificationEventUuids);
537 }
538 }
539
540 protected boolean isRemoveNotificationEvent(
541 NotificationEvent notificationEvent, long currentTime) {
542
543 if ((notificationEvent.getDeliverBy() != 0) &&
544 (notificationEvent.getDeliverBy() <= currentTime)) {
545
546 return true;
547 }
548 else {
549 return false;
550 }
551 }
552
553 protected void storeNotificationEvent(
554 NotificationEvent notificationEvent, long currentTime) {
555
556 if (isRemoveNotificationEvent(notificationEvent, currentTime)) {
557 return;
558 }
559
560 if (PropsValues.USER_NOTIFICATION_EVENT_CONFIRMATION_ENABLED &&
561 notificationEvent.isDeliveryRequired()) {
562
563 Map<String, NotificationEvent> unconfirmedNotificationEvents =
564 _getUnconfirmedNotificationEvents();
565
566 unconfirmedNotificationEvents.put(
567 notificationEvent.getUuid(), notificationEvent);
568 }
569 else {
570 TreeSet<NotificationEvent> notificationEvents =
571 _getNotificationEvents();
572
573 notificationEvents.add(notificationEvent);
574
575 if (notificationEvents.size() >
576 PropsValues.NOTIFICATIONS_MAX_EVENTS) {
577
578 NotificationEvent firstNotificationEvent =
579 notificationEvents.first();
580
581 notificationEvents.remove(firstNotificationEvent);
582 }
583 }
584 }
585
586 private TreeSet<NotificationEvent> _getNotificationEvents() {
587 if (_notificationEvents == null) {
588 _notificationEvents = new TreeSet<NotificationEvent>(_comparator);
589 }
590
591 return _notificationEvents;
592 }
593
594 private Map<String, NotificationEvent> _getUnconfirmedNotificationEvents() {
595 if (_unconfirmedNotificationEvents == null) {
596 _unconfirmedNotificationEvents =
597 new HashMap<String, NotificationEvent>();
598 }
599
600 return _unconfirmedNotificationEvents;
601 }
602
603 private static Log _log = LogFactoryUtil.getLog(ChannelImpl.class);
604
605 private static Comparator<NotificationEvent> _comparator =
606 new NotificationEventComparator();
607
608 private TreeSet<NotificationEvent> _notificationEvents;
609 private ReentrantLock _reentrantLock = new ReentrantLock();
610 private Map<String, NotificationEvent> _unconfirmedNotificationEvents;
611
612 }