2 * This file contains code from Florent Guillaume's nuxeo-reindex-fulltext module.
6 package org.collectionspace.services.batch.nuxeo;
9 import java.io.Serializable;
10 import java.lang.reflect.Field;
11 import java.security.Principal;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
22 import org.apache.commons.lang.StringUtils;
23 import org.collectionspace.services.client.PoxPayloadIn;
24 import org.collectionspace.services.client.PoxPayloadOut;
25 import org.collectionspace.services.common.CollectionSpaceResource;
26 import org.collectionspace.services.common.NuxeoBasedResource;
27 import org.collectionspace.services.common.StoredValuesUriTemplate;
28 import org.collectionspace.services.common.UriTemplateFactory;
29 import org.collectionspace.services.common.UriTemplateRegistryKey;
30 import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs;
31 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
32 import org.collectionspace.services.common.invocable.InvocationResults;
33 import org.collectionspace.services.common.vocabulary.AuthorityResource;
34 import org.collectionspace.services.nuxeo.util.ReindexFulltextRoot.ReindexInfo;
35 import org.nuxeo.ecm.core.api.AbstractSession;
36 import org.nuxeo.ecm.core.api.ClientException;
37 import org.nuxeo.ecm.core.api.CoreSession;
38 import org.nuxeo.ecm.core.api.IterableQueryResult;
39 import org.nuxeo.ecm.core.api.NuxeoException;
40 import org.nuxeo.ecm.core.api.NuxeoPrincipal;
41 import org.nuxeo.ecm.core.event.EventService;
42 import org.nuxeo.ecm.core.query.QueryFilter;
43 import org.nuxeo.ecm.core.query.sql.NXQL;
44 import org.nuxeo.ecm.core.storage.FulltextConfiguration;
45 import org.nuxeo.ecm.core.storage.sql.Model;
46 import org.nuxeo.ecm.core.storage.sql.Node;
47 import org.nuxeo.ecm.core.storage.sql.Session;
48 import org.nuxeo.ecm.core.storage.sql.SimpleProperty;
49 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLFulltextExtractorWork;
50 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLSession;
51 import org.nuxeo.ecm.core.work.api.Work;
52 import org.nuxeo.ecm.core.work.api.WorkManager;
53 import org.nuxeo.ecm.core.work.api.WorkManager.Scheduling;
54 import org.nuxeo.runtime.api.Framework;
55 import org.nuxeo.runtime.transaction.TransactionHelper;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
/**
 * Batch job that reindexes the Nuxeo fulltext index for CollectionSpace records.
 * It supports three invocation modes: single (one CSID), list (several CSIDs),
 * and no context (whole docTypes, processed in batches). A running job can be
 * stopped by creating a stop file (BATCH_STOP_FILE or DOCTYPE_STOP_FILE) in
 * stopFileDirectory.
 */
59 public class ReindexFullTextBatchJob extends AbstractBatchJob {
60 final Logger log = LoggerFactory.getLogger(ReindexFullTextBatchJob.class);
// Title property that runSyncBatch temporarily modifies and then restores.
62 public static final String DC_TITLE = "dc:title";
// Number of records per batch (page) when no "batchSize" param is supplied.
63 public static final int DEFAULT_BATCH_SIZE = 1000;
// Batch numbers supplied by callers are 1-based; 0 means "no start limit" /
// "no end limit" (see reindexDocuments(String)).
64 public static final int DEFAULT_START_BATCH = 0;
65 public static final int DEFAULT_END_BATCH = 0;
// Milliseconds to sleep before each batch; 0 disables the pause.
66 public static final int DEFAULT_BATCH_PAUSE = 0;
// Stop file names looked up in stopFileDirectory. BATCH_STOP_FILE is checked
// before every docType and every batch; DOCTYPE_STOP_FILE only before each docType.
67 public static final String BATCH_STOP_FILE = "stopBatch";
68 public static final String DOCTYPE_STOP_FILE = "stopDocType";
70 private int batchSize = DEFAULT_BATCH_SIZE;
71 private int batchPause = DEFAULT_BATCH_PAUSE;
72 private int startBatch = DEFAULT_START_BATCH;
73 private int endBatch = DEFAULT_END_BATCH;
// Running count of documents matched by the reindex queries; reported in the results.
74 private int numAffected = 0;
// Set by the constructor to <java.io.tmpdir>/<fully-qualified class name>.
76 private String stopFileDirectory;
// coreSession is taken from getRepoSession(); session is the low-level storage
// session extracted from it reflectively by getLowLevelSession(), which also
// sets fulltextConfiguration.
78 private CoreSession coreSession;
79 private Session session = null;
80 protected FulltextConfiguration fulltextConfiguration;
// docType -> resource serving it for the current tenant (built by initResourceMap()).
82 private Map<String, NuxeoBasedResource> resourcesByDocType;
84 public ReindexFullTextBatchJob() {
85 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
87 stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName();
89 log.debug("stop file directory is " + stopFileDirectory);
// Report minimal progress before any work is done.
94 setCompletionStatus(STATUS_MIN_PROGRESS);
98 // This is needed so that resource calls (which start transactions)
99 // will work. Otherwise, a javax.transaction.NotSupportedException
100 // ("Nested transactions are not supported") is thrown.
// Remember whether a transaction was active so one can be restarted at the end.
102 boolean isTransactionActive = TransactionHelper.isTransactionActive();
104 if (isTransactionActive) {
105 TransactionHelper.commitOrRollbackTransaction();
// CoreSession used by the low-level reindexing helpers (getLowLevelSession etc.).
109 coreSession = getRepoSession().getCoreSession();
// Single mode: reindex the one record named by the context's CSID and docType.
111 if (requestIsForInvocationModeSingle()) {
112 String csid = getInvocationContext().getSingleCSID();
115 throw new Exception("No singleCSID was supplied in invocation context.");
118 String docType = getInvocationContext().getDocType();
120 if (StringUtils.isEmpty(docType)) {
121 throw new Exception("No docType was supplied in invocation context.");
124 log.debug("Reindexing " + docType + " record with csid: " + csid);
126 reindexDocument(docType, csid);
// List mode: reindex the supplied CSIDs, all of the context's docType.
128 else if (requestIsForInvocationModeList()) {
129 ListCSIDs list = getInvocationContext().getListCSIDs();
130 List<String> csids = list.getCsid();
132 if (csids == null || csids.size() == 0) {
133 throw new Exception("no listCSIDs were supplied");
136 String docType = getInvocationContext().getDocType();
138 if (StringUtils.isEmpty(docType)) {
139 throw new Exception("No docType was supplied in invocation context.");
142 log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ...");
144 if (log.isTraceEnabled()) {
145 log.trace(StringUtils.join(csids, ", "));
148 reindexDocuments(docType, csids);
// No-context mode: reindex every record of one or more docTypes, in batches.
// The LinkedHashSet keeps docTypes in insertion order: the context docType
// first, then each "docType" param in the order given.
150 else if (requestIsForInvocationModeNoContext()) {
151 Set<String> docTypes = new LinkedHashSet<String>();
154 docType = getInvocationContext().getDocType();
156 if (StringUtils.isNotEmpty(docType)) {
157 docTypes.add(docType);
160 // Read batch size, start and end batches, pause, and additional doctypes from params.
// NOTE(review): Integer.parseInt throws NumberFormatException for non-numeric
// param values; confirm the job's error handling reports that usefully.
162 for (Param param : this.getParams()) {
163 if (param.getKey().equals("batchSize")) {
164 batchSize = Integer.parseInt(param.getValue());
166 else if (param.getKey().equals("startBatch")) {
167 startBatch = Integer.parseInt(param.getValue());
169 else if (param.getKey().equals("endBatch")) {
170 endBatch = Integer.parseInt(param.getValue());
172 else if (param.getKey().equals("batchPause")) {
173 batchPause = Integer.parseInt(param.getValue());
175 else if (param.getKey().equals("docType")) {
176 docType = param.getValue();
178 if (StringUtils.isNotEmpty(docType)) {
179 docTypes.add(docType);
// An empty set means every docType known to this job (see reindexDocuments(Set)).
185 reindexDocuments(docTypes);
188 log.debug("reindexing complete");
// Summarize the run; numAffected counts the documents matched by the reindex queries.
190 InvocationResults results = new InvocationResults();
191 results.setNumAffected(numAffected);
192 results.setUserNote("reindexed " + numAffected + " records");
195 setCompletionStatus(STATUS_COMPLETE);
// A stop file ends the job early; this is still reported as a normal completion.
197 catch(StoppedException e) {
198 log.debug("reindexing terminated by stop file");
200 InvocationResults results = new InvocationResults();
201 results.setNumAffected(numAffected);
202 results.setUserNote("reindexing terminated by stop file");
205 setCompletionStatus(STATUS_COMPLETE);
// NOTE(review): setErrorResult receives only e.getMessage(); make sure e (with
// its stack trace) is also logged, or failures will be hard to diagnose.
208 setErrorResult(e.getMessage());
211 // This is needed so that when the session is released after this
212 // batch job exits (in BatchDocumentModelHandler), there isn't an exception.
213 // Otherwise, a "Session invoked in a container without a transaction active"
214 // error is thrown from RepositoryJavaClientImpl.releaseRepositorySession.
216 if (isTransactionActive) {
217 TransactionHelper.startTransaction();
/**
 * Builds resourcesByDocType from getResourceMap(). Every docType found in a
 * resource's URI registry entries for this job's tenant is mapped to that
 * resource. When several resources register the same docType, a warning is
 * logged and a non-AuthorityResource is preferred over an AuthorityResource.
 */
222 private void initResourceMap() {
223 resourcesByDocType = new HashMap<String, NuxeoBasedResource>();
225 for (CollectionSpaceResource<PoxPayloadIn, PoxPayloadOut> resource : getResourceMap().values()) {
226 Map<UriTemplateRegistryKey, StoredValuesUriTemplate> entries = resource.getUriRegistryEntries();
228 for (UriTemplateRegistryKey key : entries.keySet()) {
229 String docType = key.getDocType();
230 String tenantId = key.getTenantId();
// Only registry entries for the current tenant are considered.
232 if (getTenantId().equals(tenantId)) {
233 if (resourcesByDocType.containsKey(docType)) {
234 log.warn("multiple resources found for docType " + docType);
236 NuxeoBasedResource currentResource = resourcesByDocType.get(docType);
// NOTE(review): this cast throws ClassCastException if a registered resource
// is not a NuxeoBasedResource.
237 NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource;
239 // Favor the resource that isn't an AuthorityResource. This
240 // is really just to deal with Contacts, which are handled
241 // by ContactResource, PersonAuthorityResource, and
242 // OrgAuthorityResource. We want to use ContactResource.
244 if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) {
245 resourcesByDocType.put(docType, candidateResource);
248 log.warn("using " + resourcesByDocType.get(docType));
// First resource seen for this docType (presumably the else branch of the
// containsKey check above).
251 resourcesByDocType.put(docType, (NuxeoBasedResource) resource);
258 private void reindexDocuments(Set<String> docTypes) throws Exception {
259 if (docTypes == null) {
260 docTypes = new LinkedHashSet<String>();
263 // If no types are specified, do them all.
265 if (docTypes.size() == 0) {
266 docTypes.addAll(getAllDocTypes());
269 for (String docType : docTypes) {
270 reindexDocuments(docType);
274 private List<String> getAllDocTypes() {
275 List<String> docTypes = new ArrayList<String>(resourcesByDocType.keySet());
276 Collections.sort(docTypes);
278 log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", "));
/**
 * Reindexes the records of one docType. CSIDs are fetched a page (batchSize
 * records) at a time and each non-empty page is passed to
 * reindexDocuments(String, List). For authority item docTypes, paging is done
 * separately within each vocabulary of the authority. Pages are limited to the
 * 1-based startBatch..endBatch range when those are set (0 means no limit).
 *
 * @throws StoppedException if a stop file is found before the docType or before a batch
 */
283 private void reindexDocuments(String docType) throws Exception {
284 // Check for a stop file before reindexing the docType.
286 if (batchStopFileExists() || docTypeStopFileExists()) {
287 throw new StoppedException();
290 log.debug("reindexing docType " + docType);
292 NuxeoBasedResource resource = resourcesByDocType.get(docType);
// NOTE(review): confirm the missing-resource branch returns before resource is
// used below.
294 if (resource == null) {
295 log.warn("No service resource found for docType " + docType);
// An AuthorityResource serves both authorities and their items; the URI template
// type registered for this docType tells which one it is.
298 boolean isAuthorityItem = false;
300 if (resource instanceof AuthorityResource) {
301 UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType);
302 StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key);
304 log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType());
306 if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) {
307 isAuthorityItem = true;
311 int pageSize = batchSize;
313 // The supplied start and end batch numbers start with 1, but the page number starts with 0.
314 int startPage = (startBatch > 0) ? startBatch - 1 : 0;
315 int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE;
317 if (isAuthorityItem) {
// Authority items are paged per vocabulary (one pass per vocabulary CSID).
318 List<String> vocabularyCsids = getVocabularyCsids((AuthorityResource<?, ?>) resource);
320 for (String vocabularyCsid : vocabularyCsids) {
321 int pageNum = startPage;
322 List<String> csids = null;
324 log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid);
327 // Check for a stop file before reindexing the batch.
329 if (batchStopFileExists()) {
330 throw new StoppedException();
// Sorting by creation time, then name, presumably keeps page boundaries
// stable across successive queries.
333 csids = findAllAuthorityItems((AuthorityResource<?, ?>) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
335 if (csids.size() > 0) {
336 log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
338 // Pause for the configured amount of time.
340 if (batchPause > 0) {
341 log.trace("pausing " + batchPause + " ms");
343 Thread.sleep(batchPause);
346 reindexDocuments(docType, csids);
// Keep going while the last page was full and the end page has not been passed.
// NOTE(review): confirm pageNum is incremented on every iteration.
351 while(csids.size() == pageSize && pageNum <= endPage);
// Other docTypes: page through all records served by the resource.
354 int pageNum = startPage;
355 List<String> csids = null;
358 // Check for a stop file before reindexing the batch.
360 if (batchStopFileExists()) {
361 throw new StoppedException();
364 csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
366 if (csids.size() > 0) {
367 log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
369 // Pause for the configured amount of time.
371 if (batchPause > 0) {
372 log.trace("pausing " + batchPause + " ms");
374 Thread.sleep(batchPause);
377 reindexDocuments(docType, csids);
// Same termination rule as the authority-item loop above.
382 while(csids.size() == pageSize && pageNum <= endPage);
386 private void reindexDocument(String docType, String csid) throws Exception {
387 reindexDocuments(docType, Arrays.asList(csid));
/**
 * Collects, for reindexing, the Nuxeo uuid and primary type of each document
 * whose ecm:name is one of the given CSIDs and whose primary type starts with
 * docType. Checked-in versions and proxies are excluded. The number of
 * documents found is added to numAffected.
 *
 * @param docType prefix that the matched documents' primary type must start with
 * @param csids the CSIDs (document names) of the records to reindex
 */
390 private void reindexDocuments(String docType, List<String> csids) throws Exception {
391 // Convert the csids to structs of nuxeo id and type, as expected
// (each ReindexInfo holds a document's uuid and primary type).
394 if (csids == null || csids.size() == 0) {
// Refreshes the low-level session used for the query below.
398 getLowLevelSession();
399 List<ReindexInfo> infos = new ArrayList<ReindexInfo>();
// NOTE(review): docType is concatenated into the LIKE literal without escaping,
// so a docType containing a quote would break the query. The LIKE prefix match
// also matches any primary type whose name merely starts with docType.
401 String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " +
402 "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " +
403 "AND ecm:primaryType LIKE '" + docType + "%' " +
404 "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0";
405 IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY);
// NOTE(review): confirm result is closed (e.g. in a finally block) after iteration.
408 for (Map<String, Serializable> map : result) {
409 String id = (String) map.get(NXQL.ECM_UUID);
410 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
411 infos.add(new ReindexInfo(id, type));
// Some CSIDs may not match (wrong docType, version/proxy, or nonexistent).
417 if (csids.size() != infos.size()) {
418 log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size());
421 if (log.isTraceEnabled()) {
422 for (ReindexInfo info : infos) {
423 log.trace(info.type + " " + info.id);
429 numAffected += infos.size();
432 private List<String> quoteList(List<String> values) {
433 List<String> quoted = new ArrayList<String>(values.size());
435 for (String value : values) {
436 quoted.add("'" + value + "'");
442 private boolean batchStopFileExists() {
443 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile());
446 private boolean docTypeStopFileExists() {
447 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile());
/**
 * Signals that a stop file was found. It is thrown from reindexDocuments(String)
 * and caught by the job's main logic, which then completes with the note
 * "reindexing terminated by stop file" instead of reporting an error.
 */
450 private static class StoppedException extends Exception {
451 private static final long serialVersionUID = 8813189331855935939L;
453 public StoppedException() {
459 * The code below this comment is copied from the nuxeo-reindex-fulltext
460 * module. The original copyright is below.
464 * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
466 * All rights reserved. This program and the accompanying materials
467 * are made available under the terms of the GNU Lesser General Public License
468 * (LGPL) version 2.1 which accompanies this distribution, and is available at
469 * http://www.gnu.org/licenses/lgpl.html
471 * This library is distributed in the hope that it will be useful,
472 * but WITHOUT ANY WARRANTY; without even the implied warranty of
473 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
474 * Lesser General Public License for more details.
481 * Launches a fulltext reindexing of the database.
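483 * @param batchSize the batch size; non-positive values fall back to DEFAULT_BATCH_SIZE (1000)
484 * @param batch if present, the batch number to process instead of all
485 * batches; starts at 1
 * @param query the NXQL query selecting the documents to reindex; its rows must
 * include ecm:uuid and ecm:primaryType (read by getInfos). The method body
 * defines a default query covering all non-proxy, non-deleted documents.
486 * @return "unauthorized" if the current principal is not an administrator;
 * otherwise, when done, a summary of the form
 * "done: N total: M batch_errors: E"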
488 public String reindexFulltext(int batchSize, int batch, String query) throws Exception {
// Only an administrator NuxeoPrincipal may launch a full reindex.
489 Principal principal = coreSession.getPrincipal();
490 if (!(principal instanceof NuxeoPrincipal)) {
491 return "unauthorized";
493 NuxeoPrincipal nuxeoPrincipal = (NuxeoPrincipal) principal;
494 if (!nuxeoPrincipal.isAdministrator()) {
495 return "unauthorized";
498 log("Reindexing starting");
499 if (batchSize <= 0) {
500 batchSize = DEFAULT_BATCH_SIZE;
504 // A default query that gets ALL the documents
507 query = "SELECT ecm:uuid, ecm:primaryType FROM Document"
508 + " WHERE ecm:isProxy = 0"
509 + " AND ecm:currentLifeCycleState <> 'deleted'"
510 + " ORDER BY ecm:uuid";
513 List<ReindexInfo> infos = getInfos(query);
514 int size = infos.size();
// Ceiling division: the last batch may be partial.
515 int numBatches = (size + batchSize - 1) / batchSize;
516 if (batch < 0 || batch > numBatches) {
521 log("Reindexing of %s documents, batch size: %s, number of batches: %s",
522 size, batchSize, numBatches);
524 log("Reindexing limited to batch: %s", batch + 1);
528 // Commit and close the transaction that was started by our standard request lifecycle.
530 boolean tx = TransactionHelper.isTransactionActive();
532 TransactionHelper.commitOrRollbackTransaction();
// i is a 0-based batch index; batch is compared against it directly.
537 for (int i = 0; i < numBatches; i++) {
538 if (batch >= 0 && batch != i) {
541 int pos = i * batchSize;
542 int end = pos + batchSize;
// NOTE(review): subList requires end <= size; confirm end is clamped to size
// for the last, partial batch.
546 List<ReindexInfo> batchInfos = infos.subList(pos, end);
547 log("Reindexing batch %s/%s, first id: %s", i + 1, numBatches,
548 batchInfos.get(0).id);
551 } catch (NuxeoException e) {
// NOTE(review): "..." + i + 1 is string concatenation, so batch index 0 is
// logged as "01"; it should be (i + 1), matching the 1-based numbering used in
// the log call above.
552 log.error("Error processing batch " + i + 1, e);
558 log("Reindexing done");
560 // Start a new transaction so our standard request lifecycle can complete.
563 TransactionHelper.startTransaction();
565 return "done: " + n + " total: " + size + " batch_errors: " + errs;
568 protected void log(String format, Object... args) {
569 log.warn(String.format(format, args));
573 * This has to be called once the transaction has been started.
575 protected void getLowLevelSession() throws Exception {
577 SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession();
578 Field f2 = SQLSession.class.getDeclaredField("session");
579 f2.setAccessible(true);
580 session = (Session) f2.get(s);
581 fulltextConfiguration = session.getModel().getFulltextConfiguration();
582 } catch (ReflectiveOperationException e) {
583 throw new NuxeoException(e);
/**
 * Runs an NXQL query on the low-level session and builds a list with one
 * ReindexInfo (uuid and primary type) per result row.
 *
 * @param query an NXQL query whose rows include ecm:uuid and ecm:primaryType
 */
587 protected List<ReindexInfo> getInfos(String query) throws Exception {
588 getLowLevelSession();
589 List<ReindexInfo> infos = new ArrayList<ReindexInfo>();
590 IterableQueryResult it = session.queryAndFetch(query, NXQL.NXQL,
// The uuid is kept as a Serializable here (reindexDocuments(String, List) casts it to String).
593 for (Map<String, Serializable> map : it) {
594 Serializable id = map.get(NXQL.ECM_UUID);
595 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
596 infos.add(new ReindexInfo(id, type));
/**
 * Reindexes one batch of documents. runSyncBatch runs in a transaction started
 * here and then committed or rolled back. After that, runAsyncBatch schedules
 * asynchronous fulltext extraction for the documents whose type is
 * fulltext-indexable. Finally the method waits for asynchronous event
 * processing to complete.
 */
604 protected void doBatch(List<ReindexInfo> infos) throws Exception {
608 // transaction for the sync batch
609 tx = TransactionHelper.startTransaction();
611 getLowLevelSession(); // also sets fulltextConfiguration, used below
612 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
// String forms (Model.idToString) of the ids whose type is fulltext-indexable.
613 Set<String> asyncIds = new HashSet<String>();
614 Model model = session.getModel();
615 for (ReindexInfo info : infos) {
617 if (fulltextConfiguration.isFulltextIndexable(info.type)) {
618 asyncIds.add(model.idToString(info.id));
623 runSyncBatch(ids, asyncIds);
628 TransactionHelper.setTransactionRollbackOnly();
629 log.error("Rolling back sync");
631 TransactionHelper.commitOrRollbackTransaction();
// Scheduled only after the sync transaction has ended.
635 runAsyncBatch(asyncIds);
637 // wait for async completion after transaction commit
638 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
/**
642 * Do this at the low-level session level because we may have to modify
643 * things like versions which aren't usually modifiable, and it's also good
644 * to bypass all listeners.
 * <p>
 * First pass, for each document:
 * <ul>
 * <li>if its id is in asyncIds, set the fulltext job id property to the id;</li>
 * <li>append a space to dc:title, remembering the original value.</li>
 * </ul>
 * A second pass restores every remembered title. Documents without a dc:title
 * property (getSimpleProperty throws IllegalArgumentException) are presumably
 * skipped.
 */
646 protected void runSyncBatch(List<Serializable> ids, Set<String> asyncIds) throws Exception {
647 getLowLevelSession();
649 session.getNodesByIds(ids); // batch fetch
// Original dc:title values, restored in the second pass.
651 Map<Serializable, String> titles = new HashMap<Serializable, String>();
652 for (Serializable id : ids) {
653 Node node = session.getNodeById(id);
// NOTE(review): asyncIds holds ids converted with Model.idToString (see doBatch),
// but id here is a Serializable; contains() only matches when ids are Strings.
654 if (asyncIds.contains(id)) {
655 node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id);
659 prop = node.getSimpleProperty(DC_TITLE);
660 } catch (IllegalArgumentException e) {
663 String title = (String) prop.getValue();
664 titles.put(id, title);
// Touch the title, presumably so the storage layer treats the document as modified.
665 prop.setValue(title + " ");
// Second pass: put the original titles back.
669 for (Serializable id : ids) {
670 Node node = session.getNodeById(id);
673 prop = node.getSimpleProperty(DC_TITLE);
674 } catch (IllegalArgumentException e) {
677 prop.setValue(titles.get(id));
/**
 * Schedules one SQLFulltextExtractorWork per id in asyncIds on the WorkManager,
 * using Scheduling.IF_NOT_SCHEDULED. An empty asyncIds is checked first;
 * presumably the method returns immediately in that case.
 */
682 protected void runAsyncBatch(Set<String> asyncIds) {
683 if (asyncIds.isEmpty()) {
686 String repositoryName = coreSession.getRepositoryName();
687 WorkManager workManager = Framework.getLocalService(WorkManager.class);
688 for (String id : asyncIds) {
689 Work work = new SQLFulltextExtractorWork(repositoryName, id);
690 // schedule immediately, we're outside a transaction
691 workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false);