001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.poller;
016    
017    import com.liferay.portal.kernel.exception.SystemException;
018    import com.liferay.portal.kernel.json.JSONFactoryUtil;
019    import com.liferay.portal.kernel.json.JSONObject;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.messaging.DestinationNames;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.messaging.MessageBusUtil;
025    import com.liferay.portal.kernel.messaging.MessageListener;
026    import com.liferay.portal.kernel.poller.DefaultPollerResponse;
027    import com.liferay.portal.kernel.poller.PollerHeader;
028    import com.liferay.portal.kernel.poller.PollerProcessor;
029    import com.liferay.portal.kernel.poller.PollerRequest;
030    import com.liferay.portal.kernel.poller.PollerResponse;
031    import com.liferay.portal.kernel.util.GetterUtil;
032    import com.liferay.portal.kernel.util.StringPool;
033    import com.liferay.portal.kernel.util.StringUtil;
034    import com.liferay.portal.kernel.util.Validator;
035    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036    import com.liferay.portal.model.BrowserTracker;
037    import com.liferay.portal.model.Company;
038    import com.liferay.portal.service.BrowserTrackerLocalServiceUtil;
039    import com.liferay.portal.service.CompanyLocalServiceUtil;
040    import com.liferay.util.Encryptor;
041    
042    import java.util.ArrayList;
043    import java.util.HashMap;
044    import java.util.HashSet;
045    import java.util.List;
046    import java.util.Map;
047    import java.util.Set;
048    
049    /**
050     * @author Michael C. Han
051     * @author Brian Wing Shun Chan
052     * @author Edward Han
053     */
054    public class PollerRequestHandlerImpl
055            implements PollerRequestHandler, MessageListener {
056    
057            public PollerHeader getPollerHeader(String pollerRequestString) {
058                    if (Validator.isNull(pollerRequestString)) {
059                            return null;
060                    }
061    
062                    Map<String, Object>[] pollerRequestChunks =
063                            parsePollerRequestParameters(pollerRequestString);
064    
065                    return parsePollerRequestHeader(pollerRequestChunks);
066            }
067    
068            public JSONObject processRequest(String path, String pollerRequestString)
069                    throws Exception {
070    
071                    if (Validator.isNull(pollerRequestString)) {
072                            return null;
073                    }
074    
075                    Map<String, Object>[] pollerRequestChunks =
076                            parsePollerRequestParameters(pollerRequestString);
077    
078                    PollerHeader pollerHeader = parsePollerRequestHeader(
079                            pollerRequestChunks);
080    
081                    if (pollerHeader == null) {
082                            return null;
083                    }
084    
085                    boolean receiveRequest = isReceiveRequest(path);
086    
087                    String pollerSessionId = getPollerSessionId(pollerHeader);
088    
089                    PollerSession pollerSession = null;
090    
091                    synchronized (_pollerSessions) {
092                            pollerSession = _pollerSessions.get(pollerSessionId);
093    
094                            if ((pollerSession == null) && receiveRequest) {
095                                    pollerSession = new PollerSession(pollerSessionId);
096    
097                                    _pollerSessions.put(pollerSessionId, pollerSession);
098                            }
099                    }
100    
101                    List<PollerRequest> pollerRequests = createPollerRequests(
102                            pollerHeader, pollerRequestChunks, receiveRequest);
103    
104                    executePollerRequests(pollerSession, pollerRequests);
105    
106                    if (receiveRequest) {
107                            return createPollerResponseHeader(pollerHeader);
108                    }
109                    else {
110                            return null;
111                    }
112            }
113    
114            public void receive(Message message) {
115                    Object messagePayload = message.getPayload();
116    
117                    if (!(messagePayload instanceof PollerResponse)) {
118                            return;
119                    }
120    
121                    PollerResponse pollerResponse = (PollerResponse) messagePayload;
122    
123                    PollerHeader pollerHeader = pollerResponse.getPollerHeader();
124    
125                    String pollerSessionId = getPollerSessionId(pollerHeader);
126    
127                    synchronized (_pollerSessions) {
128                            PollerSession pollerSession = _pollerSessions.get(pollerSessionId);
129    
130                            if ((pollerSession != null) &&
131                                    pollerSession.completePortletProcessing(
132                                            pollerResponse.getPortletId(), message.getResponseId())) {
133    
134                                    _pollerSessions.remove(pollerSessionId);
135                            }
136                    }
137            }
138    
139            protected PollerRequest createPollerRequest(
140                            boolean receiveRequest, PollerHeader pollerHeader, String portletId)
141                    throws Exception {
142    
143                    return createPollerRequest(
144                            receiveRequest, pollerHeader, portletId,
145                            new HashMap<String, String>(), null);
146            }
147    
148            protected PollerRequest createPollerRequest(
149                            boolean receiveRequest, PollerHeader pollerHeader, String portletId,
150                            Map<String, String> parameterMap, String chunkId)
151                    throws Exception {
152    
153                    PollerProcessor pollerProcessor =
154                            PollerProcessorUtil.getPollerProcessor(portletId);
155    
156                    if (pollerProcessor == null) {
157                            if (_log.isWarnEnabled()) {
158                                    _log.warn(
159                                            "Poller processor not found for portlet " + portletId);
160                            }
161    
162                            return null;
163                    }
164    
165                    return new PollerRequest(
166                            pollerHeader, portletId, parameterMap, chunkId, receiveRequest);
167            }
168    
169            protected List<PollerRequest> createPollerRequests(
170                            PollerHeader pollerHeader,
171                            Map<String, Object>[] pollerRequestChunks, boolean receiveRequest)
172                    throws Exception {
173    
174                    String[] portletIds = pollerHeader.getPortletIds();
175    
176                    List<PollerRequest> pollerRequests = new ArrayList<PollerRequest>(
177                            portletIds.length);
178    
179                    Set<String> receiveRequestPortletIds = null;
180    
181                    if (receiveRequest) {
182                            receiveRequestPortletIds = new HashSet<String>(
183                                    (int)(pollerRequestChunks.length / 0.75) + 1);
184                    }
185    
186                    for (int i = 1; i < pollerRequestChunks.length; i++) {
187                            Map<String, Object> pollerRequestChunk = pollerRequestChunks[i];
188    
189                            String portletId = (String)pollerRequestChunk.get("portletId");
190                            Map<String, String> parameterMap = parseData(pollerRequestChunk);
191                            String chunkId = (String)pollerRequestChunk.get("chunkId");
192    
193                            try {
194                                    PollerRequest pollerRequest = createPollerRequest(
195                                            receiveRequest, pollerHeader, portletId, parameterMap,
196                                            chunkId);
197    
198                                    pollerRequests.add(pollerRequest);
199    
200                                    if (receiveRequest) {
201                                            receiveRequestPortletIds.add(portletId);
202                                    }
203                            }
204                            catch (Exception e) {
205                                    _log.error(e, e);
206                            }
207                    }
208    
209                    if (receiveRequest) {
210                            for (String portletId : portletIds) {
211                                    if (receiveRequestPortletIds.contains(portletId)) {
212                                            continue;
213                                    }
214    
215                                    try {
216                                            PollerRequest pollerRequest = createPollerRequest(
217                                                    receiveRequest, pollerHeader, portletId);
218    
219                                            pollerRequests.add(pollerRequest);
220                                    }
221                                    catch (Exception e) {
222                                            _log.error(e, e);
223                                    }
224                            }
225                    }
226    
227                    return pollerRequests;
228            }
229    
230            protected JSONObject createPollerResponseHeader(PollerHeader pollerHeader)
231                    throws SystemException {
232    
233                    if (pollerHeader == null) {
234                            return null;
235                    }
236    
237                    boolean suspendPolling = false;
238    
239                    if (pollerHeader.isStartPolling()) {
240                            BrowserTrackerLocalServiceUtil.updateBrowserTracker(
241                                    pollerHeader.getUserId(), pollerHeader.getBrowserKey());
242                    }
243                    else {
244                            BrowserTracker browserTracker =
245                                    BrowserTrackerLocalServiceUtil.getBrowserTracker(
246                                            pollerHeader.getUserId(), pollerHeader.getBrowserKey());
247    
248                            if (browserTracker.getBrowserKey() !=
249                                    pollerHeader.getBrowserKey()) {
250    
251                                    suspendPolling = true;
252                            }
253                    }
254    
255                    JSONObject pollerResponseHeaderJSONObject =
256                            JSONFactoryUtil.createJSONObject();
257    
258                    pollerResponseHeaderJSONObject.put("userId", pollerHeader.getUserId());
259                    pollerResponseHeaderJSONObject.put(
260                            "initialRequest", pollerHeader.isInitialRequest());
261                    pollerResponseHeaderJSONObject.put("suspendPolling", suspendPolling);
262    
263                    return pollerResponseHeaderJSONObject;
264            }
265    
266            protected void executePollerRequests(
267                    PollerSession pollerSession, List<PollerRequest> pollerRequests) {
268    
269                    for (PollerRequest pollerRequest : pollerRequests) {
270                            PollerRequestResponsePair pollerRequestResponsePair =
271                                    new PollerRequestResponsePair(pollerRequest);
272    
273                            String responseId = null;
274    
275                            if (pollerRequest.isReceiveRequest()) {
276                                    responseId = PortalUUIDUtil.generate();
277    
278                                    PollerResponse pollerResponse = new DefaultPollerResponse(
279                                            pollerRequest.getPollerHeader(),
280                                            pollerRequest.getPortletId(), pollerRequest.getChunkId());
281    
282                                    pollerRequestResponsePair.setPollerResponse(pollerResponse);
283    
284                                    if (!pollerSession.beginPortletProcessing(
285                                                    pollerRequestResponsePair, responseId)) {
286    
287                                            continue;
288                                    }
289                            }
290    
291                            Message message = new Message();
292    
293                            message.setPayload(pollerRequestResponsePair);
294    
295                            if (pollerRequest.isReceiveRequest()) {
296                                    message.setResponseId(responseId);
297    
298                                    message.setResponseDestinationName(
299                                            DestinationNames.POLLER_RESPONSE);
300                            }
301    
302                            MessageBusUtil.sendMessage(DestinationNames.POLLER, message);
303                    }
304            }
305    
306            protected String fixPollerRequestString(String pollerRequestString) {
307                    if (Validator.isNull(pollerRequestString)) {
308                            return null;
309                    }
310    
311                    return StringUtil.replace(
312                            pollerRequestString,
313                            new String[] {
314                                    StringPool.OPEN_CURLY_BRACE, StringPool.CLOSE_CURLY_BRACE,
315                                    _ESCAPED_OPEN_CURLY_BRACE, _ESCAPED_CLOSE_CURLY_BRACE
316                            },
317                            new String[] {
318                                    _OPEN_HASH_MAP_WRAPPER, StringPool.DOUBLE_CLOSE_CURLY_BRACE,
319                                    StringPool.OPEN_CURLY_BRACE, StringPool.CLOSE_CURLY_BRACE
320                            });
321            }
322    
323            protected String getPollerSessionId(PollerHeader pollerHeader) {
324                    return String.valueOf(pollerHeader.getUserId());
325            }
326    
327            protected long getUserId(long companyId, String userIdString) {
328                    long userId = 0;
329    
330                    try {
331                            Company company = CompanyLocalServiceUtil.getCompany(companyId);
332    
333                            userId = GetterUtil.getLong(
334                                    Encryptor.decrypt(company.getKeyObj(), userIdString));
335                    }
336                    catch (Exception e) {
337                            _log.error(
338                                    "Invalid credentials for company id " + companyId +
339                                            " and user id " + userIdString);
340                    }
341    
342                    return userId;
343            }
344    
345            protected boolean isReceiveRequest(String path) {
346                    if ((path != null) && path.endsWith(_PATH_RECEIVE)) {
347                            return true;
348                    }
349                    else {
350                            return false;
351                    }
352            }
353    
354            protected Map<String, String> parseData(
355                    Map<String, Object> pollerRequestChunk)
356                    throws Exception {
357    
358                    Map<String, Object> oldParameterMap =
359                            (Map<String, Object>)pollerRequestChunk.get("data");
360    
361                    Map<String, String> newParameterMap = new HashMap<String, String>();
362    
363                    if (oldParameterMap == null) {
364                            return newParameterMap;
365                    }
366    
367                    for (Map.Entry<String, Object> entry : oldParameterMap.entrySet()) {
368                            newParameterMap.put(
369                                    entry.getKey(), String.valueOf(entry.getValue()));
370                    }
371    
372                    return newParameterMap;
373            }
374    
375            protected PollerHeader parsePollerRequestHeader(
376                    Map<String, Object>[] pollerRequestChunks) {
377    
378                    if ((pollerRequestChunks == null) || (pollerRequestChunks.length < 1)) {
379                            return null;
380                    }
381    
382                    Map<String, Object> pollerRequestChunk = pollerRequestChunks[0];
383    
384                    long companyId = GetterUtil.getLong(
385                            String.valueOf(pollerRequestChunk.get("companyId")));
386                    String userIdString = GetterUtil.getString(
387                            String.valueOf(pollerRequestChunk.get("userId")));
388                    long browserKey = GetterUtil.getLong(
389                            String.valueOf(pollerRequestChunk.get("browserKey")));
390                    String[] portletIds = StringUtil.split(
391                            String.valueOf(pollerRequestChunk.get("portletIds")));
392                    boolean initialRequest = GetterUtil.getBoolean(
393                            String.valueOf(pollerRequestChunk.get("initialRequest")));
394                    boolean startPolling = GetterUtil.getBoolean(
395                            String.valueOf(pollerRequestChunk.get("startPolling")));
396    
397                    long userId = getUserId(companyId, userIdString);
398    
399                    if (userId == 0) {
400                            return null;
401                    }
402    
403                    return new PollerHeader(
404                            companyId, userId, browserKey, portletIds, initialRequest,
405                            startPolling);
406            }
407    
408            protected Map<String, Object>[] parsePollerRequestParameters(
409                    String pollerRequestString) {
410    
411                    String fixedPollerRequestString = fixPollerRequestString(
412                            pollerRequestString);
413    
414                    return (Map<String, Object>[])JSONFactoryUtil.deserialize(
415                            fixedPollerRequestString);
416            }
417    
418            private static final String _ESCAPED_CLOSE_CURLY_BRACE =
419                    "[$CLOSE_CURLY_BRACE$]";
420    
421            private static final String _ESCAPED_OPEN_CURLY_BRACE =
422                    "[$OPEN_CURLY_BRACE$]";
423    
424            private static final String _OPEN_HASH_MAP_WRAPPER =
425                    "{\"javaClass\":\"java.util.HashMap\",\"map\":{";
426    
427            private static final String _PATH_RECEIVE = "/receive";
428    
429            private static Log _log = LogFactoryUtil.getLog(
430                    PollerRequestHandlerImpl.class);
431    
432            private Map<String, PollerSession> _pollerSessions =
433                    new HashMap<String, PollerSession>();
434    
435    }