001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler;
016    
017    import com.liferay.portal.kernel.bean.BeanReference;
018    import com.liferay.portal.kernel.bean.IdentifiableBean;
019    import com.liferay.portal.kernel.cluster.Address;
020    import com.liferay.portal.kernel.cluster.ClusterEvent;
021    import com.liferay.portal.kernel.cluster.ClusterEventListener;
022    import com.liferay.portal.kernel.cluster.ClusterEventType;
023    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.Clusterable;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayInputStream;
030    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.messaging.Message;
034    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
035    import com.liferay.portal.kernel.scheduler.SchedulerEngineClusterManager;
036    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
037    import com.liferay.portal.kernel.scheduler.SchedulerException;
038    import com.liferay.portal.kernel.scheduler.StorageType;
039    import com.liferay.portal.kernel.scheduler.Trigger;
040    import com.liferay.portal.kernel.scheduler.TriggerFactoryUtil;
041    import com.liferay.portal.kernel.scheduler.TriggerState;
042    import com.liferay.portal.kernel.scheduler.messaging.SchedulerResponse;
043    import com.liferay.portal.kernel.util.Base64;
044    import com.liferay.portal.kernel.util.CharPool;
045    import com.liferay.portal.kernel.util.MethodHandler;
046    import com.liferay.portal.kernel.util.MethodKey;
047    import com.liferay.portal.kernel.util.ObjectValuePair;
048    import com.liferay.portal.kernel.util.StringPool;
049    import com.liferay.portal.messaging.proxy.ProxyModeThreadLocal;
050    import com.liferay.portal.model.Lock;
051    import com.liferay.portal.service.LockLocalServiceUtil;
052    import com.liferay.portal.util.PropsValues;
053    
054    import java.io.ObjectInputStream;
055    import java.io.ObjectOutputStream;
056    
057    import java.util.Collections;
058    import java.util.Iterator;
059    import java.util.List;
060    import java.util.Map;
061    import java.util.Set;
062    import java.util.concurrent.ConcurrentHashMap;
063    import java.util.concurrent.TimeUnit;
064    import java.util.concurrent.locks.ReadWriteLock;
065    import java.util.concurrent.locks.ReentrantReadWriteLock;
066    
067    /**
068     * @author Tina Tian
069     */
070    public class ClusterSchedulerEngine
071            implements IdentifiableBean, SchedulerEngine,
072                               SchedulerEngineClusterManager {
073    
074            public static SchedulerEngine createClusterSchedulerEngine(
075                    SchedulerEngine schedulerEngine) {
076    
077                    if (PropsValues.CLUSTER_LINK_ENABLED) {
078                            schedulerEngine = new ClusterSchedulerEngine(schedulerEngine);
079                    }
080    
081                    return schedulerEngine;
082            }
083    
084            public ClusterSchedulerEngine(SchedulerEngine schedulerEngine) {
085                    _schedulerEngine = schedulerEngine;
086            }
087    
088            @Clusterable
089            public void delete(String groupName) throws SchedulerException {
090                    if (!PropsValues.SCHEDULER_ENABLED) {
091                            return;
092                    }
093    
094                    try {
095                            if (isMemorySchedulerSlave(groupName)) {
096                                    removeMemoryClusteredJobs(groupName);
097    
098                                    return;
099                            }
100                    }
101                    catch (Exception e) {
102                            throw new SchedulerException(
103                                    "Unable to delete jobs in group " + groupName, e);
104                    }
105    
106                    _readLock.lock();
107    
108                    try {
109                            _schedulerEngine.delete(groupName);
110                    }
111                    finally {
112                            _readLock.unlock();
113                    }
114    
115                    skipClusterInvoking(groupName);
116            }
117    
118            @Clusterable
119            public void delete(String jobName, String groupName)
120                    throws SchedulerException {
121    
122                    if (!PropsValues.SCHEDULER_ENABLED) {
123                            return;
124                    }
125    
126                    try {
127                            if (isMemorySchedulerSlave(groupName)) {
128                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
129    
130                                    return;
131                            }
132                    }
133                    catch (Exception e) {
134                            throw new SchedulerException(
135                                    "Unable to delete job {jobName=" + jobName + ", groupName=" +
136                                            groupName + "}",
137                                    e);
138                    }
139    
140                    _readLock.lock();
141    
142                    try {
143                            _schedulerEngine.delete(jobName, groupName);
144                    }
145                    finally {
146                            _readLock.unlock();
147                    }
148    
149                    skipClusterInvoking(groupName);
150            }
151    
152            public String getBeanIdentifier() {
153                    return _beanIdentifier;
154            }
155    
156            public SchedulerResponse getScheduledJob(String jobName, String groupName)
157                    throws SchedulerException {
158    
159                    if (!PropsValues.SCHEDULER_ENABLED) {
160                            return null;
161                    }
162    
163                    try {
164                            if (isMemorySchedulerSlave(groupName)) {
165                                    return (SchedulerResponse)callMaster(
166                                            _getScheduledJobMethodKey, jobName, groupName);
167                            }
168                    }
169                    catch (Exception e) {
170                            throw new SchedulerException(
171                                    "Unable to get job {jobName=" + jobName + ", groupName=" +
172                                            groupName + "}",
173                                    e);
174                    }
175    
176                    _readLock.lock();
177    
178                    try {
179                            return _schedulerEngine.getScheduledJob(jobName, groupName);
180                    }
181                    finally {
182                            _readLock.unlock();
183                    }
184            }
185    
186            public List<SchedulerResponse> getScheduledJobs()
187                    throws SchedulerException {
188    
189                    if (!PropsValues.SCHEDULER_ENABLED) {
190                            return Collections.emptyList();
191                    }
192    
193                    try {
194                            if (isMemorySchedulerSlave()) {
195                                    return (List<SchedulerResponse>)callMaster(
196                                            _getScheduledJobsMethodKey1);
197                            }
198                    }
199                    catch (Exception e) {
200                            throw new SchedulerException("Unable to get jobs", e);
201                    }
202    
203                    _readLock.lock();
204    
205                    try {
206                            return _schedulerEngine.getScheduledJobs();
207                    }
208                    finally {
209                            _readLock.unlock();
210                    }
211            }
212    
213            public List<SchedulerResponse> getScheduledJobs(String groupName)
214                    throws SchedulerException {
215    
216                    if (!PropsValues.SCHEDULER_ENABLED) {
217                            return Collections.emptyList();
218                    }
219    
220                    try {
221                            if (isMemorySchedulerSlave(groupName)) {
222                                    return (List<SchedulerResponse>)callMaster(
223                                            _getScheduledJobsMethodKey2, groupName);
224                            }
225                    }
226                    catch (Exception e) {
227                            throw new SchedulerException(
228                                    "Unable to get jobs in group " + groupName, e);
229                    }
230    
231                    _readLock.lock();
232    
233                    try {
234                            return _schedulerEngine.getScheduledJobs(groupName);
235                    }
236                    finally {
237                            _readLock.unlock();
238                    }
239            }
240    
241            public void initialize() throws SchedulerException {
242                    if (!PropsValues.SCHEDULER_ENABLED) {
243                            return;
244                    }
245    
246                    try {
247                            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
248    
249                            _readLock = readWriteLock.readLock();
250                            _writeLock = readWriteLock.writeLock();
251    
252                            _localClusterNodeAddress = getSerializedString(
253                                    ClusterExecutorUtil.getLocalClusterNodeAddress());
254    
255                            _clusterEventListener = new MemorySchedulerClusterEventListener();
256    
257                            ClusterExecutorUtil.addClusterEventListener(_clusterEventListener);
258    
259                            if (!isMemorySchedulerClusterLockOwner(
260                                            lockMemorySchedulerCluster(null))) {
261    
262                                    initMemoryClusteredJobs();
263                            }
264                    }
265                    catch (Exception e) {
266                            throw new SchedulerException("Unable to initialize scheduler", e);
267                    }
268            }
269    
270            @Clusterable
271            public void pause(String groupName) throws SchedulerException {
272                    if (!PropsValues.SCHEDULER_ENABLED) {
273                            return;
274                    }
275    
276                    try {
277                            if (isMemorySchedulerSlave(groupName)) {
278                                    updateMemoryClusteredJobs(groupName, TriggerState.PAUSED);
279    
280                                    return;
281                            }
282                    }
283                    catch (Exception e) {
284                            throw new SchedulerException(
285                                    "Unable to pause jobs in group " + groupName, e);
286                    }
287    
288                    _readLock.lock();
289    
290                    try {
291                            _schedulerEngine.pause(groupName);
292                    }
293                    finally {
294                            _readLock.unlock();
295                    }
296    
297                    skipClusterInvoking(groupName);
298            }
299    
300            @Clusterable
301            public void pause(String jobName, String groupName)
302                    throws SchedulerException {
303    
304                    if (!PropsValues.SCHEDULER_ENABLED) {
305                            return;
306                    }
307    
308                    try {
309                            if (isMemorySchedulerSlave(groupName)) {
310                                    updateMemoryClusteredJob(
311                                            jobName, groupName, TriggerState.PAUSED);
312    
313                                    return;
314                            }
315                    }
316                    catch (Exception e) {
317                            throw new SchedulerException(
318                                    "Unable to pause job {jobName=" + jobName + ", groupName=" +
319                                            groupName + "}",
320                                    e);
321                    }
322    
323                    _readLock.lock();
324    
325                    try {
326                            _schedulerEngine.pause(jobName, groupName);
327                    }
328                    finally {
329                            _readLock.unlock();
330                    }
331    
332                    skipClusterInvoking(groupName);
333            }
334    
335            @Clusterable
336            public void resume(String groupName) throws SchedulerException {
337                    if (!PropsValues.SCHEDULER_ENABLED) {
338                            return;
339                    }
340    
341                    try {
342                            if (isMemorySchedulerSlave(groupName)) {
343                                    updateMemoryClusteredJobs(groupName, TriggerState.NORMAL);
344    
345                                    return;
346                            }
347                    }
348                    catch (Exception e) {
349                            throw new SchedulerException(
350                                    "Unable to resume jobs in group " + groupName, e);
351                    }
352    
353                    _readLock.lock();
354    
355                    try {
356                            _schedulerEngine.resume(groupName);
357                    }
358                    finally {
359                            _readLock.unlock();
360                    }
361    
362                    skipClusterInvoking(groupName);
363            }
364    
365            @Clusterable
366            public void resume(String jobName, String groupName)
367                    throws SchedulerException {
368    
369                    if (!PropsValues.SCHEDULER_ENABLED) {
370                            return;
371                    }
372    
373                    try {
374                            if (isMemorySchedulerSlave(groupName)) {
375                                    updateMemoryClusteredJob(
376                                            jobName, groupName, TriggerState.NORMAL);
377    
378                                    return;
379                            }
380                    }
381                    catch (Exception e) {
382                            throw new SchedulerException(
383                                    "Unable to resume job {jobName=" + jobName + ", groupName=" +
384                                            groupName + "}",
385                                    e);
386                    }
387    
388                    _readLock.lock();
389    
390                    try {
391                            _schedulerEngine.resume(jobName, groupName);
392                    }
393                    finally {
394                            _readLock.unlock();
395                    }
396    
397                    skipClusterInvoking(groupName);
398            }
399    
400            @Clusterable
401            public void schedule(
402                            Trigger trigger, String description, String destinationName,
403                            Message message)
404                    throws SchedulerException {
405    
406                    if (!PropsValues.SCHEDULER_ENABLED) {
407                            return;
408                    }
409    
410                    String groupName = trigger.getGroupName();
411                    String jobName = trigger.getJobName();
412    
413                    try {
414                            if (isMemorySchedulerSlave(groupName)) {
415                                    SchedulerResponse schedulerResponse = new SchedulerResponse();
416    
417                                    schedulerResponse.setDescription(description);
418                                    schedulerResponse.setDestinationName(destinationName);
419                                    schedulerResponse.setGroupName(groupName);
420                                    schedulerResponse.setJobName(jobName);
421                                    schedulerResponse.setMessage(message);
422                                    schedulerResponse.setTrigger(trigger);
423    
424                                    _memoryClusteredJobs.put(
425                                            getFullName(jobName, groupName),
426                                            new ObjectValuePair<SchedulerResponse, TriggerState>(
427                                                    schedulerResponse, TriggerState.NORMAL));
428    
429                                    return;
430                            }
431                    }
432                    catch (Exception e) {
433                            throw new SchedulerException(
434                                    "Unable to schedule job {jobName=" + jobName + ", groupName=" +
435                                            groupName + "}",
436                                    e);
437                    }
438    
439                    _readLock.lock();
440    
441                    try {
442                            _schedulerEngine.schedule(
443                                    trigger, description, destinationName, message);
444                    }
445                    finally {
446                            _readLock.unlock();
447                    }
448    
449                    skipClusterInvoking(groupName);
450            }
451    
452            public void setBeanIdentifier(String beanIdentifier) {
453                    _beanIdentifier = beanIdentifier;
454            }
455    
456            public void shutdown() throws SchedulerException {
457                    if (!PropsValues.SCHEDULER_ENABLED) {
458                            return;
459                    }
460    
461                    try {
462                            ClusterExecutorUtil.removeClusterEventListener(
463                                    _clusterEventListener);
464    
465                            LockLocalServiceUtil.unlock(
466                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress,
467                                    PropsValues.MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
468                    }
469                    catch (Exception e) {
470                            throw new SchedulerException("Unable to shutdown scheduler", e);
471                    }
472    
473                    _schedulerEngine.shutdown();
474            }
475    
476            public void start() throws SchedulerException {
477                    if (!PropsValues.SCHEDULER_ENABLED) {
478                            return;
479                    }
480    
481                    _schedulerEngine.start();
482            }
483    
484            @Clusterable
485            public void suppressError(String jobName, String groupName)
486                    throws SchedulerException {
487    
488                    if (!PropsValues.SCHEDULER_ENABLED) {
489                            return;
490                    }
491    
492                    try {
493                            if (isMemorySchedulerSlave(groupName)) {
494                                    return;
495                            }
496                    }
497                    catch (Exception e) {
498                            throw new SchedulerException(
499                                    "Unable to suppress error for job {jobName=" + jobName +
500                                            ", groupName=" + groupName + "}",
501                                    e);
502                    }
503    
504                    _readLock.lock();
505    
506                    try {
507                            _schedulerEngine.suppressError(jobName, groupName);
508                    }
509                    finally {
510                            _readLock.unlock();
511                    }
512    
513                    skipClusterInvoking(groupName);
514            }
515    
516            @Clusterable
517            public void unschedule(String groupName) throws SchedulerException {
518                    if (!PropsValues.SCHEDULER_ENABLED) {
519                            return;
520                    }
521    
522                    try {
523                            if (isMemorySchedulerSlave(groupName)) {
524                                    removeMemoryClusteredJobs(groupName);
525    
526                                    return;
527                            }
528                    }
529                    catch (Exception e) {
530                            throw new SchedulerException(
531                                    "Unable to unschedule jobs in group " + groupName, e);
532                    }
533    
534                    _readLock.lock();
535    
536                    try {
537                            _schedulerEngine.unschedule(groupName);
538                    }
539                    finally {
540                            _readLock.unlock();
541                    }
542    
543                    skipClusterInvoking(groupName);
544            }
545    
546            @Clusterable
547            public void unschedule(String jobName, String groupName)
548                    throws SchedulerException {
549    
550                    if (!PropsValues.SCHEDULER_ENABLED) {
551                            return;
552                    }
553    
554                    try {
555                            if (isMemorySchedulerSlave(groupName)) {
556                                    _memoryClusteredJobs.remove(getFullName(jobName, groupName));
557    
558                                    return;
559                            }
560                    }
561                    catch (Exception e) {
562                            throw new SchedulerException(
563                                    "Unable to unschedule job {jobName=" + jobName +
564                                            ", groupName=" + groupName + "}",
565                                    e);
566                    }
567    
568                    _readLock.lock();
569    
570                    try {
571                            _schedulerEngine.unschedule(jobName, groupName);
572                    }
573                    finally {
574                            _readLock.unlock();
575                    }
576    
577                    skipClusterInvoking(groupName);
578            }
579    
580            @Clusterable
581            public void update(Trigger trigger) throws SchedulerException {
582                    if (!PropsValues.SCHEDULER_ENABLED) {
583                            return;
584                    }
585    
586                    String jobName = trigger.getJobName();
587                    String groupName = trigger.getGroupName();
588    
589                    try {
590                            if (isMemorySchedulerSlave(groupName)) {
591                                    for (ObjectValuePair<SchedulerResponse, TriggerState>
592                                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
593    
594                                            SchedulerResponse schedulerResponse =
595                                                    memoryClusteredJob.getKey();
596    
597                                            if (jobName.equals(schedulerResponse.getJobName()) &&
598                                                    groupName.equals(schedulerResponse.getGroupName())) {
599    
600                                                    schedulerResponse.setTrigger(trigger);
601    
602                                                    return;
603                                            }
604                                    }
605    
606                                    throw new Exception(
607                                            "Unable to update trigger for memory clustered job");
608                            }
609                    }
610                    catch (Exception e) {
611                            throw new SchedulerException(
612                                    "Unable to update job {jobName=" + jobName + ", groupName=" +
613                                            groupName + "}",
614                                    e);
615                    }
616    
617                    _readLock.lock();
618    
619                    try {
620                            _schedulerEngine.update(trigger);
621                    }
622                    finally {
623                            _readLock.unlock();
624                    }
625    
626                    skipClusterInvoking(groupName);
627            }
628    
629            public Lock updateMemorySchedulerClusterMaster() throws SchedulerException {
630                    try {
631                            Lock lock = lockMemorySchedulerCluster(null);
632    
633                            Address address = (Address)getDeserializedObject(lock.getOwner());
634    
635                            if (ClusterExecutorUtil.isClusterNodeAlive(address)) {
636                                    return lock;
637                            }
638    
639                            return lockMemorySchedulerCluster(lock.getOwner());
640                    }
641                    catch (Exception e) {
642                            throw new SchedulerException(
643                                    "Unable to update memory scheduler cluster master", e);
644                    }
645            }
646    
647            protected Object callMaster(MethodKey methodKey, Object... arguments)
648                    throws Exception {
649    
650                    MethodHandler methodHandler = new MethodHandler(methodKey, arguments);
651    
652                    Lock lock = updateMemorySchedulerClusterMaster();
653    
654                    Address address = (Address)getDeserializedObject(lock.getOwner());
655    
656                    if (address.equals(ClusterExecutorUtil.getLocalClusterNodeAddress())) {
657                            if (methodKey == _getScheduledJobsMethodKey3) {
658                                    return methodHandler.invoke(false);
659                            }
660                            else {
661                                    return methodHandler.invoke(schedulerEngine);
662                            }
663                    }
664    
665                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
666                            methodHandler, address);
667    
668                    clusterRequest.setBeanIdentifier(_beanIdentifier);
669    
670                    FutureClusterResponses futureClusterResponses =
671                            ClusterExecutorUtil.execute(clusterRequest);
672    
673                    try {
674                            ClusterNodeResponses clusterNodeResponses =
675                                    futureClusterResponses.get(20, TimeUnit.SECONDS);
676    
677                            ClusterNodeResponse clusterNodeResponse =
678                                    clusterNodeResponses.getClusterResponse(address);
679    
680                            return clusterNodeResponse.getResult();
681                    }
682                    catch (Exception e) {
683                            throw new SchedulerException(
684                                    "Unable to load scheduled jobs from cluster node " +
685                                            address.getDescription(),
686                                    e);
687                    }
688            }
689    
690            protected Object getDeserializedObject(String string) throws Exception {
691                    byte[] bytes = Base64.decode(string);
692    
693                    UnsyncByteArrayInputStream byteArrayInputStream =
694                            new UnsyncByteArrayInputStream(bytes);
695    
696                    ObjectInputStream objectInputStream = new ObjectInputStream(
697                            byteArrayInputStream);
698    
699                    Object object = objectInputStream.readObject();
700    
701                    objectInputStream.close();
702    
703                    return object;
704            }
705    
706            protected String getFullName(String jobName, String groupName) {
707                    return groupName.concat(StringPool.PERIOD).concat(jobName);
708            }
709    
710            protected String getSerializedString(Object object) throws Exception {
711                    UnsyncByteArrayOutputStream byteArrayOutputStream =
712                            new UnsyncByteArrayOutputStream();
713    
714                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
715                            byteArrayOutputStream);
716    
717                    objectOutputStream.writeObject(object);
718                    objectOutputStream.close();
719    
720                    byte[] bytes = byteArrayOutputStream.toByteArray();
721    
722                    return Base64.encode(bytes);
723            }
724    
725            protected StorageType getStorageType(String groupName) {
726                    int pos = groupName.indexOf(CharPool.POUND);
727    
728                    String storageTypeString = groupName.substring(0, pos);
729    
730                    return StorageType.valueOf(storageTypeString);
731            }
732    
733            protected void initMemoryClusteredJobs() throws Exception {
734                    List<SchedulerResponse> schedulerResponses =
735                            (List<SchedulerResponse>)callMaster(
736                                    _getScheduledJobsMethodKey3, StorageType.MEMORY_CLUSTERED);
737    
738                    for (SchedulerResponse schedulerResponse : schedulerResponses) {
739                            Trigger oldTrigger = schedulerResponse.getTrigger();
740    
741                            String jobName = schedulerResponse.getJobName();
742                            String groupName = SchedulerEngineUtil.namespaceGroupName(
743                                    schedulerResponse.getGroupName(), StorageType.MEMORY_CLUSTERED);
744    
745                            Trigger newTrigger = TriggerFactoryUtil.buildTrigger(
746                                    oldTrigger.getTriggerType(), jobName, groupName,
747                                    oldTrigger.getStartDate(), oldTrigger.getEndDate(),
748                                    oldTrigger.getTriggerContent());
749    
750                            schedulerResponse.setTrigger(newTrigger);
751    
752                            TriggerState triggerState = SchedulerEngineUtil.getJobState(
753                                    schedulerResponse);
754    
755                            Message message = schedulerResponse.getMessage();
756    
757                            message.remove(JOB_STATE);
758    
759                            _memoryClusteredJobs.put(
760                                    getFullName(jobName, groupName),
761                                    new ObjectValuePair<SchedulerResponse, TriggerState>(
762                                            schedulerResponse, triggerState));
763                    }
764            }
765    
766            protected boolean isMemorySchedulerClusterLockOwner(Lock lock)
767                    throws Exception {
768    
769                    boolean master = _localClusterNodeAddress.equals(lock.getOwner());
770    
771                    if (master == _master) {
772                            return master;
773                    }
774    
775                    if (!_master) {
776                            _master = master;
777    
778                            return _master;
779                    }
780    
781                    _localClusterNodeAddress = getSerializedString(
782                            ClusterExecutorUtil.getLocalClusterNodeAddress());
783    
784                    for (ObjectValuePair<SchedulerResponse, TriggerState>
785                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
786    
787                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
788    
789                            _schedulerEngine.delete(
790                                    schedulerResponse.getJobName(),
791                                    schedulerResponse.getGroupName());
792                    }
793    
794                    initMemoryClusteredJobs();
795    
796                    if (_log.isInfoEnabled()) {
797                            _log.info("Another node is now the memory scheduler master");
798                    }
799    
800                    _master = master;
801    
802                    return master;
803            }
804    
805            protected boolean isMemorySchedulerSlave() throws Exception {
806                    return isMemorySchedulerSlave(null);
807            }
808    
809            protected boolean isMemorySchedulerSlave(String groupName)
810                    throws Exception {
811    
812                    if (groupName != null) {
813                            StorageType storageType = getStorageType(groupName);
814    
815                            if (!storageType.equals(StorageType.MEMORY_CLUSTERED)) {
816                                    return false;
817                            }
818                    }
819    
820                    Lock lock = lockMemorySchedulerCluster(null);
821    
822                    if (isMemorySchedulerClusterLockOwner(lock)) {
823                            return false;
824                    }
825    
826                    return true;
827            }
828    
829            protected Lock lockMemorySchedulerCluster(String owner) throws Exception {
830                    Lock lock = null;
831    
832                    while (true) {
833                            try {
834                                    if (owner == null) {
835                                            lock = LockLocalServiceUtil.lock(
836                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
837                                                    _localClusterNodeAddress,
838                                                    PropsValues.
839                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
840                                    }
841                                    else {
842                                            lock = LockLocalServiceUtil.lock(
843                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
844                                                    _localClusterNodeAddress,
845                                                    PropsValues.
846                                                            MEMORY_CLUSTER_SCHEDULER_LOCK_CACHE_ENABLED);
847                                    }
848    
849                                    break;
850                            }
851                            catch (Exception e) {
852                                    if (_log.isWarnEnabled()) {
853                                            _log.warn(
854                                                    "Unable to obtain memory scheduler cluster lock. " +
855                                                            "Trying again.");
856                                    }
857                            }
858                    }
859    
860                    if (!lock.isNew()) {
861                            return lock;
862                    }
863    
864                    boolean forceSync = ProxyModeThreadLocal.isForceSync();
865    
866                    ProxyModeThreadLocal.setForceSync(true);
867    
868                    _writeLock.lock();
869    
870                    try {
871                            for (ObjectValuePair<SchedulerResponse, TriggerState>
872                                            memoryClusteredJob : _memoryClusteredJobs.values()) {
873    
874                                    SchedulerResponse schedulerResponse =
875                                            memoryClusteredJob.getKey();
876    
877                                    _schedulerEngine.schedule(
878                                            schedulerResponse.getTrigger(),
879                                            schedulerResponse.getDescription(),
880                                            schedulerResponse.getDestinationName(),
881                                            schedulerResponse.getMessage());
882    
883                                    TriggerState triggerState = memoryClusteredJob.getValue();
884    
885                                    if (triggerState.equals(TriggerState.PAUSED)) {
886                                            _schedulerEngine.pause(
887                                                    schedulerResponse.getJobName(),
888                                                    schedulerResponse.getGroupName());
889                                    }
890                            }
891                    }
892                    finally {
893                            ProxyModeThreadLocal.setForceSync(forceSync);
894    
895                            _writeLock.unlock();
896                    }
897    
898                    return lock;
899            }
900    
901            protected void removeMemoryClusteredJobs(String groupName) {
902                    Set<Map.Entry<String, ObjectValuePair<SchedulerResponse, TriggerState>>>
903                            memoryClusteredJobs = _memoryClusteredJobs.entrySet();
904    
905                    Iterator
906                            <Map.Entry<String,
907                                    ObjectValuePair<SchedulerResponse, TriggerState>>> itr =
908                                            memoryClusteredJobs.iterator();
909    
910                    while (itr.hasNext()) {
911                            Map.Entry <String, ObjectValuePair<SchedulerResponse, TriggerState>>
912                                    entry = itr.next();
913    
914                            ObjectValuePair<SchedulerResponse, TriggerState>
915                                    memoryClusteredJob = entry.getValue();
916    
917                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
918    
919                            if (groupName.equals(schedulerResponse.getGroupName())) {
920                                    itr.remove();
921                            }
922                    }
923            }
924    
925            protected void skipClusterInvoking(String groupName)
926                    throws SchedulerException {
927    
928                    StorageType storageType = getStorageType(groupName);
929    
930                    if (storageType.equals(StorageType.PERSISTED)) {
931                            SchedulerException schedulerException = new SchedulerException();
932    
933                            schedulerException.setSwallowable(true);
934    
935                            throw schedulerException;
936                    }
937            }
938    
939            protected void updateMemoryClusteredJob(
940                    String jobName, String groupName, TriggerState triggerState) {
941    
942                    ObjectValuePair<SchedulerResponse, TriggerState>
943                            memoryClusteredJob = _memoryClusteredJobs.get(
944                                    getFullName(jobName, groupName));
945    
946                    if (memoryClusteredJob != null) {
947                            memoryClusteredJob.setValue(triggerState);
948                    }
949            }
950    
951            protected void updateMemoryClusteredJobs(
952                    String groupName, TriggerState triggerState) {
953    
954                    for (ObjectValuePair<SchedulerResponse, TriggerState>
955                                    memoryClusteredJob : _memoryClusteredJobs.values()) {
956    
957                            SchedulerResponse schedulerResponse = memoryClusteredJob.getKey();
958    
959                            if (groupName.equals(schedulerResponse.getGroupName())) {
960                                    memoryClusteredJob.setValue(triggerState);
961                            }
962                    }
963            }
964    
        // Bean reference looked up by the name below. NOTE(review): the methods
        // visible in this part of the class use _schedulerEngine instead; confirm
        // where this field is consumed.
        @BeanReference(
                name="com.liferay.portal.scheduler.ClusterSchedulerEngineService")
        protected SchedulerEngine schedulerEngine;

        // Passed as both the first and the second argument of
        // LockLocalServiceUtil.lock when requesting the memory scheduler cluster
        // lock
        private static final String _LOCK_CLASS_NAME =
                SchedulerEngine.class.getName();

        private static Log _log = LogFactoryUtil.getLog(
                ClusterSchedulerEngine.class);

        // Method keys for SchedulerEngine.getScheduledJob(String, String),
        // SchedulerEngine.getScheduledJobs(), and
        // SchedulerEngine.getScheduledJobs(String)
        private static MethodKey _getScheduledJobMethodKey = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJob", String.class,
                String.class);
        private static MethodKey _getScheduledJobsMethodKey1 = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJobs");
        private static MethodKey _getScheduledJobsMethodKey2 = new MethodKey(
                SchedulerEngine.class.getName(), "getScheduledJobs", String.class);
        // Unlike the keys above, this one targets SchedulerEngineUtil; it is the
        // key initMemoryClusteredJobs hands to callMaster
        private static MethodKey _getScheduledJobsMethodKey3 = new MethodKey(
                SchedulerEngineUtil.class.getName(), "getScheduledJobs",
                StorageType.class);

        private String _beanIdentifier;
        private ClusterEventListener _clusterEventListener;
        // Serialized form (see getSerializedString) of the local cluster node
        // address; used as this node's owner value for the cluster lock and
        // compared against the lock owner
        private volatile String _localClusterNodeAddress;
        // Outcome of the most recent ownership check performed by
        // isMemorySchedulerClusterLockOwner
        private volatile boolean _master;
        // Keyed by getFullName(jobName, groupName); each value pairs a job's
        // scheduler response with its recorded trigger state
        private Map<String, ObjectValuePair<SchedulerResponse, TriggerState>>
                _memoryClusteredJobs = new ConcurrentHashMap
                        <String, ObjectValuePair<SchedulerResponse, TriggerState>>();
        // Fully qualified because the unqualified name Lock is used in this class
        // for the lock service's Lock type. NOTE(review): both locks are assigned
        // outside the visible part of the class.
        private java.util.concurrent.locks.Lock _readLock;
        // Engine on which memory clustered jobs are scheduled, paused, and deleted
        private SchedulerEngine _schedulerEngine;
        // Held by lockMemorySchedulerCluster while it schedules the tracked jobs
        private java.util.concurrent.locks.Lock _writeLock;
996    
997            private class MemorySchedulerClusterEventListener
998                    implements ClusterEventListener {
999    
1000                    public void processClusterEvent(ClusterEvent clusterEvent) {
1001                            ClusterEventType clusterEventType =
1002                                    clusterEvent.getClusterEventType();
1003    
1004                            if (!clusterEventType.equals(ClusterEventType.DEPART)) {
1005                                    return;
1006                            }
1007    
1008                            try {
1009                                    updateMemorySchedulerClusterMaster();
1010                            }
1011                            catch (Exception e) {
1012                                    _log.error("Unable to update memory scheduler cluster lock", e);
1013                            }
1014                    }
1015    
1016            }
1017    
1018    }