001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterEventType;
021 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022 import com.liferay.portal.kernel.cluster.ClusterNode;
023 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
024 import com.liferay.portal.kernel.cluster.ClusterRequest;
025 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
026 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
029 import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
030 import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.search.BooleanClauseOccur;
034 import com.liferay.portal.kernel.search.Field;
035 import com.liferay.portal.kernel.util.ArrayUtil;
036 import com.liferay.portal.kernel.util.GetterUtil;
037 import com.liferay.portal.kernel.util.MethodHandler;
038 import com.liferay.portal.kernel.util.MethodKey;
039 import com.liferay.portal.kernel.util.ObjectValuePair;
040 import com.liferay.portal.kernel.util.StringPool;
041 import com.liferay.portal.kernel.util.StringUtil;
042 import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
043 import com.liferay.portal.kernel.util.Validator;
044 import com.liferay.portal.model.CompanyConstants;
045 import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
046 import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
047 import com.liferay.portal.security.auth.TransientTokenUtil;
048 import com.liferay.portal.util.PortalInstances;
049 import com.liferay.portal.util.PropsValues;
050 import com.liferay.util.lucene.KeywordsUtil;
051
052 import java.io.IOException;
053 import java.io.InputStream;
054 import java.io.OutputStream;
055
056 import java.net.InetAddress;
057 import java.net.URL;
058 import java.net.URLConnection;
059
060 import java.util.HashSet;
061 import java.util.List;
062 import java.util.Map;
063 import java.util.Set;
064 import java.util.concurrent.BlockingQueue;
065 import java.util.concurrent.ConcurrentHashMap;
066
067 import org.apache.lucene.analysis.Analyzer;
068 import org.apache.lucene.analysis.TokenStream;
069 import org.apache.lucene.document.Document;
070 import org.apache.lucene.index.Term;
071 import org.apache.lucene.queryParser.QueryParser;
072 import org.apache.lucene.search.BooleanClause;
073 import org.apache.lucene.search.BooleanQuery;
074 import org.apache.lucene.search.IndexSearcher;
075 import org.apache.lucene.search.NumericRangeQuery;
076 import org.apache.lucene.search.Query;
077 import org.apache.lucene.search.TermQuery;
078 import org.apache.lucene.search.TermRangeQuery;
079 import org.apache.lucene.search.WildcardQuery;
080 import org.apache.lucene.search.highlight.Highlighter;
081 import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
082 import org.apache.lucene.search.highlight.QueryScorer;
083 import org.apache.lucene.search.highlight.SimpleFragmenter;
084 import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
085 import org.apache.lucene.search.highlight.WeightedTerm;
086 import org.apache.lucene.util.Version;
087
088
096 public class LuceneHelperImpl implements LuceneHelper {
097
098 public void addDocument(long companyId, Document document)
099 throws IOException {
100
101 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
102
103 indexAccessor.addDocument(document);
104 }
105
106 public void addExactTerm(
107 BooleanQuery booleanQuery, String field, String value) {
108
109 addTerm(booleanQuery, field, value, false);
110 }
111
112 public void addNumericRangeTerm(
113 BooleanQuery booleanQuery, String field, String startValue,
114 String endValue) {
115
116 NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
117 field, GetterUtil.getLong(startValue), GetterUtil.getLong(endValue),
118 true, true);
119
120 booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
121 }
122
123 public void addRangeTerm(
124 BooleanQuery booleanQuery, String field, String startValue,
125 String endValue) {
126
127 boolean includesLower = true;
128
129 if ((startValue != null) && startValue.equals(StringPool.STAR)) {
130 includesLower = false;
131 }
132
133 boolean includesUpper = true;
134
135 if ((endValue != null) && endValue.equals(StringPool.STAR)) {
136 includesUpper = false;
137 }
138
139 TermRangeQuery termRangeQuery = new TermRangeQuery(
140 field, startValue, endValue, includesLower, includesUpper);
141
142 booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
143 }
144
145 public void addRequiredTerm(
146 BooleanQuery booleanQuery, String field, String value, boolean like) {
147
148 addRequiredTerm(booleanQuery, field, new String[] {value}, like);
149 }
150
151 public void addRequiredTerm(
152 BooleanQuery booleanQuery, String field, String[] values,
153 boolean like) {
154
155 if (values == null) {
156 return;
157 }
158
159 BooleanQuery query = new BooleanQuery();
160
161 for (String value : values) {
162 addTerm(query, field, value, like);
163 }
164
165 booleanQuery.add(query, BooleanClause.Occur.MUST);
166 }
167
168 public void addTerm(
169 BooleanQuery booleanQuery, String field, String value, boolean like) {
170
171 addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
172 }
173
174 public void addTerm(
175 BooleanQuery booleanQuery, String field, String value, boolean like,
176 BooleanClauseOccur booleanClauseOccur) {
177
178 if (Validator.isNull(value)) {
179 return;
180 }
181
182 if (like) {
183 value = StringUtil.replace(
184 value, StringPool.PERCENT, StringPool.BLANK);
185 }
186
187 try {
188 QueryParser queryParser = new QueryParser(
189 getVersion(), field, getAnalyzer());
190
191 Query query = null;
192
193 try {
194 query = queryParser.parse(value);
195 }
196 catch (Exception e) {
197 query = queryParser.parse(KeywordsUtil.escape(value));
198 }
199
200 BooleanClause.Occur occur = null;
201
202 if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
203 occur = BooleanClause.Occur.MUST;
204 }
205 else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
206 occur = BooleanClause.Occur.MUST_NOT;
207 }
208 else {
209 occur = BooleanClause.Occur.SHOULD;
210 }
211
212 _includeIfUnique(booleanQuery, query, occur, like);
213 }
214 catch (Exception e) {
215 _log.error(e, e);
216 }
217 }
218
219 public void addTerm(
220 BooleanQuery booleanQuery, String field, String[] values,
221 boolean like) {
222
223 for (String value : values) {
224 addTerm(booleanQuery, field, value, like);
225 }
226 }
227
228 public int countScoredFieldNames(Query query, String[] filedNames) {
229 int count = 0;
230
231 for (String fieldName : filedNames) {
232 WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
233 query, false, fieldName);
234
235 if ((weightedTerms.length > 0) &&
236 !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
237
238 count++;
239 }
240 }
241
242 return count;
243 }
244
245 public void delete(long companyId) {
246 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
247
248 if (indexAccessor == null) {
249 return;
250 }
251
252 indexAccessor.delete();
253 }
254
255 public void deleteDocuments(long companyId, Term term) throws IOException {
256 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
257
258 if (indexAccessor == null) {
259 return;
260 }
261
262 indexAccessor.deleteDocuments(term);
263 }
264
265 public void dumpIndex(long companyId, OutputStream outputStream)
266 throws IOException {
267
268 long lastGeneration = getLastGeneration(companyId);
269
270 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
271 if (_log.isDebugEnabled()) {
272 _log.debug(
273 "Dump index from cluster is not enabled for " + companyId);
274 }
275
276 return;
277 }
278
279 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
280
281 if (indexAccessor == null) {
282 return;
283 }
284
285 indexAccessor.dumpIndex(outputStream);
286 }
287
288 public Analyzer getAnalyzer() {
289 return _analyzer;
290 }
291
292 public long getLastGeneration(long companyId) {
293 if (!isLoadIndexFromClusterEnabled()) {
294 return IndexAccessor.DEFAULT_LAST_GENERATION;
295 }
296
297 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
298
299 if (indexAccessor == null) {
300 return IndexAccessor.DEFAULT_LAST_GENERATION;
301 }
302
303 return indexAccessor.getLastGeneration();
304 }
305
306 public InputStream getLoadIndexesInputStreamFromCluster(
307 long companyId, Address bootupAddress)
308 throws SystemException {
309
310 if (!isLoadIndexFromClusterEnabled()) {
311 return null;
312 }
313
314 InputStream inputStream = null;
315
316 try {
317 ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
318 _getBootupClusterNodeObjectValuePair(bootupAddress);
319
320 URL url = bootupClusterNodeObjectValuePair.getValue();
321
322 URLConnection urlConnection = url.openConnection();
323
324 urlConnection.setDoOutput(true);
325
326 UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
327 urlConnection.getOutputStream());
328
329 unsyncPrintWriter.write("transientToken=");
330 unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
331 unsyncPrintWriter.write("&companyId=");
332 unsyncPrintWriter.write(String.valueOf(companyId));
333
334 unsyncPrintWriter.close();
335
336 inputStream = urlConnection.getInputStream();
337
338 return inputStream;
339 }
340 catch (IOException ioe) {
341 throw new SystemException(ioe);
342 }
343 }
344
345 public String[] getQueryTerms(Query query) {
346 String queryString = StringUtil.replace(
347 query.toString(), StringPool.STAR, StringPool.BLANK);
348
349 Query tempQuery = null;
350
351 try {
352 QueryParser queryParser = new QueryParser(
353 getVersion(), StringPool.BLANK, getAnalyzer());
354
355 tempQuery = queryParser.parse(queryString);
356 }
357 catch (Exception e) {
358 if (_log.isWarnEnabled()) {
359 _log.warn("Unable to parse " + queryString);
360 }
361
362 tempQuery = query;
363 }
364
365 WeightedTerm[] weightedTerms = null;
366
367 for (String fieldName : Field.KEYWORDS) {
368 weightedTerms = QueryTermExtractor.getTerms(
369 tempQuery, false, fieldName);
370
371 if (weightedTerms.length > 0) {
372 break;
373 }
374 }
375
376 Set<String> queryTerms = new HashSet<String>();
377
378 for (WeightedTerm weightedTerm : weightedTerms) {
379 queryTerms.add(weightedTerm.getTerm());
380 }
381
382 return queryTerms.toArray(new String[queryTerms.size()]);
383 }
384
385 public IndexSearcher getSearcher(long companyId, boolean readOnly)
386 throws IOException {
387
388 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
389
390 IndexSearcher indexSearcher = new IndexSearcher(
391 indexAccessor.getLuceneDir(), readOnly);
392
393 indexSearcher.setDefaultFieldSortScoring(true, true);
394 indexSearcher.setSimilarity(new FieldWeightSimilarity());
395
396 return indexSearcher;
397 }
398
399 public String getSnippet(
400 Query query, String field, String s, int maxNumFragments,
401 int fragmentLength, String fragmentSuffix, String preTag,
402 String postTag)
403 throws IOException {
404
405 SimpleHTMLFormatter simpleHTMLFormatter = new SimpleHTMLFormatter(
406 preTag, postTag);
407
408 QueryScorer queryScorer = new QueryScorer(query, field);
409
410 Highlighter highlighter = new Highlighter(
411 simpleHTMLFormatter, queryScorer);
412
413 highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
414
415 TokenStream tokenStream = getAnalyzer().tokenStream(
416 field, new UnsyncStringReader(s));
417
418 try {
419 String snippet = highlighter.getBestFragments(
420 tokenStream, s, maxNumFragments, fragmentSuffix);
421
422 if (Validator.isNotNull(snippet) &&
423 !StringUtil.endsWith(snippet, fragmentSuffix)) {
424
425 snippet = snippet.concat(fragmentSuffix);
426 }
427
428 return snippet;
429 }
430 catch (InvalidTokenOffsetsException itoe) {
431 throw new IOException(itoe.getMessage());
432 }
433 }
434
435 public Version getVersion() {
436 return _version;
437 }
438
439 public boolean isLoadIndexFromClusterEnabled() {
440 if (PropsValues.CLUSTER_LINK_ENABLED &&
441 PropsValues.LUCENE_REPLICATE_WRITE) {
442
443 return true;
444 }
445
446 if (_log.isDebugEnabled()) {
447 _log.debug("Load index from cluster is not enabled");
448 }
449
450 return false;
451 }
452
453 public void loadIndex(long companyId, InputStream inputStream)
454 throws IOException {
455
456 if (!isLoadIndexFromClusterEnabled()) {
457 return;
458 }
459
460 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
461
462 if (indexAccessor == null) {
463 return;
464 }
465
466 indexAccessor.loadIndex(inputStream);
467 }
468
469 public Address selectBootupClusterAddress(
470 long companyId, long localLastGeneration)
471 throws SystemException {
472
473 if (!isLoadIndexFromClusterEnabled()) {
474 return null;
475 }
476
477 List<Address> clusterNodeAddresses =
478 ClusterExecutorUtil.getClusterNodeAddresses();
479
480 int clusterNodeAddressesCount = clusterNodeAddresses.size();
481
482 if (clusterNodeAddressesCount <= 1) {
483 if (_log.isDebugEnabled()) {
484 _log.debug(
485 "Do not load indexes because there is either one portal " +
486 "instance or no portal instances in the cluster");
487 }
488
489 return null;
490 }
491
492 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
493 new MethodHandler(_getLastGenerationMethodKey, companyId), true);
494
495 FutureClusterResponses futureClusterResponses =
496 ClusterExecutorUtil.execute(clusterRequest);
497
498 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
499 futureClusterResponses.getPartialResults();
500
501 Address bootupAddress = null;
502
503 do {
504 clusterNodeAddressesCount--;
505
506 ClusterNodeResponse clusterNodeResponse = null;
507
508 try {
509 clusterNodeResponse = clusterNodeResponses.poll(
510 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
511 java.util.concurrent.TimeUnit.MILLISECONDS);
512 }
513 catch (Exception e) {
514 throw new SystemException(e);
515 }
516
517 if (clusterNodeResponse == null) {
518 if (_log.isDebugEnabled()) {
519 _log.debug(
520 "Unable to get cluster node response in " +
521 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT +
522 java.util.concurrent.TimeUnit.MILLISECONDS);
523 }
524
525 continue;
526 }
527
528 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
529
530 if (clusterNode.getPort() > 0) {
531 try {
532 long remoteLastGeneration =
533 (Long)clusterNodeResponse.getResult();
534
535 if (remoteLastGeneration > localLastGeneration) {
536 bootupAddress = clusterNodeResponse.getAddress();
537
538 break;
539 }
540 }
541 catch (Exception e) {
542 if (_log.isDebugEnabled()) {
543 _log.debug(
544 "Suppress exception caused by remote method " +
545 "invocation",
546 e);
547 }
548
549 continue;
550 }
551 }
552 else {
553 if (_log.isDebugEnabled()) {
554 _log.debug(
555 "Cluster node " + clusterNode + " has invalid port");
556 }
557 }
558 } while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
559
560 return bootupAddress;
561 }
562
563 public void setAnalyzer(Analyzer analyzer) {
564 _analyzer = analyzer;
565 }
566
567 public void setVersion(Version version) {
568 _version = version;
569 }
570
571 public void shutdown() {
572 if (_luceneIndexThreadPoolExecutor != null) {
573 _luceneIndexThreadPoolExecutor.shutdownNow();
574
575 try {
576 _luceneIndexThreadPoolExecutor.awaitTermination(
577 60, java.util.concurrent.TimeUnit.SECONDS);
578 }
579 catch (InterruptedException ie) {
580 _log.error("Lucene indexer shutdown interrupted", ie);
581 }
582 }
583
584 if (isLoadIndexFromClusterEnabled()) {
585 ClusterExecutorUtil.removeClusterEventListener(
586 _loadIndexClusterEventListener);
587 }
588
589 for (IndexAccessor indexAccessor : _indexAccessors.values()) {
590 indexAccessor.close();
591 }
592 }
593
594 public void startup(long companyId) {
595 if (PropsValues.INDEX_ON_STARTUP) {
596 if (_log.isInfoEnabled()) {
597 _log.info("Indexing Lucene on startup");
598 }
599
600 LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
601
602 if (PropsValues.INDEX_WITH_THREAD) {
603 _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
604 }
605 else {
606 luceneIndexer.reindex();
607 }
608 }
609 }
610
611 public void updateDocument(long companyId, Term term, Document document)
612 throws IOException {
613
614 IndexAccessor indexAccessor = _getIndexAccessor(companyId);
615
616 indexAccessor.updateDocument(term, document);
617 }
618
/**
 * Obtains the indexing thread pool when both indexing on startup and
 * indexing with a thread are enabled, and registers a cluster event
 * listener when loading indexes from the cluster is enabled.
 */
private LuceneHelperImpl() {
	boolean indexInBackground =
		PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD;

	if (indexInBackground) {
		String executorName = LuceneHelperImpl.class.getName();

		_luceneIndexThreadPoolExecutor =
			PortalExecutorManagerUtil.getPortalExecutor(executorName);
	}

	if (!isLoadIndexFromClusterEnabled()) {
		return;
	}

	_loadIndexClusterEventListener = new LoadIndexClusterEventListener();

	ClusterExecutorUtil.addClusterEventListener(
		_loadIndexClusterEventListener);
}
634
635 private ObjectValuePair<String, URL>
636 _getBootupClusterNodeObjectValuePair(Address bootupAddress)
637 throws SystemException {
638
639 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
640 new MethodHandler(
641 _createTokenMethodKey, _TRANSIENT_TOKEN_KEEP_ALIVE_TIME),
642 bootupAddress);
643
644 FutureClusterResponses futureClusterResponses =
645 ClusterExecutorUtil.execute(clusterRequest);
646
647 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
648 futureClusterResponses.getPartialResults();
649
650 try {
651 ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
652 _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT,
653 java.util.concurrent.TimeUnit.MILLISECONDS);
654
655 String transientToken = (String)clusterNodeResponse.getResult();
656
657 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
658
659 InetAddress inetAddress = clusterNode.getInetAddress();
660
661 URL url = new URL(
662 "http", inetAddress.getHostAddress(), clusterNode.getPort(),
663 "/lucene/dump");
664
665 return new ObjectValuePair<String, URL>(transientToken, url);
666 }
667 catch (Exception e) {
668 throw new SystemException(e);
669 }
670 }
671
672 private IndexAccessor _getIndexAccessor(long companyId) {
673 IndexAccessor indexAccessor = _indexAccessors.get(companyId);
674
675 if (indexAccessor != null) {
676 return indexAccessor;
677 }
678
679 synchronized (this) {
680 indexAccessor = _indexAccessors.get(companyId);
681
682 if (indexAccessor == null) {
683 indexAccessor = new IndexAccessorImpl(companyId);
684
685 if (isLoadIndexFromClusterEnabled()) {
686 InputStream inputStream = null;
687
688 try {
689 Address bootupAddress = selectBootupClusterAddress(
690 companyId, IndexAccessor.DEFAULT_LAST_GENERATION);
691
692 if (bootupAddress != null) {
693 inputStream = getLoadIndexesInputStreamFromCluster(
694 companyId, bootupAddress);
695
696 indexAccessor.loadIndex(inputStream);
697 }
698
699 indexAccessor.enableDumpIndex();
700 }
701 catch (Exception e) {
702 _log.error(
703 "Unable to load index for company " +
704 indexAccessor.getCompanyId(),
705 e);
706 }
707 finally {
708 if (inputStream != null) {
709 try {
710 inputStream.close();
711 }
712 catch (IOException ioe) {
713 _log.error(
714 "Unable to close input stream for " +
715 "company " +
716 indexAccessor.getCompanyId(),
717 ioe);
718 }
719 }
720 }
721 }
722
723 _indexAccessors.put(companyId, indexAccessor);
724 }
725 }
726
727 return indexAccessor;
728 }
729
730 private void _includeIfUnique(
731 BooleanQuery booleanQuery, Query query, BooleanClause.Occur occur,
732 boolean like) {
733
734 if (query instanceof TermQuery) {
735 Set<Term> terms = new HashSet<Term>();
736
737 query.extractTerms(terms);
738
739 for (Term term : terms) {
740 String termValue = term.text();
741
742 if (like) {
743 term = term.createTerm(
744 StringPool.STAR.concat(termValue).concat(
745 StringPool.STAR));
746
747 query = new WildcardQuery(term);
748 }
749 else {
750 query = new TermQuery(term);
751 }
752
753 boolean included = false;
754
755 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
756 if (query.equals(booleanClause.getQuery())) {
757 included = true;
758 }
759 }
760
761 if (!included) {
762 booleanQuery.add(query, occur);
763 }
764 }
765 }
766 else if (query instanceof BooleanQuery) {
767 BooleanQuery curBooleanQuery = (BooleanQuery)query;
768
769 for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
770 _includeIfUnique(
771 booleanQuery, booleanClause.getQuery(),
772 booleanClause.getOccur(), like);
773 }
774 }
775 else {
776 boolean included = false;
777
778 for (BooleanClause booleanClause : booleanQuery.getClauses()) {
779 if (query.equals(booleanClause.getQuery())) {
780 included = true;
781 }
782 }
783
784 if (!included) {
785 booleanQuery.add(query, occur);
786 }
787 }
788 }
789
790 private static final long _BOOTUP_CLUSTER_NODE_RESPONSE_TIMEOUT = 10000;
791
792 private static final long _TRANSIENT_TOKEN_KEEP_ALIVE_TIME = 10000;
793
794 private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);
795
796 private static MethodKey _createTokenMethodKey =
797 new MethodKey(TransientTokenUtil.class.getName(), "createToken",
798 long.class);
799 private static MethodKey _getLastGenerationMethodKey =
800 new MethodKey(LuceneHelperUtil.class.getName(), "getLastGeneration",
801 long.class);
802
803 private Analyzer _analyzer;
804 private Map<Long, IndexAccessor> _indexAccessors =
805 new ConcurrentHashMap<Long, IndexAccessor>();
806 private LoadIndexClusterEventListener _loadIndexClusterEventListener;
807 private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
808 private Version _version;
809
810 private class LoadIndexClusterEventListener
811 implements ClusterEventListener {
812
813 public void processClusterEvent(ClusterEvent clusterEvent) {
814 ClusterEventType clusterEventType =
815 clusterEvent.getClusterEventType();
816
817 if (!clusterEventType.equals(ClusterEventType.JOIN)) {
818 return;
819 }
820
821 List<Address> clusterNodeAddresses =
822 ClusterExecutorUtil.getClusterNodeAddresses();
823 List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
824
825 if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
826 if (_log.isDebugEnabled()) {
827 _log.debug(
828 "Number of original cluster members is greater than " +
829 "one");
830 }
831
832 return;
833 }
834
835 long[] companyIds = PortalInstances.getCompanyIds();
836
837 for (long companyId : companyIds) {
838 loadIndexes(companyId);
839 }
840
841 loadIndexes(CompanyConstants.SYSTEM);
842 }
843
844 private void loadIndexes(long companyId) {
845 long lastGeneration = getLastGeneration(companyId);
846
847 if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
848 return;
849 }
850
851 try {
852 LuceneClusterUtil.loadIndexesFromCluster(companyId);
853 }
854 catch (Exception e) {
855 _log.error(
856 "Unable to load indexes for company " + companyId, e);
857 }
858 }
859
860 }
861
862 }