 * This file contains code from Florent Guillaume's nuxeo-reindex-fulltext module.
6 package org.collectionspace.services.batch.nuxeo;
9 import java.io.Serializable;
10 import java.lang.reflect.Field;
11 import java.net.URISyntaxException;
13 import java.util.ArrayList;
14 import java.util.Arrays;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.LinkedHashSet;
19 import java.util.List;
23 import org.apache.commons.lang.StringUtils;
24 import org.collectionspace.services.batch.BatchCommon;
25 import org.collectionspace.services.common.CollectionSpaceResource;
26 import org.collectionspace.services.common.NuxeoBasedResource;
27 import org.collectionspace.services.common.StoredValuesUriTemplate;
28 import org.collectionspace.services.common.UriTemplateFactory;
29 import org.collectionspace.services.common.UriTemplateRegistryKey;
30 import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs;
31 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
32 import org.collectionspace.services.common.invocable.InvocationResults;
33 import org.collectionspace.services.common.vocabulary.AuthorityResource;
35 import org.dom4j.DocumentException;
36 import org.nuxeo.ecm.core.api.AbstractSession;
37 import org.nuxeo.ecm.core.api.CoreSession;
38 import org.nuxeo.ecm.core.api.IterableQueryResult;
39 import org.nuxeo.ecm.core.api.NuxeoException;
40 import org.nuxeo.ecm.core.event.EventService;
41 import org.nuxeo.ecm.core.query.QueryFilter;
42 import org.nuxeo.ecm.core.query.sql.NXQL;
43 import org.nuxeo.ecm.core.storage.FulltextConfiguration;
44 import org.nuxeo.ecm.core.storage.sql.Model;
45 import org.nuxeo.ecm.core.storage.sql.Node;
46 import org.nuxeo.ecm.core.storage.sql.Session;
47 import org.nuxeo.ecm.core.storage.sql.SimpleProperty;
48 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLFulltextExtractorWork;
49 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLSession;
50 import org.nuxeo.ecm.core.work.api.Work;
51 import org.nuxeo.ecm.core.work.api.WorkManager;
52 import org.nuxeo.ecm.core.work.api.WorkManager.Scheduling;
53 import org.nuxeo.runtime.api.Framework;
54 import org.nuxeo.runtime.transaction.TransactionHelper;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 public class ReindexFullTextBatchJob extends AbstractBatchJob {
59 final Logger log = LoggerFactory.getLogger(ReindexFullTextBatchJob.class);
61 public static final String DC_TITLE = "dc:title";
62 public static final int DEFAULT_BATCH_SIZE = 1000;
63 public static final int DEFAULT_START_BATCH = 0;
64 public static final int DEFAULT_END_BATCH = 0;
65 public static final int DEFAULT_BATCH_PAUSE = 0;
66 public static final String BATCH_STOP_FILE = "stopBatch";
67 public static final String DOCTYPE_STOP_FILE = "stopDocType";
69 private int batchSize = DEFAULT_BATCH_SIZE;
70 private int batchPause = DEFAULT_BATCH_PAUSE;
71 private int startBatch = DEFAULT_START_BATCH;
72 private int endBatch = DEFAULT_END_BATCH;
73 private int numAffected = 0;
75 private String stopFileDirectory;
77 private CoreSession coreSession;
78 private Session session = null;
79 protected FulltextConfiguration fulltextConfiguration;
81 private Map<String, NuxeoBasedResource> resourcesByDocType;
83 public ReindexFullTextBatchJob() {
84 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
86 stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName();
88 log.debug("stop file directory is " + stopFileDirectory);
92 // Since the ReindexFullTextBatchJob class deals with transactions differently than other batch jobs, we need to
93 // override this method to ensure there is an active transaction.
96 protected List<String> getVocabularyCsids(AuthorityResource<?, ?> resource) throws URISyntaxException {
98 if (TransactionHelper.isTransactionActive() == false) {
99 tx = TransactionHelper.startTransaction();
103 return super.getVocabularyCsids(resource);
106 TransactionHelper.commitOrRollbackTransaction();
112 // Since the ReindexFullTextBatchJob class deals with transactions differently than other batch jobs, we need to
113 // override this method to ensure there is an active transaction.
116 protected List<String> findAll(NuxeoBasedResource resource, int pageSize, int pageNum, String sortBy)
117 throws URISyntaxException, DocumentException {
119 if (TransactionHelper.isTransactionActive() == false) {
120 tx = TransactionHelper.startTransaction();
124 return super.findAll(resource, pageSize, pageNum, sortBy);
127 TransactionHelper.commitOrRollbackTransaction();
133 // Since the ReindexFullTextBatchJob class deals with transactions differently than other batch jobs, we need to
134 // override this method to ensure there is an active transaction.
137 protected List<String> findAllAuthorityItems(AuthorityResource<?, ?> resource, String vocabularyCsid, int pageSize, int pageNum, String sortBy)
138 throws URISyntaxException, Exception {
140 if (TransactionHelper.isTransactionActive() == false) {
141 tx = TransactionHelper.startTransaction();
145 return super.findAllAuthorityItems(resource, vocabularyCsid, pageSize, pageNum, sortBy);
148 TransactionHelper.commitOrRollbackTransaction();
160 public void run(BatchCommon batchCommon) {
161 setCompletionStatus(STATUS_MIN_PROGRESS);
165 boolean isTransactionActive = TransactionHelper.isTransactionActive();
167 // Commit and close the transaction that was started by the standard request lifecycle.
169 if (isTransactionActive) {
170 TransactionHelper.commitOrRollbackTransaction();
174 coreSession = getRepoSession().getCoreSession();
176 if (requestIsForInvocationModeSingle()) {
177 String csid = getInvocationContext().getSingleCSID();
180 throw new Exception("No singleCSID was supplied in invocation context.");
183 String docType = getInvocationContext().getDocType();
185 if (StringUtils.isEmpty(docType)) {
186 throw new Exception("No docType was supplied in invocation context.");
189 log.debug("Reindexing " + docType + " record with csid: " + csid);
191 reindexDocument(docType, csid);
193 else if (requestIsForInvocationModeList()) {
194 ListCSIDs list = getInvocationContext().getListCSIDs();
195 List<String> csids = list.getCsid();
197 if (csids == null || csids.size() == 0) {
198 throw new Exception("no listCSIDs were supplied");
201 String docType = getInvocationContext().getDocType();
203 if (StringUtils.isEmpty(docType)) {
204 throw new Exception("No docType was supplied in invocation context.");
207 log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ...");
209 if (log.isTraceEnabled()) {
210 log.trace(StringUtils.join(csids, ", "));
213 reindexDocuments(docType, csids);
215 else if (requestIsForInvocationModeNoContext()) {
216 Set<String> docTypes = new LinkedHashSet<String>();
219 docType = getInvocationContext().getDocType();
221 if (StringUtils.isNotEmpty(docType)) {
222 docTypes.add(docType);
225 // Read batch size, start and end batches, pause, and additional doctypes from params.
227 for (Param param : this.getParams()) {
228 if (param.getKey().equals("batchSize")) {
229 batchSize = Integer.parseInt(param.getValue());
231 else if (param.getKey().equals("startBatch")) {
232 startBatch = Integer.parseInt(param.getValue());
234 else if (param.getKey().equals("endBatch")) {
235 endBatch = Integer.parseInt(param.getValue());
237 else if (param.getKey().equals("batchPause")) {
238 batchPause = Integer.parseInt(param.getValue());
240 else if (param.getKey().equals("docType")) {
241 docType = param.getValue();
243 if (StringUtils.isNotEmpty(docType)) {
244 docTypes.add(docType);
250 // If docTypes is empty, we should use the <forDocTypes> list from the resource/payload
252 if (docTypes.isEmpty() == true && batchCommon != null) {
253 List<String> payloadDocTypes = batchCommon.getForDocTypes().getForDocType();
254 if (payloadDocTypes != null && !payloadDocTypes.isEmpty()) {
255 docTypes = convertListToSet(payloadDocTypes);
260 reindexDocuments(docTypes);
263 log.debug("reindexing complete");
265 InvocationResults results = new InvocationResults();
266 results.setNumAffected(numAffected);
267 results.setUserNote("reindexed " + numAffected + " records");
270 setCompletionStatus(STATUS_COMPLETE);
272 catch(StoppedException e) {
273 log.debug("reindexing terminated by stop file");
275 InvocationResults results = new InvocationResults();
276 results.setNumAffected(numAffected);
277 results.setUserNote("reindexing terminated by stop file");
280 setCompletionStatus(STATUS_COMPLETE);
283 setErrorResult(e.getMessage());
286 // Start a new transaction so the standard request lifecycle can complete.
288 if (isTransactionActive) {
289 TransactionHelper.startTransaction();
294 private void initResourceMap() {
295 resourcesByDocType = new HashMap<String, NuxeoBasedResource>();
297 for (CollectionSpaceResource<?, ?> resource : getResourceMap().values()) {
298 Map<UriTemplateRegistryKey, StoredValuesUriTemplate> entries = resource.getUriRegistryEntries();
300 for (UriTemplateRegistryKey key : entries.keySet()) {
301 String docType = key.getDocType();
302 String tenantId = key.getTenantId();
304 if (getTenantId().equals(tenantId)) {
305 if (resourcesByDocType.containsKey(docType)) {
306 log.warn("multiple resources found for docType " + docType);
308 NuxeoBasedResource currentResource = resourcesByDocType.get(docType);
309 NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource;
311 // Favor the resource that isn't an AuthorityResource. This
312 // is really just to deal with Contacts, which are handled
313 // by ContactResource, PersonAuthorityResource, and
314 // OrgAuthorityResource. We want to use ContactResource.
316 if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) {
317 resourcesByDocType.put(docType, candidateResource);
320 log.warn("using " + resourcesByDocType.get(docType));
323 resourcesByDocType.put(docType, (NuxeoBasedResource) resource);
330 private void reindexDocuments(Set<String> docTypes) throws Exception {
331 if (docTypes == null) {
332 docTypes = new LinkedHashSet<String>();
335 // If no types are specified, do them all.
337 if (docTypes.size() == 0) {
338 docTypes.addAll(getAllDocTypes());
341 for (String docType : docTypes) {
342 reindexDocuments(docType);
346 private List<String> getAllDocTypes() {
347 List<String> docTypes = new ArrayList<String>(resourcesByDocType.keySet());
348 Collections.sort(docTypes);
350 log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", "));
355 private void reindexDocuments(String docType) throws Exception {
356 // Check for a stop file before reindexing the docType.
358 if (batchStopFileExists() || docTypeStopFileExists()) {
359 throw new StoppedException();
362 log.debug("reindexing docType " + docType);
364 NuxeoBasedResource resource = resourcesByDocType.get(docType);
366 if (resource == null) {
367 log.warn("No service resource found for docType " + docType);
370 boolean isAuthorityItem = false;
372 if (resource instanceof AuthorityResource) {
373 UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType);
374 StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key);
376 log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType());
378 if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) {
379 isAuthorityItem = true;
383 int pageSize = batchSize;
385 // The supplied start and end batch numbers start with 1, but the page number starts with 0.
386 int startPage = (startBatch > 0) ? startBatch - 1 : 0;
387 int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE;
389 if (isAuthorityItem) {
390 List<String> vocabularyCsids = getVocabularyCsids((AuthorityResource<?, ?>) resource);
392 for (String vocabularyCsid : vocabularyCsids) {
393 int pageNum = startPage;
394 List<String> csids = null;
396 log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid);
399 // Check for a stop file before reindexing the batch.
401 if (batchStopFileExists()) {
402 throw new StoppedException();
405 csids = findAllAuthorityItems((AuthorityResource<?, ?>) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
407 if (csids.size() > 0) {
408 log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
410 // Pause for the configured amount of time.
412 if (batchPause > 0) {
413 log.trace("pausing " + batchPause + " ms");
415 Thread.sleep(batchPause);
418 reindexDocuments(docType, csids);
423 while(csids.size() == pageSize && pageNum <= endPage);
426 int pageNum = startPage;
427 List<String> csids = null;
430 // Check for a stop file before reindexing the batch.
432 if (batchStopFileExists()) {
433 throw new StoppedException();
436 csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
438 if (csids.size() > 0) {
439 log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
441 // Pause for the configured amount of time.
443 if (batchPause > 0) {
444 log.trace("pausing " + batchPause + " ms");
446 Thread.sleep(batchPause);
449 reindexDocuments(docType, csids);
454 while(csids.size() == pageSize && pageNum <= endPage);
458 private void reindexDocument(String docType, String csid) throws Exception {
459 reindexDocuments(docType, Arrays.asList(csid));
462 private void reindexDocuments(String docType, List<String> csids) throws Exception {
463 // Convert the csids to structs of nuxeo id and type, as expected
466 if (csids == null || csids.size() == 0) {
470 // Transaction for the batch
471 boolean tx = TransactionHelper.startTransaction();
473 getLowLevelSession();
475 List<Info> infos = new ArrayList<Info>();
477 String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " +
478 "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " +
479 "AND ecm:primaryType LIKE '" + docType + "%' " +
480 "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0";
481 IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY);
484 for (Map<String, Serializable> map : result) {
485 String id = (String) map.get(NXQL.ECM_UUID);
486 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
487 infos.add(new Info(id, type));
493 if (csids.size() != infos.size()) {
494 log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size());
497 if (log.isTraceEnabled()) {
498 for (Info info : infos) {
499 log.trace(info.type + " " + info.id);
503 numAffected += infos.size();
505 // Below code copied from the doBatch function.
509 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
510 Set<String> asyncIds = new HashSet<String>();
511 Model model = session.getModel();
512 for (Info info : infos) {
514 if (fulltextConfiguration.isFulltextIndexable(info.type)) {
515 asyncIds.add(model.idToString(info.id));
520 runSyncBatch(ids, asyncIds);
525 TransactionHelper.setTransactionRollbackOnly();
526 log.error("Rolling back sync");
528 TransactionHelper.commitOrRollbackTransaction();
532 runAsyncBatch(asyncIds);
534 // wait for async completion after transaction commit
535 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
538 private List<String> quoteList(List<String> values) {
539 List<String> quoted = new ArrayList<String>(values.size());
541 for (String value : values) {
542 quoted.add("'" + value + "'");
548 private boolean batchStopFileExists() {
549 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile());
552 private boolean docTypeStopFileExists() {
553 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile());
556 private static class StoppedException extends Exception {
557 private static final long serialVersionUID = 8813189331855935939L;
559 public StoppedException() {
565 * The code below this comment is copied from the nuxeo-reindex-fulltext
566 * module. The original copyright is below.
570 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
572 * All rights reserved. This program and the accompanying materials
573 * are made available under the terms of the GNU Lesser General Public License
574 * (LGPL) version 2.1 which accompanies this distribution, and is available at
575 * http://www.gnu.org/licenses/lgpl.html
577 * This library is distributed in the hope that it will be useful,
578 * but WITHOUT ANY WARRANTY; without even the implied warranty of
579 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
580 * Lesser General Public License for more details.
586 protected static class Info {
587 public final Serializable id;
589 public final String type;
591 public Info(Serializable id, String type) {
598 * This has to be called once the transaction has been started.
600 protected void getLowLevelSession() {
602 SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession();
603 Field f2 = SQLSession.class.getDeclaredField("session");
604 f2.setAccessible(true);
605 session = (Session) f2.get(s);
606 fulltextConfiguration = session.getModel().getFulltextConfiguration();
607 } catch (ReflectiveOperationException e) {
608 throw new NuxeoException(e);
612 protected void doBatch(List<Info> infos) {
616 // transaction for the sync batch
617 tx = TransactionHelper.startTransaction();
619 getLowLevelSession(); // for fulltextInfo
620 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
621 Set<String> asyncIds = new HashSet<String>();
622 Model model = session.getModel();
623 for (Info info : infos) {
625 if (fulltextConfiguration.isFulltextIndexable(info.type)) {
626 asyncIds.add(model.idToString(info.id));
631 runSyncBatch(ids, asyncIds);
636 TransactionHelper.setTransactionRollbackOnly();
637 log.error("Rolling back sync");
639 TransactionHelper.commitOrRollbackTransaction();
643 runAsyncBatch(asyncIds);
645 // wait for async completion after transaction commit
646 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
650 * Do this at the low-level session level because we may have to modify
651 * things like versions which aren't usually modifiable, and it's also good
652 * to bypass all listeners.
654 protected void runSyncBatch(List<Serializable> ids, Set<String> asyncIds) {
655 getLowLevelSession();
657 session.getNodesByIds(ids); // batch fetch
659 Map<Serializable, String> titles = new HashMap<Serializable, String>();
660 for (Serializable id : ids) {
661 Node node = session.getNodeById(id);
662 if (asyncIds.contains(id)) {
663 node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id);
667 prop = node.getSimpleProperty(DC_TITLE);
668 } catch (IllegalArgumentException e) {
671 String title = (String) prop.getValue();
672 titles.put(id, title);
673 prop.setValue(title + " ");
677 for (Serializable id : ids) {
678 Node node = session.getNodeById(id);
681 prop = node.getSimpleProperty(DC_TITLE);
682 } catch (IllegalArgumentException e) {
685 prop.setValue(titles.get(id));
690 protected void runAsyncBatch(Set<String> asyncIds)
692 if (asyncIds.isEmpty()) {
695 String repositoryName = coreSession.getRepositoryName();
696 WorkManager workManager = Framework.getLocalService(WorkManager.class);
697 for (String id : asyncIds) {
698 Work work = new SQLFulltextExtractorWork(repositoryName, id);
699 // schedule immediately, we're outside a transaction
700 workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false);