]> git.aero2k.de Git - tmp/jakarta-migration.git/blob
d4350e085e5c6124f76f9f912e56396e1bc2ac32
[tmp/jakarta-migration.git] /
1 /*
2  * This file contains code from Florent Guillame's nuxeo-reindex-fulltext module.
3  * 
4  */
5
6 package org.collectionspace.services.batch.nuxeo;
7
8 import java.io.File;
9 import java.io.Serializable;
10 import java.lang.reflect.Field;
11 import java.security.Principal;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21
22 import org.apache.commons.lang.StringUtils;
23 import org.collectionspace.services.client.PoxPayloadIn;
24 import org.collectionspace.services.client.PoxPayloadOut;
25 import org.collectionspace.services.common.CollectionSpaceResource;
26 import org.collectionspace.services.common.NuxeoBasedResource;
27 import org.collectionspace.services.common.StoredValuesUriTemplate;
28 import org.collectionspace.services.common.UriTemplateFactory;
29 import org.collectionspace.services.common.UriTemplateRegistryKey;
30 import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs;
31 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
32 import org.collectionspace.services.common.invocable.InvocationResults;
33 import org.collectionspace.services.common.vocabulary.AuthorityResource;
34 import org.collectionspace.services.nuxeo.util.ReindexFulltextRoot.ReindexInfo;
35 import org.nuxeo.ecm.core.api.AbstractSession;
36 import org.nuxeo.ecm.core.api.ClientException;
37 import org.nuxeo.ecm.core.api.CoreSession;
38 import org.nuxeo.ecm.core.api.IterableQueryResult;
39 import org.nuxeo.ecm.core.api.NuxeoException;
40 import org.nuxeo.ecm.core.api.NuxeoPrincipal;
41 import org.nuxeo.ecm.core.event.EventService;
42 import org.nuxeo.ecm.core.query.QueryFilter;
43 import org.nuxeo.ecm.core.query.sql.NXQL;
44 import org.nuxeo.ecm.core.storage.FulltextConfiguration;
45 import org.nuxeo.ecm.core.storage.sql.Model;
46 import org.nuxeo.ecm.core.storage.sql.Node;
47 import org.nuxeo.ecm.core.storage.sql.Session;
48 import org.nuxeo.ecm.core.storage.sql.SimpleProperty;
49 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLFulltextExtractorWork;
50 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLSession;
51 import org.nuxeo.ecm.core.work.api.Work;
52 import org.nuxeo.ecm.core.work.api.WorkManager;
53 import org.nuxeo.ecm.core.work.api.WorkManager.Scheduling;
54 import org.nuxeo.runtime.api.Framework;
55 import org.nuxeo.runtime.transaction.TransactionHelper;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class ReindexFullTextBatchJob extends AbstractBatchJob {
60         final Logger log = LoggerFactory.getLogger(ReindexFullTextBatchJob.class);
61
62         public static final String DC_TITLE = "dc:title";
63         public static final int DEFAULT_BATCH_SIZE = 1000;
64         public static final int DEFAULT_START_BATCH = 0;
65         public static final int DEFAULT_END_BATCH = 0;
66         public static final int DEFAULT_BATCH_PAUSE = 0;
67         public static final String BATCH_STOP_FILE = "stopBatch";
68         public static final String DOCTYPE_STOP_FILE = "stopDocType";
69         
70         private int batchSize = DEFAULT_BATCH_SIZE;
71         private int batchPause = DEFAULT_BATCH_PAUSE;
72         private int startBatch = DEFAULT_START_BATCH;
73         private int endBatch = DEFAULT_END_BATCH;
74         private int numAffected = 0;
75         
76         private String stopFileDirectory;
77
78         private CoreSession coreSession;
79         private Session session = null;
80     protected FulltextConfiguration fulltextConfiguration;
81         
82         private Map<String, NuxeoBasedResource> resourcesByDocType;
83
84         public ReindexFullTextBatchJob() {
85                 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
86                 
87                 stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName();
88                 
89                 log.debug("stop file directory is " + stopFileDirectory);
90         }
91         
92         @Override
93         public void run() {
94                 setCompletionStatus(STATUS_MIN_PROGRESS);
95                 
96                 numAffected = 0;
97                 
98                 // This is needed so that resource calls (which start transactions)
99                 // will work. Otherwise, a javax.transaction.NotSupportedException 
100                 // ("Nested transactions are not supported") is thrown.
101                 
102                 boolean isTransactionActive = TransactionHelper.isTransactionActive();
103                 
104                 if (isTransactionActive) {
105                         TransactionHelper.commitOrRollbackTransaction();
106                 }
107                 
108                 try {
109                         coreSession = getRepoSession().getCoreSession();
110
111                         if (requestIsForInvocationModeSingle()) {
112                                 String csid = getInvocationContext().getSingleCSID();
113                                 
114                                 if (csid == null) {
115                                         throw new Exception("No singleCSID was supplied in invocation context.");
116                                 }
117
118                                 String docType = getInvocationContext().getDocType();
119                                 
120                                 if (StringUtils.isEmpty(docType)) {
121                                         throw new Exception("No docType was supplied in invocation context.");
122                                 }
123
124                                 log.debug("Reindexing " + docType + " record with csid: " + csid);
125                                 
126                                 reindexDocument(docType, csid);
127                         }
128                         else if (requestIsForInvocationModeList()) {
129                                 ListCSIDs list = getInvocationContext().getListCSIDs();
130                                 List<String> csids = list.getCsid();
131                                 
132                                 if (csids == null || csids.size() == 0) {
133                                         throw new Exception("no listCSIDs were supplied");
134                                 }
135
136                                 String docType = getInvocationContext().getDocType();
137
138                                 if (StringUtils.isEmpty(docType)) {
139                                         throw new Exception("No docType was supplied in invocation context.");
140                                 }
141
142                                 log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ...");
143                                 
144                                 if (log.isTraceEnabled()) {
145                                         log.trace(StringUtils.join(csids, ", "));
146                                 }
147                                 
148                                 reindexDocuments(docType, csids);
149                         }
150                         else if (requestIsForInvocationModeNoContext()) {
151                                 Set<String> docTypes = new LinkedHashSet<String>();
152                                 String docType;
153                                 
154                                 docType = getInvocationContext().getDocType();
155
156                                 if (StringUtils.isNotEmpty(docType)) {
157                                         docTypes.add(docType);
158                                 }
159                                 
160                                 // Read batch size, start and end batches, pause, and additional doctypes from params.
161                                 
162                                 for (Param param : this.getParams()) {
163                                         if (param.getKey().equals("batchSize")) {
164                                                 batchSize = Integer.parseInt(param.getValue());
165                                         }
166                                         else if (param.getKey().equals("startBatch")) {
167                                                 startBatch = Integer.parseInt(param.getValue());
168                                         }
169                                         else if (param.getKey().equals("endBatch")) {
170                                                 endBatch = Integer.parseInt(param.getValue());
171                                         }
172                                         else if (param.getKey().equals("batchPause")) {
173                                                 batchPause = Integer.parseInt(param.getValue());
174                                         }
175                                         else if (param.getKey().equals("docType")) {
176                                                 docType = param.getValue();
177                                                 
178                                                 if (StringUtils.isNotEmpty(docType)) {
179                                                         docTypes.add(docType);
180                                                 }
181                                         }
182                                 }
183                                 
184                                 initResourceMap();
185                                 reindexDocuments(docTypes);
186                         }
187                         
188                         log.debug("reindexing complete");
189                         
190                         InvocationResults results = new InvocationResults();
191                         results.setNumAffected(numAffected);
192                         results.setUserNote("reindexed " + numAffected + " records");
193                         
194                         setResults(results);
195                         setCompletionStatus(STATUS_COMPLETE);
196                 }
197                 catch(StoppedException e) {
198                         log.debug("reindexing terminated by stop file");
199                         
200                         InvocationResults results = new InvocationResults();
201                         results.setNumAffected(numAffected);
202                         results.setUserNote("reindexing terminated by stop file");
203                         
204                         setResults(results);
205                         setCompletionStatus(STATUS_COMPLETE);
206                 }
207                 catch(Exception e) {
208                         setErrorResult(e.getMessage());
209                 }       
210                 finally {               
211                         // This is needed so that when the session is released after this
212                         // batch job exits (in BatchDocumentModelHandler), there isn't an exception.
213                         // Otherwise, a "Session invoked in a container without a transaction active"
214                         // error is thrown from RepositoryJavaClientImpl.releaseRepositorySession.
215
216                         if (isTransactionActive) {
217                                 TransactionHelper.startTransaction();
218                         }
219                 }
220         }
221         
222         private void initResourceMap() {
223                 resourcesByDocType = new HashMap<String, NuxeoBasedResource>();
224
225                 for (CollectionSpaceResource<PoxPayloadIn, PoxPayloadOut> resource : getResourceMap().values()) {
226                         Map<UriTemplateRegistryKey, StoredValuesUriTemplate> entries = resource.getUriRegistryEntries();
227                         
228                         for (UriTemplateRegistryKey key : entries.keySet()) {
229                                 String docType = key.getDocType();
230                                 String tenantId = key.getTenantId();
231                                 
232                                 if (getTenantId().equals(tenantId)) {
233                                         if (resourcesByDocType.containsKey(docType)) {
234                                                 log.warn("multiple resources found for docType " + docType);
235                                                 
236                                                 NuxeoBasedResource currentResource = resourcesByDocType.get(docType);
237                                                 NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource;
238                                                 
239                                                 // Favor the resource that isn't an AuthorityResource. This
240                                                 // is really just to deal with Contacts, which are handled
241                                                 // by ContactResource, PersonAuthorityResource, and
242                                                 // OrgAuthorityResource. We want to use ContactResource.
243                                                 
244                                                 if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) {
245                                                         resourcesByDocType.put(docType, candidateResource);
246                                                 }
247                                                 
248                                                 log.warn("using " + resourcesByDocType.get(docType));
249                                         }
250                                         else {
251                                                 resourcesByDocType.put(docType, (NuxeoBasedResource) resource);
252                                         }
253                                 }                               
254                         }
255                 }
256         }
257         
258         private void reindexDocuments(Set<String> docTypes) throws Exception {
259                 if (docTypes == null) {
260                         docTypes = new LinkedHashSet<String>();
261                 }
262
263                 // If no types are specified, do them all.
264                 
265                 if (docTypes.size() == 0) {
266                         docTypes.addAll(getAllDocTypes());
267                 }
268                                 
269                 for (String docType : docTypes) {
270                         reindexDocuments(docType);
271                 }
272         }
273         
274         private List<String> getAllDocTypes() {
275                 List<String> docTypes = new ArrayList<String>(resourcesByDocType.keySet());
276                 Collections.sort(docTypes);
277
278                 log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", "));
279                 
280                 return docTypes;
281         }
282         
283         private void reindexDocuments(String docType) throws Exception {
284                 // Check for a stop file before reindexing the docType.
285                 
286                 if (batchStopFileExists() || docTypeStopFileExists()) {
287                         throw new StoppedException();
288                 }
289                 
290                 log.debug("reindexing docType " + docType);
291                 
292                 NuxeoBasedResource resource = resourcesByDocType.get(docType);
293                 
294                 if (resource == null) {
295                         log.warn("No service resource found for docType " + docType);
296                 }
297                 
298                 boolean isAuthorityItem = false;
299                 
300                 if (resource instanceof AuthorityResource) {
301                         UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType);
302                         StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key);
303                         
304                         log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType());
305                         
306                         if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) {
307                                 isAuthorityItem = true;
308                         }
309                 }
310         
311                 int pageSize = batchSize;
312                 
313                 // The supplied start and end batch numbers start with 1, but the page number starts with 0.
314                 int startPage = (startBatch > 0) ? startBatch - 1 : 0; 
315                 int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE;
316
317                 if (isAuthorityItem) {
318                         List<String> vocabularyCsids = getVocabularyCsids((AuthorityResource<?, ?>) resource);
319
320                         for (String vocabularyCsid : vocabularyCsids) {
321                                 int pageNum = startPage;
322                                 List<String> csids = null;
323
324                                 log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid);
325                                 
326                                 do {
327                                         // Check for a stop file before reindexing the batch.
328                                         
329                                         if (batchStopFileExists()) {
330                                                 throw new StoppedException();
331                                         }
332
333                                         csids = findAllAuthorityItems((AuthorityResource<?, ?>) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
334                                         
335                                         if (csids.size() > 0) {
336                                                 log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
337                                                 
338                                                 // Pause for the configured amount of time.
339                                                 
340                                                 if (batchPause > 0) {
341                                                         log.trace("pausing " + batchPause + " ms");
342                                                         
343                                                         Thread.sleep(batchPause);
344                                                 }
345                                                 
346                                                 reindexDocuments(docType, csids);
347                                         }
348                                         
349                                         pageNum++;
350                                 }
351                                 while(csids.size() == pageSize && pageNum <= endPage);
352                         }
353                 } else {
354                         int pageNum = startPage;
355                         List<String> csids = null;
356
357                         do {
358                                 // Check for a stop file before reindexing the batch.
359                                 
360                                 if (batchStopFileExists()) {
361                                         throw new StoppedException();
362                                 }
363                                 
364                                 csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
365                                 
366                                 if (csids.size() > 0) {
367                                         log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
368                                         
369                                         // Pause for the configured amount of time.
370                                         
371                                         if (batchPause > 0) {
372                                                 log.trace("pausing " + batchPause + " ms");
373                                                 
374                                                 Thread.sleep(batchPause);
375                                         }
376
377                                         reindexDocuments(docType, csids);
378                                 }
379                                 
380                                 pageNum++;
381                         }
382                         while(csids.size() == pageSize && pageNum <= endPage);
383                 }
384         }
385         
386         private void reindexDocument(String docType, String csid) throws Exception {
387                 reindexDocuments(docType, Arrays.asList(csid));
388         }
389         
390         private void reindexDocuments(String docType, List<String> csids) throws Exception {
391                 // Convert the csids to structs of nuxeo id and type, as expected
392                 // by doBatch.
393
394                 if (csids == null || csids.size() == 0) {
395                         return;
396                 }
397                 
398                 getLowLevelSession();           
399                 List<ReindexInfo> infos = new ArrayList<ReindexInfo>();
400
401                 String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " +
402                                            "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " +
403                                            "AND ecm:primaryType LIKE '" + docType + "%' " +
404                                            "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0";
405                 IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY);
406                 
407                 try {
408                         for (Map<String, Serializable> map : result) {
409                                 String id = (String) map.get(NXQL.ECM_UUID);
410                                 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
411                                 infos.add(new ReindexInfo(id, type));
412                         }
413                 } finally {
414                         result.close();
415                 }
416                 
417                 if (csids.size() != infos.size()) {
418                         log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size());
419                 }
420                 
421                 if (log.isTraceEnabled()) {
422                         for (ReindexInfo info : infos) {
423                                 log.trace(info.type + " " + info.id);
424                         }
425                 }
426                 
427                 doBatch(infos);
428                 
429                 numAffected += infos.size();
430         }
431         
432         private List<String> quoteList(List<String> values) {
433                 List<String> quoted = new ArrayList<String>(values.size());
434                 
435                 for (String value : values) {
436                         quoted.add("'" + value + "'");
437                 }
438                 
439                 return quoted;
440         }
441         
442         private boolean batchStopFileExists() {
443                 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile());
444         }
445
446         private boolean docTypeStopFileExists() {
447                 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile());
448         }
449         
450         private static class StoppedException extends Exception {
451                 private static final long serialVersionUID = 8813189331855935939L;
452
453                 public StoppedException() {
454                         
455                 }
456         }
457         
458         /*
459          * The code below this comment is copied from the nuxeo-reindex-fulltext
460          * module. The original copyright is below.
461          */
462         
463         /*
464          * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
465          *
466          * All rights reserved. This program and the accompanying materials
467          * are made available under the terms of the GNU Lesser General Public License
468          * (LGPL) version 2.1 which accompanies this distribution, and is available at
469          * http://www.gnu.org/licenses/lgpl.html
470          *
471          * This library is distributed in the hope that it will be useful,
472          * but WITHOUT ANY WARRANTY; without even the implied warranty of
473          * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
474          * Lesser General Public License for more details.
475          *
476          * Contributors:
477          *       Florent Guillaume
478          */
479                 
480         /**
481          * Launches a fulltext reindexing of the database.
482          *
483          * @param batchSize the batch size, defaults to 100
484          * @param batch if present, the batch number to process instead of all
485          *                      batches; starts at 1
486          * @return when done, ok + the total number of docs
487          */
488         public String reindexFulltext(int batchSize, int batch, String query) throws Exception {
489         Principal principal = coreSession.getPrincipal();
490         if (!(principal instanceof NuxeoPrincipal)) {
491             return "unauthorized";
492         }
493         NuxeoPrincipal nuxeoPrincipal = (NuxeoPrincipal) principal;
494         if (!nuxeoPrincipal.isAdministrator()) {
495             return "unauthorized";
496         }
497
498         log("Reindexing starting");
499         if (batchSize <= 0) {
500             batchSize = DEFAULT_BATCH_SIZE;
501         }
502         
503         //
504         // A default query that gets ALL the documents
505         //
506         if (query == null) {
507                 query = "SELECT ecm:uuid, ecm:primaryType FROM Document"
508                         + " WHERE ecm:isProxy = 0"
509                         + " AND ecm:currentLifeCycleState <> 'deleted'"
510                         + " ORDER BY ecm:uuid";
511         }
512
513         List<ReindexInfo> infos = getInfos(query);
514         int size = infos.size();
515         int numBatches = (size + batchSize - 1) / batchSize;
516         if (batch < 0 || batch > numBatches) {
517             batch = 0; // all
518         }
519         batch--;
520
521         log("Reindexing of %s documents, batch size: %s, number of batches: %s",
522                 size, batchSize, numBatches);
523         if (batch >= 0) {
524             log("Reindexing limited to batch: %s", batch + 1);
525         }
526
527         //
528         // Commit and close the transaction that was started by our standard request lifecycle.
529         //
530         boolean tx = TransactionHelper.isTransactionActive();
531         if (tx) {
532             TransactionHelper.commitOrRollbackTransaction();
533         }
534
535         int n = 0;
536         int errs = 0;
537         for (int i = 0; i < numBatches; i++) {
538             if (batch >= 0 && batch != i) {
539                 continue;
540             }
541             int pos = i * batchSize;
542             int end = pos + batchSize;
543             if (end > size) {
544                 end = size;
545             }
546             List<ReindexInfo> batchInfos = infos.subList(pos, end);
547             log("Reindexing batch %s/%s, first id: %s", i + 1, numBatches,
548                     batchInfos.get(0).id);
549             try {
550                 doBatch(batchInfos);
551             } catch (NuxeoException e) {
552                 log.error("Error processing batch " + i + 1, e);
553                 errs++;
554             }
555             n += end - pos;
556         }
557
558         log("Reindexing done");
559         //
560         // Start a new transaction so our standard request lifecycle can complete.
561         //
562         if (tx) {
563             TransactionHelper.startTransaction();
564         }
565         return "done: " + n + " total: " + size + " batch_errors: " + errs;
566     }
567
568         protected void log(String format, Object... args) {
569                 log.warn(String.format(format, args));
570         }
571
572         /**
573          * This has to be called once the transaction has been started.
574          */
575         protected void getLowLevelSession() throws Exception {
576         try {
577             SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession();
578             Field f2 = SQLSession.class.getDeclaredField("session");
579             f2.setAccessible(true);
580             session = (Session) f2.get(s);
581             fulltextConfiguration = session.getModel().getFulltextConfiguration();
582         } catch (ReflectiveOperationException e) {
583             throw new NuxeoException(e);
584         }
585     }
586
587         protected List<ReindexInfo> getInfos(String query) throws Exception {
588         getLowLevelSession();
589         List<ReindexInfo> infos = new ArrayList<ReindexInfo>();
590         IterableQueryResult it = session.queryAndFetch(query, NXQL.NXQL,
591                 QueryFilter.EMPTY);
592         try {
593             for (Map<String, Serializable> map : it) {
594                 Serializable id = map.get(NXQL.ECM_UUID);
595                 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
596                 infos.add(new ReindexInfo(id, type));
597             }
598         } finally {
599             it.close();
600         }
601         return infos;
602     }
603
604         protected void doBatch(List<ReindexInfo> infos) throws Exception {
605         boolean tx;
606         boolean ok;
607
608         // transaction for the sync batch
609         tx = TransactionHelper.startTransaction();
610
611         getLowLevelSession(); // for fulltextInfo
612         List<Serializable> ids = new ArrayList<Serializable>(infos.size());
613         Set<String> asyncIds = new HashSet<String>();
614         Model model = session.getModel();
615         for (ReindexInfo info : infos) {
616             ids.add(info.id);
617             if (fulltextConfiguration.isFulltextIndexable(info.type)) {
618                 asyncIds.add(model.idToString(info.id));
619             }
620         }
621         ok = false;
622         try {
623             runSyncBatch(ids, asyncIds);
624             ok = true;
625         } finally {
626             if (tx) {
627                 if (!ok) {
628                     TransactionHelper.setTransactionRollbackOnly();
629                     log.error("Rolling back sync");
630                 }
631                 TransactionHelper.commitOrRollbackTransaction();
632             }
633         }
634
635         runAsyncBatch(asyncIds);
636
637         // wait for async completion after transaction commit
638         Framework.getLocalService(EventService.class).waitForAsyncCompletion();
639     }
640
641         /*
642          * Do this at the low-level session level because we may have to modify
643          * things like versions which aren't usually modifiable, and it's also good
644          * to bypass all listeners.
645          */
646         protected void runSyncBatch(List<Serializable> ids, Set<String> asyncIds) throws Exception {
647         getLowLevelSession();
648
649         session.getNodesByIds(ids); // batch fetch
650
651         Map<Serializable, String> titles = new HashMap<Serializable, String>();
652         for (Serializable id : ids) {
653             Node node = session.getNodeById(id);
654             if (asyncIds.contains(id)) {
655                 node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id);
656             }
657             SimpleProperty prop;
658             try {
659                 prop = node.getSimpleProperty(DC_TITLE);
660             } catch (IllegalArgumentException e) {
661                 continue;
662             }
663             String title = (String) prop.getValue();
664             titles.put(id, title);
665             prop.setValue(title + " ");
666         }
667         session.save();
668
669         for (Serializable id : ids) {
670             Node node = session.getNodeById(id);
671             SimpleProperty prop;
672             try {
673                 prop = node.getSimpleProperty(DC_TITLE);
674             } catch (IllegalArgumentException e) {
675                 continue;
676             }
677             prop.setValue(titles.get(id));
678         }
679         session.save();
680     }
681
682         protected void runAsyncBatch(Set<String> asyncIds) {
683         if (asyncIds.isEmpty()) {
684             return;
685         }
686         String repositoryName = coreSession.getRepositoryName();
687         WorkManager workManager = Framework.getLocalService(WorkManager.class);
688         for (String id : asyncIds) {
689             Work work = new SQLFulltextExtractorWork(repositoryName, id);
690             // schedule immediately, we're outside a transaction
691             workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false);
692         }
693     }
694 }