001
014
015 package com.liferay.portal.poller;
016
017 import com.liferay.portal.kernel.exception.SystemException;
018 import com.liferay.portal.kernel.json.JSONFactoryUtil;
019 import com.liferay.portal.kernel.json.JSONObject;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.DestinationNames;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.messaging.MessageBusUtil;
025 import com.liferay.portal.kernel.messaging.MessageListener;
026 import com.liferay.portal.kernel.poller.DefaultPollerResponse;
027 import com.liferay.portal.kernel.poller.PollerHeader;
028 import com.liferay.portal.kernel.poller.PollerProcessor;
029 import com.liferay.portal.kernel.poller.PollerRequest;
030 import com.liferay.portal.kernel.poller.PollerResponse;
031 import com.liferay.portal.kernel.util.GetterUtil;
032 import com.liferay.portal.kernel.util.StringPool;
033 import com.liferay.portal.kernel.util.StringUtil;
034 import com.liferay.portal.kernel.util.Validator;
035 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036 import com.liferay.portal.model.BrowserTracker;
037 import com.liferay.portal.model.Company;
038 import com.liferay.portal.service.BrowserTrackerLocalServiceUtil;
039 import com.liferay.portal.service.CompanyLocalServiceUtil;
040 import com.liferay.util.Encryptor;
041
042 import java.util.ArrayList;
043 import java.util.HashMap;
044 import java.util.HashSet;
045 import java.util.List;
046 import java.util.Map;
047 import java.util.Set;
048
049
054 public class PollerRequestHandlerImpl
055 implements PollerRequestHandler, MessageListener {
056
057 public PollerHeader getPollerHeader(String pollerRequestString) {
058 if (Validator.isNull(pollerRequestString)) {
059 return null;
060 }
061
062 Map<String, Object>[] pollerRequestChunks =
063 parsePollerRequestParameters(pollerRequestString);
064
065 return parsePollerRequestHeader(pollerRequestChunks);
066 }
067
068 public JSONObject processRequest(String path, String pollerRequestString)
069 throws Exception {
070
071 if (Validator.isNull(pollerRequestString)) {
072 return null;
073 }
074
075 Map<String, Object>[] pollerRequestChunks =
076 parsePollerRequestParameters(pollerRequestString);
077
078 PollerHeader pollerHeader = parsePollerRequestHeader(
079 pollerRequestChunks);
080
081 if (pollerHeader == null) {
082 return null;
083 }
084
085 boolean receiveRequest = isReceiveRequest(path);
086
087 String pollerSessionId = getPollerSessionId(pollerHeader);
088
089 PollerSession pollerSession = null;
090
091 synchronized (_pollerSessions) {
092 pollerSession = _pollerSessions.get(pollerSessionId);
093
094 if ((pollerSession == null) && receiveRequest) {
095 pollerSession = new PollerSession(pollerSessionId);
096
097 _pollerSessions.put(pollerSessionId, pollerSession);
098 }
099 }
100
101 List<PollerRequest> pollerRequests = createPollerRequests(
102 pollerHeader, pollerRequestChunks, receiveRequest);
103
104 executePollerRequests(pollerSession, pollerRequests);
105
106 if (receiveRequest) {
107 return createPollerResponseHeader(pollerHeader);
108 }
109 else {
110 return null;
111 }
112 }
113
114 public void receive(Message message) {
115 Object messagePayload = message.getPayload();
116
117 if (!(messagePayload instanceof PollerResponse)) {
118 return;
119 }
120
121 PollerResponse pollerResponse = (PollerResponse) messagePayload;
122
123 PollerHeader pollerHeader = pollerResponse.getPollerHeader();
124
125 String pollerSessionId = getPollerSessionId(pollerHeader);
126
127 synchronized (_pollerSessions) {
128 PollerSession pollerSession = _pollerSessions.get(pollerSessionId);
129
130 if ((pollerSession != null) &&
131 pollerSession.completePortletProcessing(
132 pollerResponse.getPortletId(), message.getResponseId())) {
133
134 _pollerSessions.remove(pollerSessionId);
135 }
136 }
137 }
138
139 protected PollerRequest createPollerRequest(
140 boolean receiveRequest, PollerHeader pollerHeader, String portletId)
141 throws Exception {
142
143 return createPollerRequest(
144 receiveRequest, pollerHeader, portletId,
145 new HashMap<String, String>(), null);
146 }
147
148 protected PollerRequest createPollerRequest(
149 boolean receiveRequest, PollerHeader pollerHeader, String portletId,
150 Map<String, String> parameterMap, String chunkId)
151 throws Exception {
152
153 PollerProcessor pollerProcessor =
154 PollerProcessorUtil.getPollerProcessor(portletId);
155
156 if (pollerProcessor == null) {
157 if (_log.isWarnEnabled()) {
158 _log.warn(
159 "Poller processor not found for portlet " + portletId);
160 }
161
162 return null;
163 }
164
165 return new PollerRequest(
166 pollerHeader, portletId, parameterMap, chunkId, receiveRequest);
167 }
168
169 protected List<PollerRequest> createPollerRequests(
170 PollerHeader pollerHeader,
171 Map<String, Object>[] pollerRequestChunks, boolean receiveRequest)
172 throws Exception {
173
174 String[] portletIds = pollerHeader.getPortletIds();
175
176 List<PollerRequest> pollerRequests = new ArrayList<PollerRequest>(
177 portletIds.length);
178
179 Set<String> receiveRequestPortletIds = null;
180
181 if (receiveRequest) {
182 receiveRequestPortletIds = new HashSet<String>(
183 (int)(pollerRequestChunks.length / 0.75) + 1);
184 }
185
186 for (int i = 1; i < pollerRequestChunks.length; i++) {
187 Map<String, Object> pollerRequestChunk = pollerRequestChunks[i];
188
189 String portletId = (String)pollerRequestChunk.get("portletId");
190 Map<String, String> parameterMap = parseData(pollerRequestChunk);
191 String chunkId = (String)pollerRequestChunk.get("chunkId");
192
193 try {
194 PollerRequest pollerRequest = createPollerRequest(
195 receiveRequest, pollerHeader, portletId, parameterMap,
196 chunkId);
197
198 pollerRequests.add(pollerRequest);
199
200 if (receiveRequest) {
201 receiveRequestPortletIds.add(portletId);
202 }
203 }
204 catch (Exception e) {
205 _log.error(e, e);
206 }
207 }
208
209 if (receiveRequest) {
210 for (String portletId : portletIds) {
211 if (receiveRequestPortletIds.contains(portletId)) {
212 continue;
213 }
214
215 try {
216 PollerRequest pollerRequest = createPollerRequest(
217 receiveRequest, pollerHeader, portletId);
218
219 pollerRequests.add(pollerRequest);
220 }
221 catch (Exception e) {
222 _log.error(e, e);
223 }
224 }
225 }
226
227 return pollerRequests;
228 }
229
230 protected JSONObject createPollerResponseHeader(PollerHeader pollerHeader)
231 throws SystemException {
232
233 if (pollerHeader == null) {
234 return null;
235 }
236
237 boolean suspendPolling = false;
238
239 if (pollerHeader.isStartPolling()) {
240 BrowserTrackerLocalServiceUtil.updateBrowserTracker(
241 pollerHeader.getUserId(), pollerHeader.getBrowserKey());
242 }
243 else {
244 BrowserTracker browserTracker =
245 BrowserTrackerLocalServiceUtil.getBrowserTracker(
246 pollerHeader.getUserId(), pollerHeader.getBrowserKey());
247
248 if (browserTracker.getBrowserKey() !=
249 pollerHeader.getBrowserKey()) {
250
251 suspendPolling = true;
252 }
253 }
254
255 JSONObject pollerResponseHeaderJSONObject =
256 JSONFactoryUtil.createJSONObject();
257
258 pollerResponseHeaderJSONObject.put("userId", pollerHeader.getUserId());
259 pollerResponseHeaderJSONObject.put(
260 "initialRequest", pollerHeader.isInitialRequest());
261 pollerResponseHeaderJSONObject.put("suspendPolling", suspendPolling);
262
263 return pollerResponseHeaderJSONObject;
264 }
265
266 protected void executePollerRequests(
267 PollerSession pollerSession, List<PollerRequest> pollerRequests) {
268
269 for (PollerRequest pollerRequest : pollerRequests) {
270 PollerRequestResponsePair pollerRequestResponsePair =
271 new PollerRequestResponsePair(pollerRequest);
272
273 String responseId = null;
274
275 if (pollerRequest.isReceiveRequest()) {
276 responseId = PortalUUIDUtil.generate();
277
278 PollerResponse pollerResponse = new DefaultPollerResponse(
279 pollerRequest.getPollerHeader(),
280 pollerRequest.getPortletId(), pollerRequest.getChunkId());
281
282 pollerRequestResponsePair.setPollerResponse(pollerResponse);
283
284 if (!pollerSession.beginPortletProcessing(
285 pollerRequestResponsePair, responseId)) {
286
287 continue;
288 }
289 }
290
291 Message message = new Message();
292
293 message.setPayload(pollerRequestResponsePair);
294
295 if (pollerRequest.isReceiveRequest()) {
296 message.setResponseId(responseId);
297
298 message.setResponseDestinationName(
299 DestinationNames.POLLER_RESPONSE);
300 }
301
302 MessageBusUtil.sendMessage(DestinationNames.POLLER, message);
303 }
304 }
305
306 protected String fixPollerRequestString(String pollerRequestString) {
307 if (Validator.isNull(pollerRequestString)) {
308 return null;
309 }
310
311 return StringUtil.replace(
312 pollerRequestString,
313 new String[] {
314 StringPool.OPEN_CURLY_BRACE, StringPool.CLOSE_CURLY_BRACE,
315 _ESCAPED_OPEN_CURLY_BRACE, _ESCAPED_CLOSE_CURLY_BRACE
316 },
317 new String[] {
318 _OPEN_HASH_MAP_WRAPPER, StringPool.DOUBLE_CLOSE_CURLY_BRACE,
319 StringPool.OPEN_CURLY_BRACE, StringPool.CLOSE_CURLY_BRACE
320 });
321 }
322
323 protected String getPollerSessionId(PollerHeader pollerHeader) {
324 return String.valueOf(pollerHeader.getUserId());
325 }
326
327 protected long getUserId(long companyId, String userIdString) {
328 long userId = 0;
329
330 try {
331 Company company = CompanyLocalServiceUtil.getCompany(companyId);
332
333 userId = GetterUtil.getLong(
334 Encryptor.decrypt(company.getKeyObj(), userIdString));
335 }
336 catch (Exception e) {
337 _log.error(
338 "Invalid credentials for company id " + companyId +
339 " and user id " + userIdString);
340 }
341
342 return userId;
343 }
344
345 protected boolean isReceiveRequest(String path) {
346 if ((path != null) && path.endsWith(_PATH_RECEIVE)) {
347 return true;
348 }
349 else {
350 return false;
351 }
352 }
353
354 protected Map<String, String> parseData(
355 Map<String, Object> pollerRequestChunk)
356 throws Exception {
357
358 Map<String, Object> oldParameterMap =
359 (Map<String, Object>)pollerRequestChunk.get("data");
360
361 Map<String, String> newParameterMap = new HashMap<String, String>();
362
363 if (oldParameterMap == null) {
364 return newParameterMap;
365 }
366
367 for (Map.Entry<String, Object> entry : oldParameterMap.entrySet()) {
368 newParameterMap.put(
369 entry.getKey(), String.valueOf(entry.getValue()));
370 }
371
372 return newParameterMap;
373 }
374
375 protected PollerHeader parsePollerRequestHeader(
376 Map<String, Object>[] pollerRequestChunks) {
377
378 if ((pollerRequestChunks == null) || (pollerRequestChunks.length < 1)) {
379 return null;
380 }
381
382 Map<String, Object> pollerRequestChunk = pollerRequestChunks[0];
383
384 long companyId = GetterUtil.getLong(
385 String.valueOf(pollerRequestChunk.get("companyId")));
386 String userIdString = GetterUtil.getString(
387 String.valueOf(pollerRequestChunk.get("userId")));
388 long browserKey = GetterUtil.getLong(
389 String.valueOf(pollerRequestChunk.get("browserKey")));
390 String[] portletIds = StringUtil.split(
391 String.valueOf(pollerRequestChunk.get("portletIds")));
392 boolean initialRequest = GetterUtil.getBoolean(
393 String.valueOf(pollerRequestChunk.get("initialRequest")));
394 boolean startPolling = GetterUtil.getBoolean(
395 String.valueOf(pollerRequestChunk.get("startPolling")));
396
397 long userId = getUserId(companyId, userIdString);
398
399 if (userId == 0) {
400 return null;
401 }
402
403 return new PollerHeader(
404 companyId, userId, browserKey, portletIds, initialRequest,
405 startPolling);
406 }
407
408 protected Map<String, Object>[] parsePollerRequestParameters(
409 String pollerRequestString) {
410
411 String fixedPollerRequestString = fixPollerRequestString(
412 pollerRequestString);
413
414 return (Map<String, Object>[])JSONFactoryUtil.deserialize(
415 fixedPollerRequestString);
416 }
417
418 private static final String _ESCAPED_CLOSE_CURLY_BRACE =
419 "[$CLOSE_CURLY_BRACE$]";
420
421 private static final String _ESCAPED_OPEN_CURLY_BRACE =
422 "[$OPEN_CURLY_BRACE$]";
423
424 private static final String _OPEN_HASH_MAP_WRAPPER =
425 "{\"javaClass\":\"java.util.HashMap\",\"map\":{";
426
427 private static final String _PATH_RECEIVE = "/receive";
428
429 private static Log _log = LogFactoryUtil.getLog(
430 PollerRequestHandlerImpl.class);
431
432 private Map<String, PollerSession> _pollerSessions =
433 new HashMap<String, PollerSession>();
434
435 }