001
014
015 package com.liferay.portal.scheduler.job;
016
017 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
018 import com.liferay.portal.kernel.cluster.ClusterRequest;
019 import com.liferay.portal.kernel.concurrent.LockRegistry;
020 import com.liferay.portal.kernel.json.JSONFactoryUtil;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBusUtil;
025 import com.liferay.portal.kernel.scheduler.JobState;
026 import com.liferay.portal.kernel.scheduler.JobStateSerializeUtil;
027 import com.liferay.portal.kernel.scheduler.SchedulerEngine;
028 import com.liferay.portal.kernel.scheduler.SchedulerEngineUtil;
029 import com.liferay.portal.kernel.scheduler.StorageType;
030 import com.liferay.portal.kernel.util.MethodHandler;
031 import com.liferay.portal.kernel.util.MethodKey;
032 import com.liferay.portal.spring.context.PortletContextLoaderListener;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.util.Map;
036 import java.util.concurrent.locks.ReentrantLock;
037
038 import org.quartz.Job;
039 import org.quartz.JobDataMap;
040 import org.quartz.JobDetail;
041 import org.quartz.JobExecutionContext;
042 import org.quartz.JobKey;
043 import org.quartz.Scheduler;
044
045
049 public class MessageSenderJob implements Job {
050
051 public void execute(JobExecutionContext jobExecutionContext) {
052 try {
053 doExecute(jobExecutionContext);
054 }
055 catch (Exception e) {
056 _log.error("Unable to execute job", e);
057 }
058 }
059
060 protected void doExecute(JobExecutionContext jobExecutionContext)
061 throws Exception {
062
063 JobDetail jobDetail = jobExecutionContext.getJobDetail();
064
065 JobDataMap jobDataMap = jobDetail.getJobDataMap();
066
067 String destinationName = jobDataMap.getString(
068 SchedulerEngine.DESTINATION_NAME);
069
070 String messageJSON = (String)jobDataMap.get(SchedulerEngine.MESSAGE);
071
072 Message message = null;
073
074 if (messageJSON == null) {
075 message = new Message();
076 }
077 else {
078 message = (Message)JSONFactoryUtil.deserialize(messageJSON);
079 }
080
081 String contextPath = message.getString(SchedulerEngine.CONTEXT_PATH);
082
083 String lockKey = PortletContextLoaderListener.getLockKey(contextPath);
084
085 ReentrantLock executionLock = null;
086
087 if (lockKey != null) {
088 executionLock = LockRegistry.getLock(lockKey, lockKey);
089
090 if (executionLock != null) {
091 if (executionLock.hasQueuedThreads()) {
092 return;
093 }
094
095 executionLock.lock();
096 }
097 }
098
099 try {
100 message.put(SchedulerEngine.DESTINATION_NAME, destinationName);
101
102 Map<String, Object> jobStateMap =
103 (Map<String, Object>)jobDataMap.get(SchedulerEngine.JOB_STATE);
104
105 JobState jobState = JobStateSerializeUtil.deserialize(jobStateMap);
106
107 JobKey jobKey = jobDetail.getKey();
108
109 if (jobExecutionContext.getNextFireTime() == null) {
110 message.put(SchedulerEngine.DISABLE, true);
111
112 StorageType storageType = StorageType.valueOf(
113 jobDataMap.getString(SchedulerEngine.STORAGE_TYPE));
114
115 if (PropsValues.CLUSTER_LINK_ENABLED &&
116 storageType.equals(StorageType.MEMORY_CLUSTERED)) {
117
118 notifyClusterMember(jobKey, storageType);
119 }
120
121 if (storageType.equals(StorageType.PERSISTED)) {
122 Scheduler scheduler = jobExecutionContext.getScheduler();
123
124 scheduler.deleteJob(jobKey);
125 }
126 }
127
128 message.put(SchedulerEngine.JOB_NAME, jobKey.getName());
129 message.put(SchedulerEngine.JOB_STATE, jobState);
130 message.put(SchedulerEngine.GROUP_NAME, jobKey.getGroup());
131
132 MessageBusUtil.sendMessage(destinationName, message);
133 }
134 finally {
135 if (executionLock != null) {
136 executionLock.unlock();
137 }
138 }
139 }
140
141 protected void notifyClusterMember(JobKey jobKey, StorageType storageType)
142 throws Exception {
143
144 MethodHandler methodHandler = new MethodHandler(
145 _deleteJobMethodKey, jobKey.getName(), jobKey.getGroup(),
146 storageType);
147
148 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
149 methodHandler, true);
150
151 ClusterExecutorUtil.execute(clusterRequest);
152 }
153
154 private static Log _log = LogFactoryUtil.getLog(MessageSenderJob.class);
155
156 private static MethodKey _deleteJobMethodKey = new MethodKey(
157 SchedulerEngineUtil.class.getName(), "delete", String.class,
158 String.class, StorageType.class);
159
160 }