001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterNode;
023    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024    import com.liferay.portal.kernel.cluster.ClusterRequest;
025    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
029    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
030    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.search.BooleanClauseOccur;
034    import com.liferay.portal.kernel.search.Field;
035    import com.liferay.portal.kernel.util.ArrayUtil;
036    import com.liferay.portal.kernel.util.GetterUtil;
037    import com.liferay.portal.kernel.util.MethodHandler;
038    import com.liferay.portal.kernel.util.MethodKey;
039    import com.liferay.portal.kernel.util.ObjectValuePair;
040    import com.liferay.portal.kernel.util.StringPool;
041    import com.liferay.portal.kernel.util.StringUtil;
042    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
043    import com.liferay.portal.kernel.util.Validator;
044    import com.liferay.portal.model.CompanyConstants;
045    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
046    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
047    import com.liferay.portal.security.auth.TransientTokenUtil;
048    import com.liferay.portal.util.PortalInstances;
049    import com.liferay.portal.util.PropsValues;
050    import com.liferay.util.lucene.KeywordsUtil;
051    
052    import java.io.IOException;
053    import java.io.InputStream;
054    import java.io.OutputStream;
055    
056    import java.net.InetAddress;
057    import java.net.URL;
058    import java.net.URLConnection;
059    
060    import java.util.HashSet;
061    import java.util.List;
062    import java.util.Map;
063    import java.util.Set;
064    import java.util.concurrent.BlockingQueue;
065    import java.util.concurrent.ConcurrentHashMap;
066    
067    import org.apache.lucene.analysis.Analyzer;
068    import org.apache.lucene.analysis.TokenStream;
069    import org.apache.lucene.document.Document;
070    import org.apache.lucene.index.Term;
071    import org.apache.lucene.queryParser.QueryParser;
072    import org.apache.lucene.search.BooleanClause;
073    import org.apache.lucene.search.BooleanQuery;
074    import org.apache.lucene.search.IndexSearcher;
075    import org.apache.lucene.search.NumericRangeQuery;
076    import org.apache.lucene.search.Query;
077    import org.apache.lucene.search.TermQuery;
078    import org.apache.lucene.search.TermRangeQuery;
079    import org.apache.lucene.search.WildcardQuery;
080    import org.apache.lucene.search.highlight.Highlighter;
081    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
082    import org.apache.lucene.search.highlight.QueryScorer;
083    import org.apache.lucene.search.highlight.SimpleFragmenter;
084    import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
085    import org.apache.lucene.search.highlight.WeightedTerm;
086    import org.apache.lucene.util.Version;
087    
088    /**
089     * @author Brian Wing Shun Chan
090     * @author Harry Mark
091     * @author Bruno Farache
092     * @author Shuyang Zhou
093     * @author Tina Tian
094     * @author Hugo Huijser
095     */
096    public class LuceneHelperImpl implements LuceneHelper {
097    
098            public void addDocument(long companyId, Document document)
099                    throws IOException {
100    
101                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
102    
103                    indexAccessor.addDocument(document);
104            }
105    
106            public void addExactTerm(
107                    BooleanQuery booleanQuery, String field, String value) {
108    
109                    addTerm(booleanQuery, field, value, false);
110            }
111    
112            public void addNumericRangeTerm(
113                    BooleanQuery booleanQuery, String field, String startValue,
114                    String endValue) {
115    
116                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
117                            field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
118                            true, true);
119    
120                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
121            }
122    
123            public void addRangeTerm(
124                    BooleanQuery booleanQuery, String field, String startValue,
125                    String endValue) {
126    
127                    boolean includesLower = true;
128    
129                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
130                            includesLower = false;
131                    }
132    
133                    boolean includesUpper = true;
134    
135                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
136                            includesUpper = false;
137                    }
138    
139                    TermRangeQuery termRangeQuery = new TermRangeQuery(
140                            field, startValue, endValue, includesLower, includesUpper);
141    
142                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
143            }
144    
145            public void addRequiredTerm(
146                    BooleanQuery booleanQuery, String field, String value, boolean like) {
147    
148                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
149            }
150    
151            public void addRequiredTerm(
152                    BooleanQuery booleanQuery, String field, String[] values,
153                    boolean like) {
154    
155                    if (values == null) {
156                            return;
157                    }
158    
159                    BooleanQuery query = new BooleanQuery();
160    
161                    for (String value : values) {
162                            addTerm(query, field, value, like);
163                    }
164    
165                    booleanQuery.add(query, BooleanClause.Occur.MUST);
166            }
167    
168            public void addTerm(
169                    BooleanQuery booleanQuery, String field, String value, boolean like) {
170    
171                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
172            }
173    
174            public void addTerm(
175                    BooleanQuery booleanQuery, String field, String value, boolean like,
176                    BooleanClauseOccur booleanClauseOccur) {
177    
178                    if (Validator.isNull(value)) {
179                            return;
180                    }
181    
182                    if (like) {
183                            value = StringUtil.replace(
184                                    value, StringPool.PERCENT, StringPool.BLANK);
185                    }
186    
187                    try {
188                            QueryParser queryParser = new QueryParser(
189                                    getVersion(), field, getAnalyzer());
190    
191                            Query query = null;
192    
193                            try {
194                                    query = queryParser.parse(value);
195                            }
196                            catch (Exception e) {
197                                    query = queryParser.parse(KeywordsUtil.escape(value));
198                            }
199    
200                            BooleanClause.Occur occur = null;
201    
202                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
203                                    occur = BooleanClause.Occur.MUST;
204                            }
205                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
206                                    occur = BooleanClause.Occur.MUST_NOT;
207                            }
208                            else {
209                                    occur = BooleanClause.Occur.SHOULD;
210                            }
211    
212                            _includeIfUnique(booleanQuery, query, occur, like);
213                    }
214                    catch (Exception e) {
215                            _log.error(e, e);
216                    }
217            }
218    
219            public void addTerm(
220                    BooleanQuery booleanQuery, String field, String[] values,
221                    boolean like) {
222    
223                    for (String value : values) {
224                            addTerm(booleanQuery, field, value, like);
225                    }
226            }
227    
228            public int countScoredFieldNames(Query query, String[] filedNames) {
229                    int count = 0;
230    
231                    for (String fieldName : filedNames) {
232                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
233                                    query, false, fieldName);
234    
235                            if ((weightedTerms.length > 0) &&
236                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
237    
238                                    count++;
239                            }
240                    }
241    
242                    return count;
243            }
244    
245            public void delete(long companyId) {
246                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
247    
248                    if (indexAccessor == null) {
249                            return;
250                    }
251    
252                    indexAccessor.delete();
253            }
254    
255            public void deleteDocuments(long companyId, Term term) throws IOException {
256                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
257    
258                    if (indexAccessor == null) {
259                            return;
260                    }
261    
262                    indexAccessor.deleteDocuments(term);
263            }
264    
265            public void dumpIndex(long companyId, OutputStream outputStream)
266                    throws IOException {
267    
268                    long lastGeneration = getLastGeneration(companyId);
269    
270                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
271                            if (_log.isDebugEnabled()) {
272                                    _log.debug(
273                                            "Dump index from cluster is not enabled for " + companyId);
274                            }
275    
276                            return;
277                    }
278    
279                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
280    
281                    if (indexAccessor == null) {
282                            return;
283                    }
284    
285                    indexAccessor.dumpIndex(outputStream);
286            }
287    
288            public Analyzer getAnalyzer() {
289                    return _analyzer;
290            }
291    
292            public long getLastGeneration(long companyId) {
293                    if (!isLoadIndexFromClusterEnabled()) {
294                            return IndexAccessor.DEFAULT_LAST_GENERATION;
295                    }
296    
297                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
298    
299                    if (indexAccessor == null) {
300                            return IndexAccessor.DEFAULT_LAST_GENERATION;
301                    }
302    
303                    return indexAccessor.getLastGeneration();
304            }
305    
306            public InputStream getLoadIndexesInputStreamFromCluster(
307                            long companyId, Address bootupAddress)
308                    throws SystemException {
309    
310                    if (!isLoadIndexFromClusterEnabled()) {
311                            return null;
312                    }
313    
314                    InputStream inputStream = null;
315    
316                    try {
317                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
318                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
319    
320                            URL url = bootupClusterNodeObjectValuePair.getValue();
321    
322                            URLConnection urlConnection = url.openConnection();
323    
324                            urlConnection.setDoOutput(true);
325    
326                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
327                                    urlConnection.getOutputStream());
328    
329                            unsyncPrintWriter.write("transientToken=");
330                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
331                            unsyncPrintWriter.write("&companyId=");
332                            unsyncPrintWriter.write(String.valueOf(companyId));
333    
334                            unsyncPrintWriter.close();
335    
336                            inputStream = urlConnection.getInputStream();
337    
338                            return inputStream;
339                    }
340                    catch (IOException ioe) {
341                            throw new SystemException(ioe);
342                    }
343            }
344    
345            public String[] getQueryTerms(Query query) {
346                    String queryString = StringUtil.replace(
347                            query.toString(), StringPool.STAR, StringPool.BLANK);
348    
349                    Query tempQuery = null;
350    
351                    try {
352                            QueryParser queryParser = new QueryParser(
353                                    getVersion(), StringPool.BLANK, getAnalyzer());
354    
355                            tempQuery = queryParser.parse(queryString);
356                    }
357                    catch (Exception e) {
358                            if (_log.isWarnEnabled()) {
359                                    _log.warn("Unable to parse " + queryString);
360                            }
361    
362                            tempQuery = query;
363                    }
364    
365                    WeightedTerm[] weightedTerms = null;
366    
367                    for (String fieldName : Field.KEYWORDS) {
368                            weightedTerms = QueryTermExtractor.getTerms(
369                                    tempQuery, false, fieldName);
370    
371                            if (weightedTerms.length > 0) {
372                                    break;
373                            }
374                    }
375    
376                    Set<String> queryTerms = new HashSet<String>();
377    
378                    for (WeightedTerm weightedTerm : weightedTerms) {
379                            queryTerms.add(weightedTerm.getTerm());
380                    }
381    
382                    return queryTerms.toArray(new String[queryTerms.size()]);
383            }
384    
385            public IndexSearcher getSearcher(long companyId, boolean readOnly)
386                    throws IOException {
387    
388                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
389    
390                    IndexSearcher indexSearcher = new IndexSearcher(
391                            indexAccessor.getLuceneDir(), readOnly);
392    
393                    indexSearcher.setDefaultFieldSortScoring(true, true);
394                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
395    
396                    return indexSearcher;
397            }
398    
399            public String getSnippet(
400                            Query query, String field, String s, int maxNumFragments,
401                            int fragmentLength, String fragmentSuffix, String preTag,
402                            String postTag)
403                    throws IOException {
404    
405                    SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
406                            preTag, postTag);
407    
408                    QueryScorer queryScorer = new QueryScorer(query, field);
409    
410                    Highlighter highlighter = new Highlighter(
411                            simpleHTMLFormatter, queryScorer);
412    
413                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
414    
415                    TokenStream tokenStream = getAnalyzer().tokenStream(
416                            field, new UnsyncStringReader(s));
417    
418                    try {
419                            String snippet = highlighter.getBestFragments(
420                                    tokenStream, s, maxNumFragments, fragmentSuffix);
421    
422                            if (Validator.isNotNull(snippet) &&
423                                    !StringUtil.endsWith(snippet, fragmentSuffix)) {
424    
425                                    snippet = snippet.concat(fragmentSuffix);
426                            }
427    
428                            return snippet;
429                    }
430                    catch (InvalidTokenOffsetsException itoe) {
431                            throw new IOException(itoe.getMessage());
432                    }
433            }
434    
435            public Version getVersion() {
436                    return _version;
437            }
438    
439            public boolean isLoadIndexFromClusterEnabled() {
440                    if (PropsValues.CLUSTER_LINK_ENABLED &&
441                            PropsValues.LUCENE_REPLICATE_WRITE) {
442    
443                            return true;
444                    }
445    
446                    if (_log.isDebugEnabled()) {
447                            _log.debug("Load index from cluster is not enabled");
448                    }
449    
450                    return false;
451            }
452    
453            public void loadIndex(long companyId, InputStream inputStream)
454                    throws IOException {
455    
456                    if (!isLoadIndexFromClusterEnabled()) {
457                            return;
458                    }
459    
460                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
461    
462                    if (indexAccessor == null) {
463                            return;
464                    }
465    
466                    indexAccessor.loadIndex(inputStream);
467            }
468    
469            public Address selectBootupClusterAddress(
470                            long companyId, long localLastGeneration)
471                    throws SystemException {
472    
473                    if (!isLoadIndexFromClusterEnabled()) {
474                            return null;
475                    }
476    
477                    List<Address> clusterNodeAddresses =
478                            ClusterExecutorUtil.getClusterNodeAddresses();
479    
480                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
481    
482                    if (clusterNodeAddressesCount <= 1) {
483                            if (_log.isDebugEnabled()) {
484                                    _log.debug(
485                                            "Do not load indexes because there is either one portal " +
486                                                    "instance or no portal instances in the cluster");
487                            }
488    
489                            return null;
490                    }
491    
492                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
493                            new MethodHandler(_getLastGenerationMethodKey, companyId), true);
494    
495                    FutureClusterResponses futureClusterResponses =
496                            ClusterExecutorUtil.execute(clusterRequest);
497    
498                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
499                            futureClusterResponses.getPartialResults();
500    
501                    Address bootupAddress = null;
502    
503                    do {
504                            clusterNodeAddressesCount--;
505    
506                            ClusterNodeResponse clusterNodeResponse = null;
507    
508                            try {
509                                    clusterNodeResponse = clusterNodeResponses.poll(
510                                            _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
511                                            java.util.concurrent.TimeUnit.MILLISECONDS);
512                            }
513                            catch (Exception e) {
514                                    throw new SystemException(e);
515                            }
516    
517                            if (clusterNodeResponse == null) {
518                                    if (_log.isDebugEnabled()) {
519                                            _log.debug(
520                                                    "Unable to get cluster node response in " +
521                                                            _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
522                                                                    java.util.concurrent.TimeUnit.MILLISECONDS);
523                                    }
524    
525                                    continue;
526                            }
527    
528                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
529    
530                            if (clusterNode.getPort() > 0) {
531                                    try {
532                                            long remoteLastGeneration =
533                                                    (Long)clusterNodeResponse.getResult();
534    
535                                            if (remoteLastGeneration > localLastGeneration) {
536                                                    bootupAddress = clusterNodeResponse.getAddress();
537    
538                                                    break;
539                                            }
540                                    }
541                                    catch (Exception e) {
542                                            if (_log.isDebugEnabled()) {
543                                                    _log.debug(
544                                                            "Suppress exception caused by remote method " +
545                                                                    "invocation",
546                                                            e);
547                                            }
548    
549                                            continue;
550                                    }
551                            }
552                            else {
553                                    if (_log.isDebugEnabled()) {
554                                            _log.debug(
555                                                    "Cluster node " + clusterNode + " has invalid port");
556                                    }
557                            }
558                    } while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
559    
560                    return bootupAddress;
561            }
562    
563            public void setAnalyzer(Analyzer analyzer) {
564                    _analyzer = analyzer;
565            }
566    
567            public void setVersion(Version version) {
568                    _version = version;
569            }
570    
571            public void shutdown() {
572                    if (_luceneIndexThreadPoolExecutor != null) {
573                            _luceneIndexThreadPoolExecutor.shutdownNow();
574    
575                            try {
576                                    _luceneIndexThreadPoolExecutor.awaitTermination(
577                                            60, java.util.concurrent.TimeUnit.SECONDS);
578                            }
579                            catch (InterruptedException ie) {
580                                    _log.error("Lucene indexer shutdown interrupted", ie);
581                            }
582                    }
583    
584                    if (isLoadIndexFromClusterEnabled()) {
585                            ClusterExecutorUtil.removeClusterEventListener(
586                                    _loadIndexClusterEventListener);
587                    }
588    
589                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
590                            indexAccessor.close();
591                    }
592            }
593    
594            public void startup(long companyId) {
595                    if (PropsValues.INDEX_ON_STARTUP) {
596                            if (_log.isInfoEnabled()) {
597                                    _log.info("Indexing Lucene on startup");
598                            }
599    
600                            LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
601    
602                            if (PropsValues.INDEX_WITH_THREAD) {
603                                    _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
604                            }
605                            else {
606                                    luceneIndexer.reindex();
607                            }
608                    }
609            }
610    
611            public void updateDocument(long companyId, Term term, Document document)
612                    throws IOException {
613    
614                    IndexAccessor indexAccessor = _getIndexAccessor(companyId);
615    
616                    indexAccessor.updateDocument(term, document);
617            }
618    
619            private LuceneHelperImpl() {
620                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
621                            _luceneIndexThreadPoolExecutor =
622                                    PortalExecutorManagerUtil.getPortalExecutor(
623                                            LuceneHelperImpl.class.getName());
624                    }
625    
626                    if (isLoadIndexFromClusterEnabled()) {
627                            _loadIndexClusterEventListener =
628                                    new LoadIndexClusterEventListener();
629    
630                            ClusterExecutorUtil.addClusterEventListener(
631                                    _loadIndexClusterEventListener);
632                    }
633            }
634    
635            private ObjectValuePair<String, URL>
636                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
637                    throws SystemException {
638    
639                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
640                            new MethodHandler(
641                                    _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
642                            bootupAddress);
643    
644                    FutureClusterResponses futureClusterResponses =
645                            ClusterExecutorUtil.execute(clusterRequest);
646    
647                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
648                            futureClusterResponses.getPartialResults();
649    
650                    try {
651                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
652                                    _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
653                                    java.util.concurrent.TimeUnit.MILLISECONDS);
654    
655                            String transientToken = (String)clusterNodeResponse.getResult();
656    
657                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
658    
659                            InetAddress inetAddress = clusterNode.getInetAddress();
660    
661                            URL url = new URL(
662                                    "http", inetAddress.getHostAddress(), clusterNode.getPort(),
663                                    "/lucene/dump");
664    
665                            return new ObjectValuePair<String, URL>(transientToken, url);
666                    }
667                    catch (Exception e) {
668                            throw new SystemException(e);
669                    }
670            }
671    
672            private IndexAccessor _getIndexAccessor(long companyId) {
673                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
674    
675                    if (indexAccessor != null) {
676                            return indexAccessor;
677                    }
678    
679                    synchronized (this) {
680                            indexAccessor = _indexAccessors.get(companyId);
681    
682                            if (indexAccessor == null) {
683                                    indexAccessor = new IndexAccessorImpl(companyId);
684    
685                                    if (isLoadIndexFromClusterEnabled()) {
686                                            InputStream inputStream = null;
687    
688                                            try {
689                                                    Address bootupAddress = selectBootupClusterAddress(
690                                                            companyId, IndexAccessor.DEFAULT_LAST_GENERATION);
691    
692                                                    if (bootupAddress != null) {
693                                                            inputStream = getLoadIndexesInputStreamFromCluster(
694                                                                    companyId, bootupAddress);
695    
696                                                            indexAccessor.loadIndex(inputStream);
697                                                    }
698    
699                                                    indexAccessor.enableDumpIndex();
700                                            }
701                                            catch (Exception e) {
702                                                    _log.error(
703                                                            "Unable to load index for company " +
704                                                                    indexAccessor.getCompanyId(),
705                                                            e);
706                                            }
707                                            finally {
708                                                    if (inputStream != null) {
709                                                            try {
710                                                                    inputStream.close();
711                                                            }
712                                                            catch (IOException ioe) {
713                                                                    _log.error(
714                                                                            "Unable to close input stream for " +
715                                                                                    "company " +
716                                                                                            indexAccessor.getCompanyId(),
717                                                                            ioe);
718                                                            }
719                                                    }
720                                            }
721                                    }
722    
723                                    _indexAccessors.put(companyId, indexAccessor);
724                            }
725                    }
726    
727                    return indexAccessor;
728            }
729    
730            private void _includeIfUnique(
731                    BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
732                    boolean like) {
733    
734                    if (query instanceof TermQuery) {
735                            Set<Term> terms = new HashSet<Term>();
736    
737                            query.extractTerms(terms);
738    
739                            for (Term term : terms) {
740                                    String termValue = term.text();
741    
742                                    if (like) {
743                                            term = term.createTerm(
744                                                    StringPool.STAR.concat(termValue).concat(
745                                                            StringPool.STAR));
746    
747                                            query = new WildcardQuery(term);
748                                    }
749                                    else {
750                                            query = new TermQuery(term);
751                                    }
752    
753                                    boolean included = false;
754    
755                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
756                                            if (query.equals(booleanClause.getQuery())) {
757                                                    included = true;
758                                            }
759                                    }
760    
761                                    if (!included) {
762                                            booleanQuery.add(query, occur);
763                                    }
764                            }
765                    }
766                    else if (query instanceof BooleanQuery) {
767                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
768    
769                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
770                                    _includeIfUnique(
771                                            booleanQuery, booleanClause.getQuery(),
772                                            booleanClause.getOccur(), like);
773                            }
774                    }
775                    else {
776                            boolean included = false;
777    
778                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
779                                    if (query.equals(booleanClause.getQuery())) {
780                                            included = true;
781                                    }
782                            }
783    
784                            if (!included) {
785                                    booleanQuery.add(query, occur);
786                            }
787                    }
788            }
789    
        // Response timeout for bootup cluster nodes, going by the name. The use
        // site is outside this section. NOTE(review): unit is presumably
        // milliseconds -- confirm where the constant is read.

        private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;

        // Keep alive time for transient tokens, going by the name. The use site
        // is outside this section. NOTE(review): unit is presumably milliseconds
        // -- confirm where the constant is read.

        private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys naming TransientTokenUtil.createToken(long) and
        // LuceneHelperUtil.getLastGeneration(long). NOTE(review): presumably used
        // to invoke these methods on other cluster nodes -- confirm at the use
        // sites.

        private static MethodKey _createTokenMethodKey =
                new MethodKey(TransientTokenUtil.class.getName(), "createToken",
                long.class);
        private static MethodKey _getLastGenerationMethodKey =
                new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
                long.class);

        // Instance state. _indexAccessors maps a company ID to its index accessor
        // and is filled on demand by _getIndexAccessor. The remaining fields are
        // assigned outside this section.

        private Analyzer _analyzer;
        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();
        private LoadIndexClusterEventListener _loadIndexClusterEventListener;
        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
809    
810            private class LoadIndexClusterEventListener
811                    implements ClusterEventListener {
812    
813                    public void processClusterEvent(ClusterEvent clusterEvent) {
814                            ClusterEventType clusterEventType =
815                                    clusterEvent.getClusterEventType();
816    
817                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
818                                    return;
819                            }
820    
821                            List<Address> clusterNodeAddresses =
822                                    ClusterExecutorUtil.getClusterNodeAddresses();
823                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
824    
825                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
826                                    if (_log.isDebugEnabled()) {
827                                            _log.debug(
828                                                    "Number of original cluster members is greater than " +
829                                                            "one");
830                                    }
831    
832                                    return;
833                            }
834    
835                            long[] companyIds = PortalInstances.getCompanyIds();
836    
837                            for (long companyId : companyIds) {
838                                    loadIndexes(companyId);
839                            }
840    
841                            loadIndexes(CompanyConstants.SYSTEM);
842                    }
843    
844                    private void loadIndexes(long companyId) {
845                            long lastGeneration = getLastGeneration(companyId);
846    
847                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
848                                    return;
849                            }
850    
851                            try {
852                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
853                            }
854                            catch (Exception e) {
855                                    _log.error(
856                                            "Unable to load indexes for company " + companyId, e);
857                            }
858                    }
859    
860            }
861    
862    }