001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.scheduler.job;
016    
017    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018    import com.liferay.portal.kernel.cluster.ClusterRequest;
019    import com.liferay.portal.kernel.concurrent.LockRegistry;
020    import com.liferay.portal.kernel.json.JSONFactoryUtil;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.messaging.MessageBusUtil;
025    import com.liferay.portal.kernel.scheduler.JobState;
026    import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
027    import com.liferay.portal.kernel.scheduler.SchedulerEngine;
028    import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
029    import com.liferay.portal.kernel.scheduler.StorageType;
030    import com.liferay.portal.kernel.util.MethodHandler;
031    import com.liferay.portal.kernel.util.MethodKey;
032    import com.liferay.portal.spring.context.PortletContextLoaderListener;
033    import com.liferay.portal.util.PropsValues;
034    
035    import java.util.Map;
036    import java.util.concurrent.locks.ReentrantLock;
037    
038    import org.quartz.Job;
039    import org.quartz.JobDataMap;
040    import org.quartz.JobDetail;
041    import org.quartz.JobExecutionContext;
042    import org.quartz.JobKey;
043    import org.quartz.Scheduler;
044    
045    /**
046     * @author Michael C. Han
047     * @author Bruno Farache
048     */
049    public class MessageSenderJob implements Job {
050    
051            public void execute(JobExecutionContext jobExecutionContext) {
052                    try {
053                            doExecute(jobExecutionContext);
054                    }
055                    catch (Exception e) {
056                            _log.error("Unable to execute job", e);
057                    }
058            }
059    
060            protected void doExecute(JobExecutionContext jobExecutionContext)
061                    throws Exception {
062    
063                    JobDetail jobDetail = jobExecutionContext.getJobDetail();
064    
065                    JobDataMap jobDataMap = jobDetail.getJobDataMap();
066    
067                    String destinationName = jobDataMap.getString(
068                            SchedulerEngine.DESTINATION_NAME);
069    
070                    String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
071    
072                    Message message = null;
073    
074                    if (messageJSON == null) {
075                            message = new Message();
076                    }
077                    else {
078                            message = (Message)JSONFactoryUtil.deserialize(messageJSON);
079                    }
080    
081                    String contextPath = message.getString(SchedulerEngine.CONTEXT_PATH);
082    
083                    String lockKey = PortletContextLoaderListener.getLockKey(contextPath);
084    
085                    ReentrantLock executionLock = null;
086    
087                    if (lockKey != null) {
088                            executionLock = LockRegistry.getLock(lockKey, lockKey);
089    
090                            if (executionLock != null) {
091                                    if (executionLock.hasQueuedThreads()) {
092                                            return;
093                                    }
094    
095                                    executionLock.lock();
096                            }
097                    }
098    
099                    try {
100                            message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
101    
102                            Map<String, Object> jobStateMap =
103                                    (Map<String, Object>)jobDataMap.get(SchedulerEngine.JOB_STATE);
104    
105                            JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
106    
107                            JobKey jobKey = jobDetail.getKey();
108    
109                            if (jobExecutionContext.getNextFireTime() == null) {
110                                    message.put(SchedulerEngine.DISABLE, true);
111    
112                                    StorageType storageType = StorageType.valueOf(
113                                            jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
114    
115                                    if (PropsValues.CLUSTER_LINK_ENABLED &&
116                                            storageType.equals(StorageType.MEMORY_CLUSTERED)) {
117    
118                                            notifyClusterMember(jobKey, storageType);
119                                    }
120    
121                                    if (storageType.equals(StorageType.PERSISTED)) {
122                                            Scheduler scheduler = jobExecutionContext.getScheduler();
123    
124                                            scheduler.deleteJob(jobKey);
125                                    }
126                            }
127    
128                            message.put(SchedulerEngine.JOB_NAME, jobKey.getName());
129                            message.put(SchedulerEngine.JOB_STATE, jobState);
130                            message.put(SchedulerEngine.GROUP_NAME, jobKey.getGroup());
131    
132                            MessageBusUtil.sendMessage(destinationName, message);
133                    }
134                    finally {
135                            if (executionLock != null) {
136                                    executionLock.unlock();
137                            }
138                    }
139            }
140    
141            protected void notifyClusterMember(JobKey jobKey, StorageType storageType)
142                    throws Exception {
143    
144                    MethodHandler methodHandler = new MethodHandler(
145                            _deleteJobMethodKey, jobKey.getName(), jobKey.getGroup(),
146                            storageType);
147    
148                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
149                            methodHandler, true);
150    
151                    ClusterExecutorUtil.execute(clusterRequest);
152            }
153    
154            private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
155    
156            private static MethodKey _deleteJobMethodKey = new MethodKey(
157                    SchedulerEngineUtil.class.getName(), "delete", String.class,
158                    String.class, StorageType.class);
159    
160    }