2 * This file contains code from Florent Guillaume's nuxeo-reindex-fulltext module.
6 package org.collectionspace.services.batch.nuxeo;
9 import java.io.Serializable;
10 import java.lang.reflect.Field;
11 import java.security.Principal;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
22 import org.apache.commons.lang.StringUtils;
23 import org.collectionspace.services.common.CollectionSpaceResource;
24 import org.collectionspace.services.common.NuxeoBasedResource;
25 import org.collectionspace.services.common.StoredValuesUriTemplate;
26 import org.collectionspace.services.common.UriTemplateFactory;
27 import org.collectionspace.services.common.UriTemplateRegistryKey;
28 import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs;
29 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
30 import org.collectionspace.services.common.invocable.InvocationResults;
31 import org.collectionspace.services.common.vocabulary.AuthorityResource;
32 import org.nuxeo.ecm.core.api.AbstractSession;
33 import org.nuxeo.ecm.core.api.CoreSession;
34 import org.nuxeo.ecm.core.api.IterableQueryResult;
35 import org.nuxeo.ecm.core.api.NuxeoException;
36 import org.nuxeo.ecm.core.api.NuxeoPrincipal;
37 import org.nuxeo.ecm.core.event.EventService;
38 import org.nuxeo.ecm.core.query.QueryFilter;
39 import org.nuxeo.ecm.core.query.sql.NXQL;
40 import org.nuxeo.ecm.core.storage.FulltextConfiguration;
41 import org.nuxeo.ecm.core.storage.sql.Model;
42 import org.nuxeo.ecm.core.storage.sql.Node;
43 import org.nuxeo.ecm.core.storage.sql.Session;
44 import org.nuxeo.ecm.core.storage.sql.SimpleProperty;
45 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLFulltextExtractorWork;
46 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLSession;
47 import org.nuxeo.ecm.core.work.api.Work;
48 import org.nuxeo.ecm.core.work.api.WorkManager;
49 import org.nuxeo.ecm.core.work.api.WorkManager.Scheduling;
50 import org.nuxeo.runtime.api.Framework;
51 import org.nuxeo.runtime.transaction.TransactionHelper;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 public class ReindexFullTextBatchJob extends AbstractBatchJob {
56 final Logger log = LoggerFactory.getLogger(ReindexFullTextBatchJob.class);
58 public static final String DC_TITLE = "dc:title";
59 public static final int DEFAULT_BATCH_SIZE = 1000;
60 public static final int DEFAULT_START_BATCH = 0;
61 public static final int DEFAULT_END_BATCH = 0;
62 public static final int DEFAULT_BATCH_PAUSE = 0;
63 public static final String BATCH_STOP_FILE = "stopBatch";
64 public static final String DOCTYPE_STOP_FILE = "stopDocType";
66 private int batchSize = DEFAULT_BATCH_SIZE;
67 private int batchPause = DEFAULT_BATCH_PAUSE;
68 private int startBatch = DEFAULT_START_BATCH;
69 private int endBatch = DEFAULT_END_BATCH;
70 private int numAffected = 0;
72 private String stopFileDirectory;
74 private CoreSession coreSession;
75 private Session session = null;
76 protected FulltextConfiguration fulltextConfiguration;
78 private Map<String, NuxeoBasedResource> resourcesByDocType;
80 public ReindexFullTextBatchJob() {
81 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
83 stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName();
85 log.debug("stop file directory is " + stopFileDirectory);
// Main body of the batch job. Its method header is not part of this listing;
// presumably it is run().
// Depending on the invocation mode it does one of three things:
//   - single mode: reindexes one record;
//   - list mode: reindexes a list of records;
//   - no-context mode: reindexes whole docTypes in batches.
// It then records the number of records affected.
// NOTE(review): this listing appears to be missing structural lines. These
// include the method header, the try/catch/finally openers, an `if` before the
// singleCSID check, closing braces, and possibly the call that populates
// resourcesByDocType. Verify against the real source.
90 setCompletionStatus(STATUS_MIN_PROGRESS);
// Remember whether a transaction was open so one can be restarted at the end.
94 boolean isTransactionActive = TransactionHelper.isTransactionActive();
96 // Commit and close the transaction that was started by the standard request lifecycle.
98 if (isTransactionActive) {
99 TransactionHelper.commitOrRollbackTransaction();
103 coreSession = getRepoSession().getCoreSession();
// Single mode: both the csid and the docType must be supplied.
105 if (requestIsForInvocationModeSingle()) {
106 String csid = getInvocationContext().getSingleCSID();
109 throw new Exception("No singleCSID was supplied in invocation context.");
112 String docType = getInvocationContext().getDocType();
114 if (StringUtils.isEmpty(docType)) {
115 throw new Exception("No docType was supplied in invocation context.");
118 log.debug("Reindexing " + docType + " record with csid: " + csid);
120 reindexDocument(docType, csid);
// List mode: a non-empty csid list and a docType must be supplied.
122 else if (requestIsForInvocationModeList()) {
123 ListCSIDs list = getInvocationContext().getListCSIDs();
124 List<String> csids = list.getCsid();
126 if (csids == null || csids.size() == 0) {
127 throw new Exception("no listCSIDs were supplied");
130 String docType = getInvocationContext().getDocType();
132 if (StringUtils.isEmpty(docType)) {
133 throw new Exception("No docType was supplied in invocation context.");
136 log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ...");
138 if (log.isTraceEnabled()) {
139 log.trace(StringUtils.join(csids, ", "));
142 reindexDocuments(docType, csids);
// No-context mode: collect the docTypes to process, in insertion order. They
// come from the context docType plus any "docType" params.
144 else if (requestIsForInvocationModeNoContext()) {
145 Set<String> docTypes = new LinkedHashSet<String>();
148 docType = getInvocationContext().getDocType();
150 if (StringUtils.isNotEmpty(docType)) {
151 docTypes.add(docType);
154 // Read batch size, start and end batches, pause, and additional doctypes from params.
// A non-numeric value for a numeric param makes Integer.parseInt throw
// NumberFormatException. The "docType" param may appear more than once.
156 for (Param param : this.getParams()) {
157 if (param.getKey().equals("batchSize")) {
158 batchSize = Integer.parseInt(param.getValue());
160 else if (param.getKey().equals("startBatch")) {
161 startBatch = Integer.parseInt(param.getValue());
163 else if (param.getKey().equals("endBatch")) {
164 endBatch = Integer.parseInt(param.getValue());
166 else if (param.getKey().equals("batchPause")) {
167 batchPause = Integer.parseInt(param.getValue());
169 else if (param.getKey().equals("docType")) {
170 docType = param.getValue();
172 if (StringUtils.isNotEmpty(docType)) {
173 docTypes.add(docType);
// An empty set makes reindexDocuments(Set) process every known docType.
179 reindexDocuments(docTypes);
182 log.debug("reindexing complete");
// NOTE(review): no visible line hands `results` to the job (e.g. a setResults
// call); verify it is set in the real source.
184 InvocationResults results = new InvocationResults();
185 results.setNumAffected(numAffected);
186 results.setUserNote("reindexed " + numAffected + " records");
189 setCompletionStatus(STATUS_COMPLETE);
// A stop file was found. This is still reported as complete, with the count
// of records reindexed so far.
191 catch(StoppedException e) {
192 log.debug("reindexing terminated by stop file");
194 InvocationResults results = new InvocationResults();
195 results.setNumAffected(numAffected);
196 results.setUserNote("reindexing terminated by stop file");
199 setCompletionStatus(STATUS_COMPLETE);
// NOTE(review): only the exception message is recorded here. No logging of
// the stack trace is visible in this listing.
202 setErrorResult(e.getMessage());
205 // Start a new transaction so the standard request lifecycle can complete.
207 if (isTransactionActive) {
208 TransactionHelper.startTransaction();
213 private void initResourceMap() {
214 resourcesByDocType = new HashMap<String, NuxeoBasedResource>();
216 for (CollectionSpaceResource<?, ?> resource : getResourceMap().values()) {
217 Map<UriTemplateRegistryKey, StoredValuesUriTemplate> entries = resource.getUriRegistryEntries();
219 for (UriTemplateRegistryKey key : entries.keySet()) {
220 String docType = key.getDocType();
221 String tenantId = key.getTenantId();
223 if (getTenantId().equals(tenantId)) {
224 if (resourcesByDocType.containsKey(docType)) {
225 log.warn("multiple resources found for docType " + docType);
227 NuxeoBasedResource currentResource = resourcesByDocType.get(docType);
228 NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource;
230 // Favor the resource that isn't an AuthorityResource. This
231 // is really just to deal with Contacts, which are handled
232 // by ContactResource, PersonAuthorityResource, and
233 // OrgAuthorityResource. We want to use ContactResource.
235 if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) {
236 resourcesByDocType.put(docType, candidateResource);
239 log.warn("using " + resourcesByDocType.get(docType));
242 resourcesByDocType.put(docType, (NuxeoBasedResource) resource);
249 private void reindexDocuments(Set<String> docTypes) throws Exception {
250 if (docTypes == null) {
251 docTypes = new LinkedHashSet<String>();
254 // If no types are specified, do them all.
256 if (docTypes.size() == 0) {
257 docTypes.addAll(getAllDocTypes());
260 for (String docType : docTypes) {
261 reindexDocuments(docType);
265 private List<String> getAllDocTypes() {
266 List<String> docTypes = new ArrayList<String>(resourcesByDocType.keySet());
267 Collections.sort(docTypes);
269 log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", "));
/**
 * Reindexes all records of one docType in batches.
 * <p>
 * Each batch holds at most batchSize records. Processing runs from batch
 * startBatch through batch endBatch; both are 1-based, and 0 means unbounded.
 * Before each non-empty batch the method pauses for batchPause milliseconds.
 * <p>
 * If the docType's resource is an AuthorityResource whose URI template type is
 * ITEM, the records are processed one vocabulary at a time.
 * <p>
 * Both stop files are checked before the docType is started, and the batch
 * stop file is checked before every batch. If one is present, a
 * StoppedException is thrown.
 *
 * @param docType the docType to reindex
 * @throws Exception StoppedException when a stop file is found, or any failure from reindexing
 */
274 private void reindexDocuments(String docType) throws Exception {
275 // Check for a stop file before reindexing the docType.
277 if (batchStopFileExists() || docTypeStopFileExists()) {
278 throw new StoppedException();
281 log.debug("reindexing docType " + docType);
283 NuxeoBasedResource resource = resourcesByDocType.get(docType);
// NOTE(review): no statement after this warning is visible in this listing.
// Presumably the method returns here; otherwise findAll would receive a null
// resource.
285 if (resource == null) {
286 log.warn("No service resource found for docType " + docType);
289 boolean isAuthorityItem = false;
291 if (resource instanceof AuthorityResource) {
292 UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType);
// NOTE(review): uriTemplate is dereferenced without a null check. If no entry
// is registered for this key, the next line throws NullPointerException.
293 StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key);
295 log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType());
297 if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) {
298 isAuthorityItem = true;
302 int pageSize = batchSize;
304 // The supplied start and end batch numbers start with 1, but the page number starts with 0.
305 int startPage = (startBatch > 0) ? startBatch - 1 : 0;
306 int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE;
// Authority items: page through each vocabulary of the authority separately.
308 if (isAuthorityItem) {
309 List<String> vocabularyCsids = getVocabularyCsids((AuthorityResource<?, ?>) resource);
311 for (String vocabularyCsid : vocabularyCsids) {
312 int pageNum = startPage;
313 List<String> csids = null;
315 log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid);
318 // Check for a stop file before reindexing the batch.
320 if (batchStopFileExists()) {
321 throw new StoppedException();
// Results are ordered by creation time and then name, presumably so that
// page boundaries stay stable between queries.
324 csids = findAllAuthorityItems((AuthorityResource<?, ?>) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
326 if (csids.size() > 0) {
327 log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
329 // Pause for the configured amount of time.
331 if (batchPause > 0) {
332 log.trace("pausing " + batchPause + " ms");
334 Thread.sleep(batchPause);
337 reindexDocuments(docType, csids);
// Keep going while pages come back full and the end page has not been passed.
// NOTE(review): the pageNum increment is not visible in this listing; verify it.
342 while(csids.size() == pageSize && pageNum <= endPage);
// Other docTypes: page through all records of the resource.
345 int pageNum = startPage;
346 List<String> csids = null;
349 // Check for a stop file before reindexing the batch.
351 if (batchStopFileExists()) {
352 throw new StoppedException();
355 csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
357 if (csids.size() > 0) {
358 log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
360 // Pause for the configured amount of time.
362 if (batchPause > 0) {
363 log.trace("pausing " + batchPause + " ms");
365 Thread.sleep(batchPause);
368 reindexDocuments(docType, csids);
373 while(csids.size() == pageSize && pageNum <= endPage);
377 private void reindexDocument(String docType, String csid) throws Exception {
378 reindexDocuments(docType, Arrays.asList(csid));
/**
 * Reindexes the given records of one docType.
 * <p>
 * The csids are matched against ecm:name. Matches are restricted to primary
 * types whose name starts with docType, and checked-in versions and proxies
 * are excluded.
 * <p>
 * The matched documents are added to numAffected and passed to runSyncBatch.
 * After the transaction ends, those whose type is fulltext indexable are
 * handed to runAsyncBatch, and the method waits for async event processing
 * to complete.
 *
 * @param docType the docType, used as a primary-type name prefix
 * @param csids the csids of the records to reindex
 * @throws Exception if reindexing fails
 */
381 private void reindexDocuments(String docType, List<String> csids) throws Exception {
382 // Convert the csids to structs of nuxeo id and type, as expected
// NOTE(review): the body of this check is not visible in this listing.
// Presumably it returns immediately for a null or empty list.
385 if (csids == null || csids.size() == 0) {
389 // Transaction for the batch
390 boolean tx = TransactionHelper.startTransaction();
// Must run after the transaction is started (see getLowLevelSession).
392 getLowLevelSession();
394 List<Info> infos = new ArrayList<Info>();
// NOTE(review): docType is concatenated into the NXQL string unescaped, so a
// docType containing a quote would break or alter the query. The LIKE prefix
// match also selects any other primary type whose name starts with docType.
// The csids are quoted by quoteList.
396 String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " +
397 "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " +
398 "AND ecm:primaryType LIKE '" + docType + "%' " +
399 "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0";
// NOTE(review): an IterableQueryResult needs to be closed. The finally block
// that presumably does this is not visible in this listing.
400 IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY);
403 for (Map<String, Serializable> map : result) {
404 String id = (String) map.get(NXQL.ECM_UUID);
405 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
406 infos.add(new Info(id, type));
// A mismatch usually means some csids did not match a document of this docType.
412 if (csids.size() != infos.size()) {
413 log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size());
416 if (log.isTraceEnabled()) {
417 for (Info info : infos) {
418 log.trace(info.type + " " + info.id);
// Counted before the sync batch runs, so a rolled-back batch is still counted.
422 numAffected += infos.size();
424 // Below code copied from the doBatch function.
// NOTE(review): no visible line adds to `ids`. Presumably `ids.add(info.id)`
// sits in the loop on a line missing from this listing; verify.
428 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
429 Set<String> asyncIds = new HashSet<String>();
430 Model model = session.getModel();
431 for (Info info : infos) {
433 if (fulltextConfiguration.isFulltextIndexable(info.type)) {
434 asyncIds.add(model.idToString(info.id));
439 runSyncBatch(ids, asyncIds);
// On failure of the sync batch, the transaction is marked for rollback.
444 TransactionHelper.setTransactionRollbackOnly();
445 log.error("Rolling back sync");
447 TransactionHelper.commitOrRollbackTransaction();
// Fulltext extraction is scheduled only after the transaction has ended.
451 runAsyncBatch(asyncIds);
453 // wait for async completion after transaction commit
454 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
457 private List<String> quoteList(List<String> values) {
458 List<String> quoted = new ArrayList<String>(values.size());
460 for (String value : values) {
461 quoted.add("'" + value + "'");
467 private boolean batchStopFileExists() {
468 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile());
471 private boolean docTypeStopFileExists() {
472 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile());
/**
 * Thrown internally when a stop file is detected. It aborts reindexing before
 * the next docType or batch; the job reports it as a stop rather than an error.
 */
475 private static class StoppedException extends Exception {
476 private static final long serialVersionUID = 8813189331855935939L;
478 public StoppedException() {
484 * The code below this comment is copied from the nuxeo-reindex-fulltext
485 * module. The original copyright is below.
489 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
491 * All rights reserved. This program and the accompanying materials
492 * are made available under the terms of the GNU Lesser General Public License
493 * (LGPL) version 2.1 which accompanies this distribution, and is available at
494 * http://www.gnu.org/licenses/lgpl.html
496 * This library is distributed in the hope that it will be useful,
497 * but WITHOUT ANY WARRANTY; without even the implied warranty of
498 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
499 * Lesser General Public License for more details.
505 protected static class Info {
506 public final Serializable id;
508 public final String type;
510 public Info(Serializable id, String type) {
517 * This has to be called once the transaction has been started.
519 protected void getLowLevelSession() {
521 SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession();
522 Field f2 = SQLSession.class.getDeclaredField("session");
523 f2.setAccessible(true);
524 session = (Session) f2.get(s);
525 fulltextConfiguration = session.getModel().getFulltextConfiguration();
526 } catch (ReflectiveOperationException e) {
527 throw new NuxeoException(e);
/**
 * Reindexes one batch of documents, each described by an Info.
 * <p>
 * Within a new transaction it computes which documents are fulltext indexable
 * and runs runSyncBatch. After the transaction is committed (or rolled back)
 * it runs runAsyncBatch and waits for async event processing to complete.
 * <p>
 * NOTE(review): no caller is visible in this listing.
 * reindexDocuments(String, List) contains a copy of this body. The
 * try/catch/finally lines, the declaration of tx and the statement that fills
 * `ids` are also not visible here; verify against the real source.
 *
 * @param infos the documents to reindex
 */
531 protected void doBatch(List<Info> infos) {
535 // transaction for the sync batch
536 tx = TransactionHelper.startTransaction();
538 getLowLevelSession(); // for fulltextInfo
539 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
540 Set<String> asyncIds = new HashSet<String>();
541 Model model = session.getModel();
542 for (Info info : infos) {
544 if (fulltextConfiguration.isFulltextIndexable(info.type)) {
545 asyncIds.add(model.idToString(info.id));
550 runSyncBatch(ids, asyncIds);
// On failure of the sync batch, the transaction is marked for rollback.
555 TransactionHelper.setTransactionRollbackOnly();
556 log.error("Rolling back sync");
558 TransactionHelper.commitOrRollbackTransaction();
562 runAsyncBatch(asyncIds);
564 // wait for async completion after transaction commit
565 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
569 * Do this at the low-level session level because we may have to modify
570 * things like versions which aren't usually modifiable, and it's also good
571 * to bypass all listeners.
// Touches the given documents through the low-level storage session:
//   - for each id contained in asyncIds, sets Model.FULLTEXT_JOBID_PROP to the id;
//   - appends a space to dc:title, remembering the original value;
//   - in a second pass, sets dc:title back to the remembered value.
// Presumably this makes the storage layer treat the documents as modified.
// Documents without a dc:title property make getSimpleProperty throw
// IllegalArgumentException. The catch bodies are not visible in this listing;
// presumably they skip the document.
// NOTE(review): the second pass only has an effect if the first pass is saved
// before it. No save call is visible in this listing; verify.
573 protected void runSyncBatch(List<Serializable> ids, Set<String> asyncIds) {
574 getLowLevelSession();
576 session.getNodesByIds(ids); // batch fetch
578 Map<Serializable, String> titles = new HashMap<Serializable, String>();
579 for (Serializable id : ids) {
580 Node node = session.getNodeById(id);
// NOTE(review): asyncIds holds strings produced by model.idToString, so this
// lookup only matches if those strings equal the ids themselves.
581 if (asyncIds.contains(id)) {
582 node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id);
586 prop = node.getSimpleProperty(DC_TITLE);
587 } catch (IllegalArgumentException e) {
// A null title becomes "null " temporarily and is restored to null below.
590 String title = (String) prop.getValue();
591 titles.put(id, title);
592 prop.setValue(title + " ");
// Second pass: restore the original titles.
596 for (Serializable id : ids) {
597 Node node = session.getNodeById(id);
600 prop = node.getSimpleProperty(DC_TITLE);
601 } catch (IllegalArgumentException e) {
604 prop.setValue(titles.get(id));
/**
 * Schedules one SQLFulltextExtractorWork per id in asyncIds on the
 * WorkManager, using Scheduling.IF_NOT_SCHEDULED.
 * <p>
 * The method first checks whether asyncIds is empty; the body of that check
 * is not visible in this listing, and presumably it returns immediately.
 * NOTE(review): the rest of the method header (e.g. a throws clause) and the
 * end of the method are outside this listing.
 *
 * @param asyncIds ids (as produced by Model.idToString) of the documents whose fulltext should be extracted
 */
609 protected void runAsyncBatch(Set<String> asyncIds)
611 if (asyncIds.isEmpty()) {
614 String repositoryName = coreSession.getRepositoryName();
615 WorkManager workManager = Framework.getLocalService(WorkManager.class);
616 for (String id : asyncIds) {
617 Work work = new SQLFulltextExtractorWork(repositoryName, id);
618 // schedule immediately, we're outside a transaction
619 workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false);