]> git.aero2k.de Git - tmp/jakarta-migration.git/blob
1d662e62944cc02d94aadedf3cd9b43e4c027fc4
[tmp/jakarta-migration.git] /
1 /*
2  * This file contains code from Florent Guillaume's nuxeo-reindex-fulltext module.
3  *
4  */
5
6 package org.collectionspace.services.batch.nuxeo;
7
8 import java.io.File;
9 import java.io.Serializable;
10 import java.lang.reflect.Field;
11 import java.security.Principal;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21
22 import org.apache.commons.lang.StringUtils;
23 import org.collectionspace.services.common.CollectionSpaceResource;
24 import org.collectionspace.services.common.NuxeoBasedResource;
25 import org.collectionspace.services.common.StoredValuesUriTemplate;
26 import org.collectionspace.services.common.UriTemplateFactory;
27 import org.collectionspace.services.common.UriTemplateRegistryKey;
28 import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs;
29 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
30 import org.collectionspace.services.common.invocable.InvocationResults;
31 import org.collectionspace.services.common.vocabulary.AuthorityResource;
32 import org.nuxeo.ecm.core.api.AbstractSession;
33 import org.nuxeo.ecm.core.api.CoreSession;
34 import org.nuxeo.ecm.core.api.IterableQueryResult;
35 import org.nuxeo.ecm.core.api.NuxeoException;
36 import org.nuxeo.ecm.core.api.NuxeoPrincipal;
37 import org.nuxeo.ecm.core.event.EventService;
38 import org.nuxeo.ecm.core.query.QueryFilter;
39 import org.nuxeo.ecm.core.query.sql.NXQL;
40 import org.nuxeo.ecm.core.storage.FulltextConfiguration;
41 import org.nuxeo.ecm.core.storage.sql.Model;
42 import org.nuxeo.ecm.core.storage.sql.Node;
43 import org.nuxeo.ecm.core.storage.sql.Session;
44 import org.nuxeo.ecm.core.storage.sql.SimpleProperty;
45 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLFulltextExtractorWork;
46 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLSession;
47 import org.nuxeo.ecm.core.work.api.Work;
48 import org.nuxeo.ecm.core.work.api.WorkManager;
49 import org.nuxeo.ecm.core.work.api.WorkManager.Scheduling;
50 import org.nuxeo.runtime.api.Framework;
51 import org.nuxeo.runtime.transaction.TransactionHelper;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 public class ReindexFullTextBatchJob extends AbstractBatchJob {
56         final Logger log = LoggerFactory.getLogger(ReindexFullTextBatchJob.class);
57
58         public static final String DC_TITLE = "dc:title";
59         public static final int DEFAULT_BATCH_SIZE = 1000;
60         public static final int DEFAULT_START_BATCH = 0;
61         public static final int DEFAULT_END_BATCH = 0;
62         public static final int DEFAULT_BATCH_PAUSE = 0;
63         public static final String BATCH_STOP_FILE = "stopBatch";
64         public static final String DOCTYPE_STOP_FILE = "stopDocType";
65
66         private int batchSize = DEFAULT_BATCH_SIZE;
67         private int batchPause = DEFAULT_BATCH_PAUSE;
68         private int startBatch = DEFAULT_START_BATCH;
69         private int endBatch = DEFAULT_END_BATCH;
70         private int numAffected = 0;
71
72         private String stopFileDirectory;
73
74         private CoreSession coreSession;
75         private Session session = null;
76     protected FulltextConfiguration fulltextConfiguration;
77
78         private Map<String, NuxeoBasedResource> resourcesByDocType;
79
80         public ReindexFullTextBatchJob() {
81                 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
82
83                 stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName();
84
85                 log.debug("stop file directory is " + stopFileDirectory);
86         }
87
88         @Override
89         public void run() {
90                 setCompletionStatus(STATUS_MIN_PROGRESS);
91
92                 numAffected = 0;
93
94                 boolean isTransactionActive = TransactionHelper.isTransactionActive();
95
96                 // Commit and close the transaction that was started by the standard request lifecycle.
97
98                 if (isTransactionActive) {
99                         TransactionHelper.commitOrRollbackTransaction();
100                 }
101
102                 try {
103                         coreSession = getRepoSession().getCoreSession();
104
105                         if (requestIsForInvocationModeSingle()) {
106                                 String csid = getInvocationContext().getSingleCSID();
107
108                                 if (csid == null) {
109                                         throw new Exception("No singleCSID was supplied in invocation context.");
110                                 }
111
112                                 String docType = getInvocationContext().getDocType();
113
114                                 if (StringUtils.isEmpty(docType)) {
115                                         throw new Exception("No docType was supplied in invocation context.");
116                                 }
117
118                                 log.debug("Reindexing " + docType + " record with csid: " + csid);
119
120                                 reindexDocument(docType, csid);
121                         }
122                         else if (requestIsForInvocationModeList()) {
123                                 ListCSIDs list = getInvocationContext().getListCSIDs();
124                                 List<String> csids = list.getCsid();
125
126                                 if (csids == null || csids.size() == 0) {
127                                         throw new Exception("no listCSIDs were supplied");
128                                 }
129
130                                 String docType = getInvocationContext().getDocType();
131
132                                 if (StringUtils.isEmpty(docType)) {
133                                         throw new Exception("No docType was supplied in invocation context.");
134                                 }
135
136                                 log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ...");
137
138                                 if (log.isTraceEnabled()) {
139                                         log.trace(StringUtils.join(csids, ", "));
140                                 }
141
142                                 reindexDocuments(docType, csids);
143                         }
144                         else if (requestIsForInvocationModeNoContext()) {
145                                 Set<String> docTypes = new LinkedHashSet<String>();
146                                 String docType;
147
148                                 docType = getInvocationContext().getDocType();
149
150                                 if (StringUtils.isNotEmpty(docType)) {
151                                         docTypes.add(docType);
152                                 }
153
154                                 // Read batch size, start and end batches, pause, and additional doctypes from params.
155
156                                 for (Param param : this.getParams()) {
157                                         if (param.getKey().equals("batchSize")) {
158                                                 batchSize = Integer.parseInt(param.getValue());
159                                         }
160                                         else if (param.getKey().equals("startBatch")) {
161                                                 startBatch = Integer.parseInt(param.getValue());
162                                         }
163                                         else if (param.getKey().equals("endBatch")) {
164                                                 endBatch = Integer.parseInt(param.getValue());
165                                         }
166                                         else if (param.getKey().equals("batchPause")) {
167                                                 batchPause = Integer.parseInt(param.getValue());
168                                         }
169                                         else if (param.getKey().equals("docType")) {
170                                                 docType = param.getValue();
171
172                                                 if (StringUtils.isNotEmpty(docType)) {
173                                                         docTypes.add(docType);
174                                                 }
175                                         }
176                                 }
177
178                                 initResourceMap();
179                                 reindexDocuments(docTypes);
180                         }
181
182                         log.debug("reindexing complete");
183
184                         InvocationResults results = new InvocationResults();
185                         results.setNumAffected(numAffected);
186                         results.setUserNote("reindexed " + numAffected + " records");
187
188                         setResults(results);
189                         setCompletionStatus(STATUS_COMPLETE);
190                 }
191                 catch(StoppedException e) {
192                         log.debug("reindexing terminated by stop file");
193
194                         InvocationResults results = new InvocationResults();
195                         results.setNumAffected(numAffected);
196                         results.setUserNote("reindexing terminated by stop file");
197
198                         setResults(results);
199                         setCompletionStatus(STATUS_COMPLETE);
200                 }
201                 catch(Exception e) {
202                         setErrorResult(e.getMessage());
203                 }
204                 finally {
205                         // Start a new transaction so the standard request lifecycle can complete.
206
207                         if (isTransactionActive) {
208                                 TransactionHelper.startTransaction();
209                         }
210                 }
211         }
212
213         private void initResourceMap() {
214                 resourcesByDocType = new HashMap<String, NuxeoBasedResource>();
215
216                 for (CollectionSpaceResource<?, ?> resource : getResourceMap().values()) {
217                         Map<UriTemplateRegistryKey, StoredValuesUriTemplate> entries = resource.getUriRegistryEntries();
218
219                         for (UriTemplateRegistryKey key : entries.keySet()) {
220                                 String docType = key.getDocType();
221                                 String tenantId = key.getTenantId();
222
223                                 if (getTenantId().equals(tenantId)) {
224                                         if (resourcesByDocType.containsKey(docType)) {
225                                                 log.warn("multiple resources found for docType " + docType);
226
227                                                 NuxeoBasedResource currentResource = resourcesByDocType.get(docType);
228                                                 NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource;
229
230                                                 // Favor the resource that isn't an AuthorityResource. This
231                                                 // is really just to deal with Contacts, which are handled
232                                                 // by ContactResource, PersonAuthorityResource, and
233                                                 // OrgAuthorityResource. We want to use ContactResource.
234
235                                                 if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) {
236                                                         resourcesByDocType.put(docType, candidateResource);
237                                                 }
238
239                                                 log.warn("using " + resourcesByDocType.get(docType));
240                                         }
241                                         else {
242                                                 resourcesByDocType.put(docType, (NuxeoBasedResource) resource);
243                                         }
244                                 }
245                         }
246                 }
247         }
248
249         private void reindexDocuments(Set<String> docTypes) throws Exception {
250                 if (docTypes == null) {
251                         docTypes = new LinkedHashSet<String>();
252                 }
253
254                 // If no types are specified, do them all.
255
256                 if (docTypes.size() == 0) {
257                         docTypes.addAll(getAllDocTypes());
258                 }
259
260                 for (String docType : docTypes) {
261                         reindexDocuments(docType);
262                 }
263         }
264
265         private List<String> getAllDocTypes() {
266                 List<String> docTypes = new ArrayList<String>(resourcesByDocType.keySet());
267                 Collections.sort(docTypes);
268
269                 log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", "));
270
271                 return docTypes;
272         }
273
274         private void reindexDocuments(String docType) throws Exception {
275                 // Check for a stop file before reindexing the docType.
276
277                 if (batchStopFileExists() || docTypeStopFileExists()) {
278                         throw new StoppedException();
279                 }
280
281                 log.debug("reindexing docType " + docType);
282
283                 NuxeoBasedResource resource = resourcesByDocType.get(docType);
284
285                 if (resource == null) {
286                         log.warn("No service resource found for docType " + docType);
287                 }
288
289                 boolean isAuthorityItem = false;
290
291                 if (resource instanceof AuthorityResource) {
292                         UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType);
293                         StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key);
294
295                         log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType());
296
297                         if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) {
298                                 isAuthorityItem = true;
299                         }
300                 }
301
302                 int pageSize = batchSize;
303
304                 // The supplied start and end batch numbers start with 1, but the page number starts with 0.
305                 int startPage = (startBatch > 0) ? startBatch - 1 : 0;
306                 int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE;
307
308                 if (isAuthorityItem) {
309                         List<String> vocabularyCsids = getVocabularyCsids((AuthorityResource<?, ?>) resource);
310
311                         for (String vocabularyCsid : vocabularyCsids) {
312                                 int pageNum = startPage;
313                                 List<String> csids = null;
314
315                                 log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid);
316
317                                 do {
318                                         // Check for a stop file before reindexing the batch.
319
320                                         if (batchStopFileExists()) {
321                                                 throw new StoppedException();
322                                         }
323
324                                         csids = findAllAuthorityItems((AuthorityResource<?, ?>) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
325
326                                         if (csids.size() > 0) {
327                                                 log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
328
329                                                 // Pause for the configured amount of time.
330
331                                                 if (batchPause > 0) {
332                                                         log.trace("pausing " + batchPause + " ms");
333
334                                                         Thread.sleep(batchPause);
335                                                 }
336
337                                                 reindexDocuments(docType, csids);
338                                         }
339
340                                         pageNum++;
341                                 }
342                                 while(csids.size() == pageSize && pageNum <= endPage);
343                         }
344                 } else {
345                         int pageNum = startPage;
346                         List<String> csids = null;
347
348                         do {
349                                 // Check for a stop file before reindexing the batch.
350
351                                 if (batchStopFileExists()) {
352                                         throw new StoppedException();
353                                 }
354
355                                 csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
356
357                                 if (csids.size() > 0) {
358                                         log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
359
360                                         // Pause for the configured amount of time.
361
362                                         if (batchPause > 0) {
363                                                 log.trace("pausing " + batchPause + " ms");
364
365                                                 Thread.sleep(batchPause);
366                                         }
367
368                                         reindexDocuments(docType, csids);
369                                 }
370
371                                 pageNum++;
372                         }
373                         while(csids.size() == pageSize && pageNum <= endPage);
374                 }
375         }
376
377         private void reindexDocument(String docType, String csid) throws Exception {
378                 reindexDocuments(docType, Arrays.asList(csid));
379         }
380
381         private void reindexDocuments(String docType, List<String> csids) throws Exception {
382                 // Convert the csids to structs of nuxeo id and type, as expected
383                 // by doBatch.
384
385                 if (csids == null || csids.size() == 0) {
386                         return;
387                 }
388
389                 // Transaction for the batch
390                 boolean tx = TransactionHelper.startTransaction();
391
392                 getLowLevelSession();
393
394                 List<Info> infos = new ArrayList<Info>();
395
396                 String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " +
397                                            "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " +
398                                            "AND ecm:primaryType LIKE '" + docType + "%' " +
399                                            "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0";
400                 IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY);
401
402                 try {
403                         for (Map<String, Serializable> map : result) {
404                                 String id = (String) map.get(NXQL.ECM_UUID);
405                                 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
406                                 infos.add(new Info(id, type));
407                         }
408                 } finally {
409                         result.close();
410                 }
411
412                 if (csids.size() != infos.size()) {
413                         log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size());
414                 }
415
416                 if (log.isTraceEnabled()) {
417                         for (Info info : infos) {
418                                 log.trace(info.type + " " + info.id);
419                         }
420                 }
421
422                 numAffected += infos.size();
423
424                 // Below code copied from the doBatch function.
425
426                 boolean ok;
427
428                 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
429                 Set<String> asyncIds = new HashSet<String>();
430                 Model model = session.getModel();
431                 for (Info info : infos) {
432                         ids.add(info.id);
433                         if (fulltextConfiguration.isFulltextIndexable(info.type)) {
434                                 asyncIds.add(model.idToString(info.id));
435                         }
436                 }
437                 ok = false;
438                 try {
439                         runSyncBatch(ids, asyncIds);
440                         ok = true;
441                 } finally {
442                         if (tx) {
443                                 if (!ok) {
444                                         TransactionHelper.setTransactionRollbackOnly();
445                                         log.error("Rolling back sync");
446                                 }
447                                 TransactionHelper.commitOrRollbackTransaction();
448                         }
449                 }
450
451                 runAsyncBatch(asyncIds);
452
453                 // wait for async completion after transaction commit
454                 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
455         }
456
457         private List<String> quoteList(List<String> values) {
458                 List<String> quoted = new ArrayList<String>(values.size());
459
460                 for (String value : values) {
461                         quoted.add("'" + value + "'");
462                 }
463
464                 return quoted;
465         }
466
467         private boolean batchStopFileExists() {
468                 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile());
469         }
470
471         private boolean docTypeStopFileExists() {
472                 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile());
473         }
474
475         private static class StoppedException extends Exception {
476                 private static final long serialVersionUID = 8813189331855935939L;
477
478                 public StoppedException() {
479
480                 }
481         }
482
483         /*
484          * The code below this comment is copied from the nuxeo-reindex-fulltext
485          * module. The original copyright is below.
486          */
487
488         /*
489          * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
490          *
491          * All rights reserved. This program and the accompanying materials
492          * are made available under the terms of the GNU Lesser General Public License
493          * (LGPL) version 2.1 which accompanies this distribution, and is available at
494          * http://www.gnu.org/licenses/lgpl.html
495          *
496          * This library is distributed in the hope that it will be useful,
497          * but WITHOUT ANY WARRANTY; without even the implied warranty of
498          * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
499          * Lesser General Public License for more details.
500          *
501          * Contributors:
502          *     Florent Guillaume
503          */
504
505         protected static class Info {
506                 public final Serializable id;
507
508                 public final String type;
509
510                 public Info(Serializable id, String type) {
511                         this.id = id;
512                         this.type = type;
513                 }
514         }
515
516         /**
517          * This has to be called once the transaction has been started.
518          */
519         protected void getLowLevelSession() {
520                 try {
521                         SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession();
522                         Field f2 = SQLSession.class.getDeclaredField("session");
523                         f2.setAccessible(true);
524                         session = (Session) f2.get(s);
525                         fulltextConfiguration = session.getModel().getFulltextConfiguration();
526                 } catch (ReflectiveOperationException e) {
527                         throw new NuxeoException(e);
528                 }
529         }
530
531         protected void doBatch(List<Info> infos) {
532                 boolean tx;
533                 boolean ok;
534
535                 // transaction for the sync batch
536                 tx = TransactionHelper.startTransaction();
537
538                 getLowLevelSession(); // for fulltextInfo
539                 List<Serializable> ids = new ArrayList<Serializable>(infos.size());
540                 Set<String> asyncIds = new HashSet<String>();
541                 Model model = session.getModel();
542                 for (Info info : infos) {
543                         ids.add(info.id);
544                         if (fulltextConfiguration.isFulltextIndexable(info.type)) {
545                                 asyncIds.add(model.idToString(info.id));
546                         }
547                 }
548                 ok = false;
549                 try {
550                         runSyncBatch(ids, asyncIds);
551                         ok = true;
552                 } finally {
553                         if (tx) {
554                                 if (!ok) {
555                                         TransactionHelper.setTransactionRollbackOnly();
556                                         log.error("Rolling back sync");
557                                 }
558                                 TransactionHelper.commitOrRollbackTransaction();
559                         }
560                 }
561
562                 runAsyncBatch(asyncIds);
563
564                 // wait for async completion after transaction commit
565                 Framework.getLocalService(EventService.class).waitForAsyncCompletion();
566         }
567
568         /*
569                 * Do this at the low-level session level because we may have to modify
570                 * things like versions which aren't usually modifiable, and it's also good
571                 * to bypass all listeners.
572                 */
573         protected void runSyncBatch(List<Serializable> ids, Set<String> asyncIds) {
574                 getLowLevelSession();
575
576                 session.getNodesByIds(ids); // batch fetch
577
578                 Map<Serializable, String> titles = new HashMap<Serializable, String>();
579                 for (Serializable id : ids) {
580                         Node node = session.getNodeById(id);
581                         if (asyncIds.contains(id)) {
582                                 node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id);
583                         }
584                         SimpleProperty prop;
585                         try {
586                                 prop = node.getSimpleProperty(DC_TITLE);
587                         } catch (IllegalArgumentException e) {
588                                 continue;
589                         }
590                         String title = (String) prop.getValue();
591                         titles.put(id, title);
592                         prop.setValue(title + " ");
593                 }
594                 session.save();
595
596                 for (Serializable id : ids) {
597                         Node node = session.getNodeById(id);
598                         SimpleProperty prop;
599                         try {
600                                 prop = node.getSimpleProperty(DC_TITLE);
601                         } catch (IllegalArgumentException e) {
602                                 continue;
603                         }
604                         prop.setValue(titles.get(id));
605                 }
606                 session.save();
607         }
608
609         protected void runAsyncBatch(Set<String> asyncIds)
610                         {
611                 if (asyncIds.isEmpty()) {
612                         return;
613                 }
614                 String repositoryName = coreSession.getRepositoryName();
615                 WorkManager workManager = Framework.getLocalService(WorkManager.class);
616                 for (String id : asyncIds) {
617                         Work work = new SQLFulltextExtractorWork(repositoryName, id);
618                         // schedule immediately, we're outside a transaction
619                         workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false);
620                 }
621         }
622 }