From: Ray Lee Date: Thu, 28 Jun 2018 21:53:32 +0000 (-0700) Subject: NOJIRA: Update ReindexFullTextBatchJob for 5.0. X-Git-Url: https://git.aero2k.de/?a=commitdiff_plain;h=bfc97a46c4837fa5a6ee4e438347fdcb920a816e;p=tmp%2Fjakarta-migration.git NOJIRA: Update ReindexFullTextBatchJob for 5.0. --- diff --git a/services/batch/service/src/main/java/org/collectionspace/services/batch/nuxeo/ReindexFullTextBatchJob.java b/services/batch/service/src/main/java/org/collectionspace/services/batch/nuxeo/ReindexFullTextBatchJob.java index bd388c7f2..1d662e629 100644 --- a/services/batch/service/src/main/java/org/collectionspace/services/batch/nuxeo/ReindexFullTextBatchJob.java +++ b/services/batch/service/src/main/java/org/collectionspace/services/batch/nuxeo/ReindexFullTextBatchJob.java @@ -1,6 +1,6 @@ /* * This file contains code from Florent Guillame's nuxeo-reindex-fulltext module. - * + * */ package org.collectionspace.services.batch.nuxeo; @@ -29,7 +29,6 @@ import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs import org.collectionspace.services.common.invocable.InvocationContext.Params.Param; import org.collectionspace.services.common.invocable.InvocationResults; import org.collectionspace.services.common.vocabulary.AuthorityResource; -import org.collectionspace.services.nuxeo.util.ReindexFulltextRoot.ReindexInfo; import org.nuxeo.ecm.core.api.AbstractSession; import org.nuxeo.ecm.core.api.CoreSession; import org.nuxeo.ecm.core.api.IterableQueryResult; @@ -63,69 +62,67 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { public static final int DEFAULT_BATCH_PAUSE = 0; public static final String BATCH_STOP_FILE = "stopBatch"; public static final String DOCTYPE_STOP_FILE = "stopDocType"; - + private int batchSize = DEFAULT_BATCH_SIZE; private int batchPause = DEFAULT_BATCH_PAUSE; private int startBatch = DEFAULT_START_BATCH; private int endBatch = DEFAULT_END_BATCH; private int numAffected = 0; - + private String stopFileDirectory; private CoreSession coreSession; private Session session = null; protected FulltextConfiguration fulltextConfiguration; - + private Map resourcesByDocType; public ReindexFullTextBatchJob() { setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST)); - + stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName(); - + log.debug("stop file directory is " + stopFileDirectory); } - + @Override public void run() { setCompletionStatus(STATUS_MIN_PROGRESS); - + numAffected = 0; - - // This is needed so that resource calls (which start transactions) - // will work. Otherwise, a javax.transaction.NotSupportedException - // ("Nested transactions are not supported") is thrown. - + boolean isTransactionActive = TransactionHelper.isTransactionActive(); - + + // Commit and close the transaction that was started by the standard request lifecycle. + if (isTransactionActive) { TransactionHelper.commitOrRollbackTransaction(); } - + try { coreSession = getRepoSession().getCoreSession(); if (requestIsForInvocationModeSingle()) { String csid = getInvocationContext().getSingleCSID(); - + if (csid == null) { throw new Exception("No singleCSID was supplied in invocation context."); } String docType = getInvocationContext().getDocType(); - + if (StringUtils.isEmpty(docType)) { throw new Exception("No docType was supplied in invocation context."); } log.debug("Reindexing " + docType + " record with csid: " + csid); - + reindexDocument(docType, csid); } else if (requestIsForInvocationModeList()) { ListCSIDs list = getInvocationContext().getListCSIDs(); List csids = list.getCsid(); - + if (csids == null || csids.size() == 0) { throw new Exception("no listCSIDs were supplied"); } @@ -137,25 +134,25 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { } log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ..."); - + if (log.isTraceEnabled()) { log.trace(StringUtils.join(csids, ", ")); } - + reindexDocuments(docType, csids); } else if (requestIsForInvocationModeNoContext()) { Set docTypes = new LinkedHashSet(); String docType; - + docType = getInvocationContext().getDocType(); if (StringUtils.isNotEmpty(docType)) { docTypes.add(docType); } - + // Read batch size, start and end batches, pause, and additional doctypes from params. - + for (Param param : this.getParams()) { if (param.getKey().equals("batchSize")) { batchSize = Integer.parseInt(param.getValue()); @@ -171,144 +168,141 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { } else if (param.getKey().equals("docType")) { docType = param.getValue(); - + if (StringUtils.isNotEmpty(docType)) { docTypes.add(docType); } } } - + initResourceMap(); reindexDocuments(docTypes); } - + log.debug("reindexing complete"); - + InvocationResults results = new InvocationResults(); results.setNumAffected(numAffected); results.setUserNote("reindexed " + numAffected + " records"); - + setResults(results); setCompletionStatus(STATUS_COMPLETE); } catch(StoppedException e) { log.debug("reindexing terminated by stop file"); - + InvocationResults results = new InvocationResults(); results.setNumAffected(numAffected); results.setUserNote("reindexing terminated by stop file"); - + setResults(results); setCompletionStatus(STATUS_COMPLETE); } catch(Exception e) { setErrorResult(e.getMessage()); - } - finally { - // This is needed so that when the session is released after this - // batch job exits (in BatchDocumentModelHandler), there isn't an exception. - // Otherwise, a "Session invoked in a container without a transaction active" - // error is thrown from RepositoryJavaClientImpl.releaseRepositorySession. + } + finally { + // Start a new transaction so the standard request lifecycle can complete. if (isTransactionActive) { TransactionHelper.startTransaction(); } } } - + private void initResourceMap() { resourcesByDocType = new HashMap(); for (CollectionSpaceResource resource : getResourceMap().values()) { Map entries = resource.getUriRegistryEntries(); - + for (UriTemplateRegistryKey key : entries.keySet()) { String docType = key.getDocType(); String tenantId = key.getTenantId(); - + if (getTenantId().equals(tenantId)) { if (resourcesByDocType.containsKey(docType)) { log.warn("multiple resources found for docType " + docType); - + NuxeoBasedResource currentResource = resourcesByDocType.get(docType); NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource; - + // Favor the resource that isn't an AuthorityResource. This // is really just to deal with Contacts, which are handled // by ContactResource, PersonAuthorityResource, and // OrgAuthorityResource. We want to use ContactResource. - + if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) { resourcesByDocType.put(docType, candidateResource); } - + log.warn("using " + resourcesByDocType.get(docType)); } else { resourcesByDocType.put(docType, (NuxeoBasedResource) resource); } - } + } } } } - + private void reindexDocuments(Set docTypes) throws Exception { if (docTypes == null) { docTypes = new LinkedHashSet(); } // If no types are specified, do them all. - + if (docTypes.size() == 0) { docTypes.addAll(getAllDocTypes()); } - + for (String docType : docTypes) { reindexDocuments(docType); } } - + private List getAllDocTypes() { List docTypes = new ArrayList(resourcesByDocType.keySet()); Collections.sort(docTypes); log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", ")); - + return docTypes; } - + private void reindexDocuments(String docType) throws Exception { // Check for a stop file before reindexing the docType. - + if (batchStopFileExists() || docTypeStopFileExists()) { throw new StoppedException(); } - + log.debug("reindexing docType " + docType); - + NuxeoBasedResource resource = resourcesByDocType.get(docType); - + if (resource == null) { log.warn("No service resource found for docType " + docType); } - + boolean isAuthorityItem = false; - + if (resource instanceof AuthorityResource) { UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType); StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key); - + log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType()); - + if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) { isAuthorityItem = true; } } - + int pageSize = batchSize; - + // The supplied start and end batch numbers start with 1, but the page number starts with 0. - int startPage = (startBatch > 0) ? startBatch - 1 : 0; + int startPage = (startBatch > 0) ? startBatch - 1 : 0; int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE; if (isAuthorityItem) { @@ -319,30 +313,30 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { List csids = null; log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid); - + do { // Check for a stop file before reindexing the batch. - + if (batchStopFileExists()) { throw new StoppedException(); } csids = findAllAuthorityItems((AuthorityResource) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name"); - + if (csids.size() > 0) { log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0)); - + // Pause for the configured amount of time. - + if (batchPause > 0) { log.trace("pausing " + batchPause + " ms"); - + Thread.sleep(batchPause); } - + reindexDocuments(docType, csids); } - + pageNum++; } while(csids.size() == pageSize && pageNum <= endPage); @@ -353,37 +347,37 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { do { // Check for a stop file before reindexing the batch. - + if (batchStopFileExists()) { throw new StoppedException(); } - + csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name"); - + if (csids.size() > 0) { log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0)); - + // Pause for the configured amount of time. - + if (batchPause > 0) { log.trace("pausing " + batchPause + " ms"); - + Thread.sleep(batchPause); } reindexDocuments(docType, csids); } - + pageNum++; } while(csids.size() == pageSize && pageNum <= endPage); } } - + private void reindexDocument(String docType, String csid) throws Exception { reindexDocuments(docType, Arrays.asList(csid)); } - + private void reindexDocuments(String docType, List csids) throws Exception { // Convert the csids to structs of nuxeo id and type, as expected // by doBatch. @@ -391,51 +385,85 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { if (csids == null || csids.size() == 0) { return; } - - getLowLevelSession(); - List infos = new ArrayList(); + + // Transaction for the batch + boolean tx = TransactionHelper.startTransaction(); + + getLowLevelSession(); + + List infos = new ArrayList(); String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " + "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " + "AND ecm:primaryType LIKE '" + docType + "%' " + "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0"; IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY); - + try { for (Map map : result) { String id = (String) map.get(NXQL.ECM_UUID); String type = (String) map.get(NXQL.ECM_PRIMARYTYPE); - infos.add(new ReindexInfo(id, type)); + infos.add(new Info(id, type)); } } finally { result.close(); } - + if (csids.size() != infos.size()) { log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size()); } - + if (log.isTraceEnabled()) { - for (ReindexInfo info : infos) { + for (Info info : infos) { log.trace(info.type + " " + info.id); } } - - doBatch(infos); - + numAffected += infos.size(); + + // Below code copied from the doBatch function. + + boolean ok; + + List ids = new ArrayList(infos.size()); + Set asyncIds = new HashSet(); + Model model = session.getModel(); + for (Info info : infos) { + ids.add(info.id); + if (fulltextConfiguration.isFulltextIndexable(info.type)) { + asyncIds.add(model.idToString(info.id)); + } + } + ok = false; + try { + runSyncBatch(ids, asyncIds); + ok = true; + } finally { + if (tx) { + if (!ok) { + TransactionHelper.setTransactionRollbackOnly(); + log.error("Rolling back sync"); + } + TransactionHelper.commitOrRollbackTransaction(); + } + } + + runAsyncBatch(asyncIds); + + // wait for async completion after transaction commit + Framework.getLocalService(EventService.class).waitForAsyncCompletion(); } - + private List quoteList(List values) { List quoted = new ArrayList(values.size()); - + for (String value : values) { quoted.add("'" + value + "'"); } - + return quoted; } - + private boolean batchStopFileExists() { return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile()); } @@ -443,20 +471,20 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { private boolean docTypeStopFileExists() { return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile()); } - + private static class StoppedException extends Exception { private static final long serialVersionUID = 8813189331855935939L; public StoppedException() { - + } } - + /* * The code below this comment is copied from the nuxeo-reindex-fulltext * module. The original copyright is below. */ - + /* * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors. * @@ -471,221 +499,124 @@ public class ReindexFullTextBatchJob extends AbstractBatchJob { * Lesser General Public License for more details. * * Contributors: - * Florent Guillaume - */ - - /** - * Launches a fulltext reindexing of the database. - * - * @param batchSize the batch size, defaults to 100 - * @param batch if present, the batch number to process instead of all - * batches; starts at 1 - * @return when done, ok + the total number of docs + * Florent Guillaume */ - public String reindexFulltext(int batchSize, int batch, String query) throws Exception { - Principal principal = coreSession.getPrincipal(); - if (!(principal instanceof NuxeoPrincipal)) { - return "unauthorized"; - } - NuxeoPrincipal nuxeoPrincipal = (NuxeoPrincipal) principal; - if (!nuxeoPrincipal.isAdministrator()) { - return "unauthorized"; - } - - log("Reindexing starting"); - if (batchSize <= 0) { - batchSize = DEFAULT_BATCH_SIZE; - } - - // - // A default query that gets ALL the documents - // - if (query == null) { - query = "SELECT ecm:uuid, ecm:primaryType FROM Document" - + " WHERE ecm:isProxy = 0" - + " AND ecm:currentLifeCycleState <> 'deleted'" - + " ORDER BY ecm:uuid"; - } - - List infos = getInfos(query); - int size = infos.size(); - int numBatches = (size + batchSize - 1) / batchSize; - if (batch < 0 || batch > numBatches) { - batch = 0; // all - } - batch--; - - log("Reindexing of %s documents, batch size: %s, number of batches: %s", - size, batchSize, numBatches); - if (batch >= 0) { - log("Reindexing limited to batch: %s", batch + 1); - } - - // - // Commit and close the transaction that was started by our standard request lifecycle. - // - boolean tx = TransactionHelper.isTransactionActive(); - if (tx) { - TransactionHelper.commitOrRollbackTransaction(); - } - - int n = 0; - int errs = 0; - for (int i = 0; i < numBatches; i++) { - if (batch >= 0 && batch != i) { - continue; - } - int pos = i * batchSize; - int end = pos + batchSize; - if (end > size) { - end = size; - } - List batchInfos = infos.subList(pos, end); - log("Reindexing batch %s/%s, first id: %s", i + 1, numBatches, - batchInfos.get(0).id); - try { - doBatch(batchInfos); - } catch (NuxeoException e) { - log.error("Error processing batch " + i + 1, e); - errs++; - } - n += end - pos; - } - - log("Reindexing done"); - // - // Start a new transaction so our standard request lifecycle can complete. - // - if (tx) { - TransactionHelper.startTransaction(); - } - return "done: " + n + " total: " + size + " batch_errors: " + errs; - } - - protected void log(String format, Object... args) { - log.warn(String.format(format, args)); + + protected static class Info { + public final Serializable id; + + public final String type; + + public Info(Serializable id, String type) { + this.id = id; + this.type = type; + } } /** * This has to be called once the transaction has been started. */ - protected void getLowLevelSession() throws Exception { - try { - SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession(); - Field f2 = SQLSession.class.getDeclaredField("session"); - f2.setAccessible(true); - session = (Session) f2.get(s); - fulltextConfiguration = session.getModel().getFulltextConfiguration(); - } catch (ReflectiveOperationException e) { - throw new NuxeoException(e); - } - } - - protected List getInfos(String query) throws Exception { - getLowLevelSession(); - List infos = new ArrayList(); - IterableQueryResult it = session.queryAndFetch(query, NXQL.NXQL, - QueryFilter.EMPTY); - try { - for (Map map : it) { - Serializable id = map.get(NXQL.ECM_UUID); - String type = (String) map.get(NXQL.ECM_PRIMARYTYPE); - infos.add(new ReindexInfo(id, type)); - } - } finally { - it.close(); - } - return infos; - } - - protected void doBatch(List infos) throws Exception { - boolean tx; - boolean ok; - - // transaction for the sync batch - tx = TransactionHelper.startTransaction(); - - getLowLevelSession(); // for fulltextInfo - List ids = new ArrayList(infos.size()); - Set asyncIds = new HashSet(); - Model model = session.getModel(); - for (ReindexInfo info : infos) { - ids.add(info.id); - if (fulltextConfiguration.isFulltextIndexable(info.type)) { - asyncIds.add(model.idToString(info.id)); - } - } - ok = false; - try { - runSyncBatch(ids, asyncIds); - ok = true; - } finally { - if (tx) { - if (!ok) { - TransactionHelper.setTransactionRollbackOnly(); - log.error("Rolling back sync"); - } - TransactionHelper.commitOrRollbackTransaction(); - } - } - - runAsyncBatch(asyncIds); - - // wait for async completion after transaction commit - Framework.getLocalService(EventService.class).waitForAsyncCompletion(); - } + protected void getLowLevelSession() { + try { + SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession(); + Field f2 = SQLSession.class.getDeclaredField("session"); + f2.setAccessible(true); + session = (Session) f2.get(s); + fulltextConfiguration = session.getModel().getFulltextConfiguration(); + } catch (ReflectiveOperationException e) { + throw new NuxeoException(e); + } + } + + protected void doBatch(List infos) { + boolean tx; + boolean ok; + + // transaction for the sync batch + tx = TransactionHelper.startTransaction(); + + getLowLevelSession(); // for fulltextInfo + List ids = new ArrayList(infos.size()); + Set asyncIds = new HashSet(); + Model model = session.getModel(); + for (Info info : infos) { + ids.add(info.id); + if (fulltextConfiguration.isFulltextIndexable(info.type)) { + asyncIds.add(model.idToString(info.id)); + } + } + ok = false; + try { + runSyncBatch(ids, asyncIds); + ok = true; + } finally { + if (tx) { + if (!ok) { + TransactionHelper.setTransactionRollbackOnly(); + log.error("Rolling back sync"); + } + TransactionHelper.commitOrRollbackTransaction(); + } + } + + runAsyncBatch(asyncIds); + + // wait for async completion after transaction commit + Framework.getLocalService(EventService.class).waitForAsyncCompletion(); + } /* - * Do this at the low-level session level because we may have to modify - * things like versions which aren't usually modifiable, and it's also good - * to bypass all listeners. - */ - protected void runSyncBatch(List ids, Set asyncIds) throws Exception { - getLowLevelSession(); - - session.getNodesByIds(ids); // batch fetch - - Map titles = new HashMap(); - for (Serializable id : ids) { - Node node = session.getNodeById(id); - if (asyncIds.contains(id)) { - node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id); - } - SimpleProperty prop; - try { - prop = node.getSimpleProperty(DC_TITLE); - } catch (IllegalArgumentException e) { - continue; - } - String title = (String) prop.getValue(); - titles.put(id, title); - prop.setValue(title + " "); - } - session.save(); - - for (Serializable id : ids) { - Node node = session.getNodeById(id); - SimpleProperty prop; - try { - prop = node.getSimpleProperty(DC_TITLE); - } catch (IllegalArgumentException e) { - continue; - } - prop.setValue(titles.get(id)); - } - session.save(); - } - - protected void runAsyncBatch(Set asyncIds) { - if (asyncIds.isEmpty()) { - return; - } - String repositoryName = coreSession.getRepositoryName(); - WorkManager workManager = Framework.getLocalService(WorkManager.class); - for (String id : asyncIds) { - Work work = new SQLFulltextExtractorWork(repositoryName, id); - // schedule immediately, we're outside a transaction - workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false); - } - } -} + * Do this at the low-level session level because we may have to modify + * things like versions which aren't usually modifiable, and it's also good + * to bypass all listeners. + */ + protected void runSyncBatch(List ids, Set asyncIds) { + getLowLevelSession(); + + session.getNodesByIds(ids); // batch fetch + + Map titles = new HashMap(); + for (Serializable id : ids) { + Node node = session.getNodeById(id); + if (asyncIds.contains(id)) { + node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id); + } + SimpleProperty prop; + try { + prop = node.getSimpleProperty(DC_TITLE); + } catch (IllegalArgumentException e) { + continue; + } + String title = (String) prop.getValue(); + titles.put(id, title); + prop.setValue(title + " "); + } + session.save(); + + for (Serializable id : ids) { + Node node = session.getNodeById(id); + SimpleProperty prop; + try { + prop = node.getSimpleProperty(DC_TITLE); + } catch (IllegalArgumentException e) { + continue; + } + prop.setValue(titles.get(id)); + } + session.save(); + } + + protected void runAsyncBatch(Set asyncIds) + { + if (asyncIds.isEmpty()) { + return; + } + String repositoryName = coreSession.getRepositoryName(); + WorkManager workManager = Framework.getLocalService(WorkManager.class); + for (String id : asyncIds) { + Work work = new SQLFulltextExtractorWork(repositoryName, id); + // schedule immediately, we're outside a transaction + workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false); + } + } +} \ No newline at end of file