001
014
015 package com.liferay.portal.scheduler;
016
017 import com.liferay.portal.kernel.bean.BeanReference;
018 import com.liferay.portal.kernel.bean.IdentifiableBean;
019 import com.liferay.portal.kernel.cluster.Address;
020 import com.liferay.portal.kernel.cluster.ClusterEvent;
021 import com.liferay.portal.kernel.cluster.ClusterEventListener;
022 import com.liferay.portal.kernel.cluster.ClusterEventType;
023 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.Clusterable;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.messaging.Message;
034 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035 import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037 import com.liferay.portal.kernel.scheduler.SchedulerException;
038 import com.liferay.portal.kernel.scheduler.StorageType;
039 import com.liferay.portal.kernel.scheduler.Trigger;
040 import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041 import com.liferay.portal.kernel.scheduler.TriggerState;
042 import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043 import com.liferay.portal.kernel.util.Base64;
044 import com.liferay.portal.kernel.util.CharPool;
045 import com.liferay.portal.kernel.util.MethodHandler;
046 import com.liferay.portal.kernel.util.MethodKey;
047 import com.liferay.portal.kernel.util.ObjectValuePair;
048 import com.liferay.portal.kernel.util.StringPool;
049 import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
050 import com.liferay.portal.model.Lock;
051 import com.liferay.portal.service.LockLocalServiceUtil;
052 import com.liferay.portal.util.PropsValues;
053
054 import java.io.ObjectInputStream;
055 import java.io.ObjectOutputStream;
056
057 import java.util.Collections;
058 import java.util.Iterator;
059 import java.util.List;
060 import java.util.Map;
061 import java.util.Set;
062 import java.util.concurrent.ConcurrentHashMap;
063 import java.util.concurrent.TimeUnit;
064 import java.util.concurrent.locks.ReadWriteLock;
065 import java.util.concurrent.locks.ReentrantReadWriteLock;
066
067
070 public class ClusterSchedulerEngine
071 implements IdentifiableBean, SchedulerEngine,
072 SchedulerEngineClusterManager {
073
074 public static SchedulerEngine createClusterSchedulerEngine(
075 SchedulerEngine schedulerEngine) {
076
077 if (PropsValues.CLUSTER_LINK_ENABLED) {
078 schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
079 }
080
081 return schedulerEngine;
082 }
083
/**
 * Creates a cluster scheduler engine that delegates to the given engine.
 *
 * @param schedulerEngine the wrapped scheduler engine
 */
public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
    this._schedulerEngine = schedulerEngine;
}
087
088 @Clusterable
089 public void delete(String groupName) throws SchedulerException {
090 if (!PropsValues.SCHEDULER_ENABLED) {
091 return;
092 }
093
094 try {
095 if (isMemorySchedulerSlave(groupName)) {
096 removeMemoryClusteredJobs(groupName);
097
098 return;
099 }
100 }
101 catch (Exception e) {
102 throw new SchedulerException(
103 "Unable to delete jobs in group " + groupName, e);
104 }
105
106 _readLock.lock();
107
108 try {
109 _schedulerEngine.delete(groupName);
110 }
111 finally {
112 _readLock.unlock();
113 }
114
115 skipClusterInvoking(groupName);
116 }
117
118 @Clusterable
119 public void delete(String jobName, String groupName)
120 throws SchedulerException {
121
122 if (!PropsValues.SCHEDULER_ENABLED) {
123 return;
124 }
125
126 try {
127 if (isMemorySchedulerSlave(groupName)) {
128 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
129
130 return;
131 }
132 }
133 catch (Exception e) {
134 throw new SchedulerException(
135 "Unable to delete job {jobName=" + jobName + ", groupName=" +
136 groupName + "}",
137 e);
138 }
139
140 _readLock.lock();
141
142 try {
143 _schedulerEngine.delete(jobName, groupName);
144 }
145 finally {
146 _readLock.unlock();
147 }
148
149 skipClusterInvoking(groupName);
150 }
151
152 public String getBeanIdentifier() {
153 return _beanIdentifier;
154 }
155
156 public SchedulerResponse getScheduledJob(String jobName, String groupName)
157 throws SchedulerException {
158
159 if (!PropsValues.SCHEDULER_ENABLED) {
160 return null;
161 }
162
163 try {
164 if (isMemorySchedulerSlave(groupName)) {
165 return (SchedulerResponse)callMaster(
166 _getScheduledJobMethodKey, jobName, groupName);
167 }
168 }
169 catch (Exception e) {
170 throw new SchedulerException(
171 "Unable to get job {jobName=" + jobName + ", groupName=" +
172 groupName + "}",
173 e);
174 }
175
176 _readLock.lock();
177
178 try {
179 return _schedulerEngine.getScheduledJob(jobName, groupName);
180 }
181 finally {
182 _readLock.unlock();
183 }
184 }
185
186 public List<SchedulerResponse> getScheduledJobs()
187 throws SchedulerException {
188
189 if (!PropsValues.SCHEDULER_ENABLED) {
190 return Collections.emptyList();
191 }
192
193 try {
194 if (isMemorySchedulerSlave()) {
195 return (List<SchedulerResponse>)callMaster(
196 _getScheduledJobsMethodKey1);
197 }
198 }
199 catch (Exception e) {
200 throw new SchedulerException("Unable to get jobs", e);
201 }
202
203 _readLock.lock();
204
205 try {
206 return _schedulerEngine.getScheduledJobs();
207 }
208 finally {
209 _readLock.unlock();
210 }
211 }
212
213 public List<SchedulerResponse> getScheduledJobs(String groupName)
214 throws SchedulerException {
215
216 if (!PropsValues.SCHEDULER_ENABLED) {
217 return Collections.emptyList();
218 }
219
220 try {
221 if (isMemorySchedulerSlave(groupName)) {
222 return (List<SchedulerResponse>)callMaster(
223 _getScheduledJobsMethodKey2, groupName);
224 }
225 }
226 catch (Exception e) {
227 throw new SchedulerException(
228 "Unable to get jobs in group " + groupName, e);
229 }
230
231 _readLock.lock();
232
233 try {
234 return _schedulerEngine.getScheduledJobs(groupName);
235 }
236 finally {
237 _readLock.unlock();
238 }
239 }
240
241 public void initialize() throws SchedulerException {
242 if (!PropsValues.SCHEDULER_ENABLED) {
243 return;
244 }
245
246 try {
247 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
248
249 _readLock = readWriteLock.readLock();
250 _writeLock = readWriteLock.writeLock();
251
252 _localClusterNodeAddress = getSerializedString(
253 ClusterExecutorUtil.getLocalClusterNodeAddress());
254
255 _clusterEventListener = new MemorySchedulerClusterEventListener();
256
257 ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
258
259 if (!isMemorySchedulerClusterLockOwner(
260 lockMemorySchedulerCluster(null))) {
261
262 initMemoryClusteredJobs();
263 }
264 }
265 catch (Exception e) {
266 throw new SchedulerException("Unable to initialize scheduler", e);
267 }
268 }
269
270 @Clusterable
271 public void pause(String groupName) throws SchedulerException {
272 if (!PropsValues.SCHEDULER_ENABLED) {
273 return;
274 }
275
276 try {
277 if (isMemorySchedulerSlave(groupName)) {
278 updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
279
280 return;
281 }
282 }
283 catch (Exception e) {
284 throw new SchedulerException(
285 "Unable to pause jobs in group " + groupName, e);
286 }
287
288 _readLock.lock();
289
290 try {
291 _schedulerEngine.pause(groupName);
292 }
293 finally {
294 _readLock.unlock();
295 }
296
297 skipClusterInvoking(groupName);
298 }
299
300 @Clusterable
301 public void pause(String jobName, String groupName)
302 throws SchedulerException {
303
304 if (!PropsValues.SCHEDULER_ENABLED) {
305 return;
306 }
307
308 try {
309 if (isMemorySchedulerSlave(groupName)) {
310 updateMemoryClusteredJob(
311 jobName, groupName, TriggerState.PAUSED);
312
313 return;
314 }
315 }
316 catch (Exception e) {
317 throw new SchedulerException(
318 "Unable to pause job {jobName=" + jobName + ", groupName=" +
319 groupName + "}",
320 e);
321 }
322
323 _readLock.lock();
324
325 try {
326 _schedulerEngine.pause(jobName, groupName);
327 }
328 finally {
329 _readLock.unlock();
330 }
331
332 skipClusterInvoking(groupName);
333 }
334
335 @Clusterable
336 public void resume(String groupName) throws SchedulerException {
337 if (!PropsValues.SCHEDULER_ENABLED) {
338 return;
339 }
340
341 try {
342 if (isMemorySchedulerSlave(groupName)) {
343 updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
344
345 return;
346 }
347 }
348 catch (Exception e) {
349 throw new SchedulerException(
350 "Unable to resume jobs in group " + groupName, e);
351 }
352
353 _readLock.lock();
354
355 try {
356 _schedulerEngine.resume(groupName);
357 }
358 finally {
359 _readLock.unlock();
360 }
361
362 skipClusterInvoking(groupName);
363 }
364
365 @Clusterable
366 public void resume(String jobName, String groupName)
367 throws SchedulerException {
368
369 if (!PropsValues.SCHEDULER_ENABLED) {
370 return;
371 }
372
373 try {
374 if (isMemorySchedulerSlave(groupName)) {
375 updateMemoryClusteredJob(
376 jobName, groupName, TriggerState.NORMAL);
377
378 return;
379 }
380 }
381 catch (Exception e) {
382 throw new SchedulerException(
383 "Unable to resume job {jobName=" + jobName + ", groupName=" +
384 groupName + "}",
385 e);
386 }
387
388 _readLock.lock();
389
390 try {
391 _schedulerEngine.resume(jobName, groupName);
392 }
393 finally {
394 _readLock.unlock();
395 }
396
397 skipClusterInvoking(groupName);
398 }
399
400 @Clusterable
401 public void schedule(
402 Trigger trigger, String description, String destinationName,
403 Message message)
404 throws SchedulerException {
405
406 if (!PropsValues.SCHEDULER_ENABLED) {
407 return;
408 }
409
410 String groupName = trigger.getGroupName();
411 String jobName = trigger.getJobName();
412
413 try {
414 if (isMemorySchedulerSlave(groupName)) {
415 SchedulerResponse schedulerResponse = new SchedulerResponse();
416
417 schedulerResponse.setDescription(description);
418 schedulerResponse.setDestinationName(destinationName);
419 schedulerResponse.setGroupName(groupName);
420 schedulerResponse.setJobName(jobName);
421 schedulerResponse.setMessage(message);
422 schedulerResponse.setTrigger(trigger);
423
424 _memoryClusteredJobs.put(
425 getFullName(jobName, groupName),
426 new ObjectValuePair<SchedulerResponse, TriggerState>(
427 schedulerResponse, TriggerState.NORMAL));
428
429 return;
430 }
431 }
432 catch (Exception e) {
433 throw new SchedulerException(
434 "Unable to schedule job {jobName=" + jobName + ", groupName=" +
435 groupName + "}",
436 e);
437 }
438
439 _readLock.lock();
440
441 try {
442 _schedulerEngine.schedule(
443 trigger, description, destinationName, message);
444 }
445 finally {
446 _readLock.unlock();
447 }
448
449 skipClusterInvoking(groupName);
450 }
451
452 public void setBeanIdentifier(String beanIdentifier) {
453 _beanIdentifier = beanIdentifier;
454 }
455
456 public void shutdown() throws SchedulerException {
457 if (!PropsValues.SCHEDULER_ENABLED) {
458 return;
459 }
460
461 try {
462 ClusterExecutorUtil.removeClusterEventListener(
463 _clusterEventListener);
464
465 LockLocalServiceUtil.unlock(
466 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
467 PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
468 }
469 catch (Exception e) {
470 throw new SchedulerException("Unable to shutdown scheduler", e);
471 }
472
473 _schedulerEngine.shutdown();
474 }
475
476 public void start() throws SchedulerException {
477 if (!PropsValues.SCHEDULER_ENABLED) {
478 return;
479 }
480
481 _schedulerEngine.start();
482 }
483
484 @Clusterable
485 public void suppressError(String jobName, String groupName)
486 throws SchedulerException {
487
488 if (!PropsValues.SCHEDULER_ENABLED) {
489 return;
490 }
491
492 try {
493 if (isMemorySchedulerSlave(groupName)) {
494 return;
495 }
496 }
497 catch (Exception e) {
498 throw new SchedulerException(
499 "Unable to suppress error for job {jobName=" + jobName +
500 ", groupName=" + groupName + "}",
501 e);
502 }
503
504 _readLock.lock();
505
506 try {
507 _schedulerEngine.suppressError(jobName, groupName);
508 }
509 finally {
510 _readLock.unlock();
511 }
512
513 skipClusterInvoking(groupName);
514 }
515
516 @Clusterable
517 public void unschedule(String groupName) throws SchedulerException {
518 if (!PropsValues.SCHEDULER_ENABLED) {
519 return;
520 }
521
522 try {
523 if (isMemorySchedulerSlave(groupName)) {
524 removeMemoryClusteredJobs(groupName);
525
526 return;
527 }
528 }
529 catch (Exception e) {
530 throw new SchedulerException(
531 "Unable to unschedule jobs in group " + groupName, e);
532 }
533
534 _readLock.lock();
535
536 try {
537 _schedulerEngine.unschedule(groupName);
538 }
539 finally {
540 _readLock.unlock();
541 }
542
543 skipClusterInvoking(groupName);
544 }
545
546 @Clusterable
547 public void unschedule(String jobName, String groupName)
548 throws SchedulerException {
549
550 if (!PropsValues.SCHEDULER_ENABLED) {
551 return;
552 }
553
554 try {
555 if (isMemorySchedulerSlave(groupName)) {
556 _memoryClusteredJobs.remove(getFullName(jobName, groupName));
557
558 return;
559 }
560 }
561 catch (Exception e) {
562 throw new SchedulerException(
563 "Unable to unschedule job {jobName=" + jobName +
564 ", groupName=" + groupName + "}",
565 e);
566 }
567
568 _readLock.lock();
569
570 try {
571 _schedulerEngine.unschedule(jobName, groupName);
572 }
573 finally {
574 _readLock.unlock();
575 }
576
577 skipClusterInvoking(groupName);
578 }
579
580 @Clusterable
581 public void update(Trigger trigger) throws SchedulerException {
582 if (!PropsValues.SCHEDULER_ENABLED) {
583 return;
584 }
585
586 String jobName = trigger.getJobName();
587 String groupName = trigger.getGroupName();
588
589 try {
590 if (isMemorySchedulerSlave(groupName)) {
591 for (ObjectValuePair<SchedulerResponse, TriggerState>
592 memoryClusteredJob : _memoryClusteredJobs.values()) {
593
594 SchedulerResponse schedulerResponse =
595 memoryClusteredJob.getKey();
596
597 if (jobName.equals(schedulerResponse.getJobName()) &&
598 groupName.equals(schedulerResponse.getGroupName())) {
599
600 schedulerResponse.setTrigger(trigger);
601
602 return;
603 }
604 }
605
606 throw new Exception(
607 "Unable to update trigger for memory clustered job");
608 }
609 }
610 catch (Exception e) {
611 throw new SchedulerException(
612 "Unable to update job {jobName=" + jobName + ", groupName=" +
613 groupName + "}",
614 e);
615 }
616
617 _readLock.lock();
618
619 try {
620 _schedulerEngine.update(trigger);
621 }
622 finally {
623 _readLock.unlock();
624 }
625
626 skipClusterInvoking(groupName);
627 }
628
629 public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
630 try {
631 Lock lock = lockMemorySchedulerCluster(null);
632
633 Address address = (Address)getDeserializedObject(lock.getOwner());
634
635 if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
636 return lock;
637 }
638
639 return lockMemorySchedulerCluster(lock.getOwner());
640 }
641 catch (Exception e) {
642 throw new SchedulerException(
643 "Unable to update memory scheduler cluster master", e);
644 }
645 }
646
647 protected Object callMaster(MethodKey methodKey, Object... arguments)
648 throws Exception {
649
650 MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
651
652 Lock lock = updateMemorySchedulerClusterMaster();
653
654 Address address = (Address)getDeserializedObject(lock.getOwner());
655
656 if (address.equals(ClusterExecutorUtil.getLocalClusterNodeAddress())) {
657 if (methodKey == _getScheduledJobsMethodKey3) {
658 return methodHandler.invoke(false);
659 }
660 else {
661 return methodHandler.invoke(schedulerEngine);
662 }
663 }
664
665 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
666 methodHandler, address);
667
668 clusterRequest.setBeanIdentifier(_beanIdentifier);
669
670 FutureClusterResponses futureClusterResponses =
671 ClusterExecutorUtil.execute(clusterRequest);
672
673 try {
674 ClusterNodeResponses clusterNodeResponses =
675 futureClusterResponses.get(20, TimeUnit.SECONDS);
676
677 ClusterNodeResponse clusterNodeResponse =
678 clusterNodeResponses.getClusterResponse(address);
679
680 return clusterNodeResponse.getResult();
681 }
682 catch (Exception e) {
683 throw new SchedulerException(
684 "Unable to load scheduled jobs from cluster node " +
685 address.getDescription(),
686 e);
687 }
688 }
689
690 protected Object getDeserializedObject(String string) throws Exception {
691 byte[] bytes = Base64.decode(string);
692
693 UnsyncByteArrayInputStream byteArrayInputStream =
694 new UnsyncByteArrayInputStream(bytes);
695
696 ObjectInputStream objectInputStream = new ObjectInputStream(
697 byteArrayInputStream);
698
699 Object object = objectInputStream.readObject();
700
701 objectInputStream.close();
702
703 return object;
704 }
705
706 protected String getFullName(String jobName, String groupName) {
707 return groupName.concat(StringPool.PERIOD).concat(jobName);
708 }
709
710 protected String getSerializedString(Object object) throws Exception {
711 UnsyncByteArrayOutputStream byteArrayOutputStream =
712 new UnsyncByteArrayOutputStream();
713
714 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
715 byteArrayOutputStream);
716
717 objectOutputStream.writeObject(object);
718 objectOutputStream.close();
719
720 byte[] bytes = byteArrayOutputStream.toByteArray();
721
722 return Base64.encode(bytes);
723 }
724
725 protected StorageType getStorageType(String groupName) {
726 int pos = groupName.indexOf(CharPool.POUND);
727
728 String storageTypeString = groupName.substring(0, pos);
729
730 return StorageType.valueOf(storageTypeString);
731 }
732
733 protected void initMemoryClusteredJobs() throws Exception {
734 List<SchedulerResponse> schedulerResponses =
735 (List<SchedulerResponse>)callMaster(
736 _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
737
738 for (SchedulerResponse schedulerResponse : schedulerResponses) {
739 Trigger oldTrigger = schedulerResponse.getTrigger();
740
741 String jobName = schedulerResponse.getJobName();
742 String groupName = SchedulerEngineUtil.namespaceGroupName(
743 schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
744
745 Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
746 oldTrigger.getTriggerType(), jobName, groupName,
747 oldTrigger.getStartDate(), oldTrigger.getEndDate(),
748 oldTrigger.getTriggerContent());
749
750 schedulerResponse.setTrigger(newTrigger);
751
752 TriggerState triggerState = SchedulerEngineUtil.getJobState(
753 schedulerResponse);
754
755 Message message = schedulerResponse.getMessage();
756
757 message.remove(JOB_STATE);
758
759 _memoryClusteredJobs.put(
760 getFullName(jobName, groupName),
761 new ObjectValuePair<SchedulerResponse, TriggerState>(
762 schedulerResponse, triggerState));
763 }
764 }
765
766 protected boolean isMemorySchedulerClusterLockOwner(Lock lock)
767 throws Exception {
768
769 boolean master = _localClusterNodeAddress.equals(lock.getOwner());
770
771 if (master == _master) {
772 return master;
773 }
774
775 if (!_master) {
776 _master = master;
777
778 return _master;
779 }
780
781 _localClusterNodeAddress = getSerializedString(
782 ClusterExecutorUtil.getLocalClusterNodeAddress());
783
784 for (ObjectValuePair<SchedulerResponse, TriggerState>
785 memoryClusteredJob : _memoryClusteredJobs.values()) {
786
787 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
788
789 _schedulerEngine.delete(
790 schedulerResponse.getJobName(),
791 schedulerResponse.getGroupName());
792 }
793
794 initMemoryClusteredJobs();
795
796 if (_log.isInfoEnabled()) {
797 _log.info("Another node is now the memory scheduler master");
798 }
799
800 _master = master;
801
802 return master;
803 }
804
805 protected boolean isMemorySchedulerSlave() throws Exception {
806 return isMemorySchedulerSlave(null);
807 }
808
809 protected boolean isMemorySchedulerSlave(String groupName)
810 throws Exception {
811
812 if (groupName != null) {
813 StorageType storageType = getStorageType(groupName);
814
815 if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
816 return false;
817 }
818 }
819
820 Lock lock = lockMemorySchedulerCluster(null);
821
822 if (isMemorySchedulerClusterLockOwner(lock)) {
823 return false;
824 }
825
826 return true;
827 }
828
829 protected Lock lockMemorySchedulerCluster(String owner) throws Exception {
830 Lock lock = null;
831
832 while (true) {
833 try {
834 if (owner == null) {
835 lock = LockLocalServiceUtil.lock(
836 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
837 _localClusterNodeAddress,
838 PropsValues.
839 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
840 }
841 else {
842 lock = LockLocalServiceUtil.lock(
843 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
844 _localClusterNodeAddress,
845 PropsValues.
846 MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
847 }
848
849 break;
850 }
851 catch (Exception e) {
852 if (_log.isWarnEnabled()) {
853 _log.warn(
854 "Unable to obtain memory scheduler cluster lock. " +
855 "Trying again.");
856 }
857 }
858 }
859
860 if (!lock.isNew()) {
861 return lock;
862 }
863
864 boolean forceSync = ProxyModeThreadLocal.isForceSync();
865
866 ProxyModeThreadLocal.setForceSync(true);
867
868 _writeLock.lock();
869
870 try {
871 for (ObjectValuePair<SchedulerResponse, TriggerState>
872 memoryClusteredJob : _memoryClusteredJobs.values()) {
873
874 SchedulerResponse schedulerResponse =
875 memoryClusteredJob.getKey();
876
877 _schedulerEngine.schedule(
878 schedulerResponse.getTrigger(),
879 schedulerResponse.getDescription(),
880 schedulerResponse.getDestinationName(),
881 schedulerResponse.getMessage());
882
883 TriggerState triggerState = memoryClusteredJob.getValue();
884
885 if (triggerState.equals(TriggerState.PAUSED)) {
886 _schedulerEngine.pause(
887 schedulerResponse.getJobName(),
888 schedulerResponse.getGroupName());
889 }
890 }
891 }
892 finally {
893 ProxyModeThreadLocal.setForceSync(forceSync);
894
895 _writeLock.unlock();
896 }
897
898 return lock;
899 }
900
901 protected void removeMemoryClusteredJobs(String groupName) {
902 Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
903 memoryClusteredJobs = _memoryClusteredJobs.entrySet();
904
905 Iterator
906 <Map.Entry<String,
907 ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
908 memoryClusteredJobs.iterator();
909
910 while (itr.hasNext()) {
911 Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
912 entry = itr.next();
913
914 ObjectValuePair<SchedulerResponse, TriggerState>
915 memoryClusteredJob = entry.getValue();
916
917 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
918
919 if (groupName.equals(schedulerResponse.getGroupName())) {
920 itr.remove();
921 }
922 }
923 }
924
925 protected void skipClusterInvoking(String groupName)
926 throws SchedulerException {
927
928 StorageType storageType = getStorageType(groupName);
929
930 if (storageType.equals(StorageType.PERSISTED)) {
931 SchedulerException schedulerException = new SchedulerException();
932
933 schedulerException.setSwallowable(true);
934
935 throw schedulerException;
936 }
937 }
938
939 protected void updateMemoryClusteredJob(
940 String jobName, String groupName, TriggerState triggerState) {
941
942 ObjectValuePair<SchedulerResponse, TriggerState>
943 memoryClusteredJob = _memoryClusteredJobs.get(
944 getFullName(jobName, groupName));
945
946 if (memoryClusteredJob != null) {
947 memoryClusteredJob.setValue(triggerState);
948 }
949 }
950
951 protected void updateMemoryClusteredJobs(
952 String groupName, TriggerState triggerState) {
953
954 for (ObjectValuePair<SchedulerResponse, TriggerState>
955 memoryClusteredJob : _memoryClusteredJobs.values()) {
956
957 SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
958
959 if (groupName.equals(schedulerResponse.getGroupName())) {
960 memoryClusteredJob.setValue(triggerState);
961 }
962 }
963 }
964
965 @BeanReference(
966 name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
967 protected SchedulerEngine schedulerEngine;
968
969 private static final String _LOCK_CLASS_NAME =
970 SchedulerEngine.class.getName();
971
972 private static Log _log = LogFactoryUtil.getLog(
973 ClusterSchedulerEngine.class);
974
975 private static MethodKey _getScheduledJobMethodKey = new MethodKey(
976 SchedulerEngine.class.getName(), "getScheduledJob", String.class,
977 String.class);
978 private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
979 SchedulerEngine.class.getName(), "getScheduledJobs");
980 private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
981 SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
982 private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
983 SchedulerEngineUtil.class.getName(), "getScheduledJobs",
984 StorageType.class);
985
986 private String _beanIdentifier;
987 private ClusterEventListener _clusterEventListener;
988 private volatile String _localClusterNodeAddress;
989 private volatile boolean _master;
990 private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
991 _memoryClusteredJobs = new ConcurrentHashMap
992 <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
993 private java.util.concurrent.locks.Lock _readLock;
994 private SchedulerEngine _schedulerEngine;
995 private java.util.concurrent.locks.Lock _writeLock;
996
997 private class MemorySchedulerClusterEventListener
998 implements ClusterEventListener {
999
1000 public void processClusterEvent(ClusterEvent clusterEvent) {
1001 ClusterEventType clusterEventType =
1002 clusterEvent.getClusterEventType();
1003
1004 if (!clusterEventType.equals(ClusterEventType.DEPART)) {
1005 return;
1006 }
1007
1008 try {
1009 updateMemorySchedulerClusterMaster();
1010 }
1011 catch (Exception e) {
1012 _log.error("Unable to update memory scheduler cluster lock", e);
1013 }
1014 }
1015
1016 }
1017
1018 }