]> git.aero2k.de Git - tmp/jakarta-migration.git/blob
bd388c7f236704584a593caa583c0017d2d033b3
[tmp/jakarta-migration.git] /
1 /*
2  * This file contains code from Florent Guillaume's nuxeo-reindex-fulltext module.
3  * 
4  */
5
6 package org.collectionspace.services.batch.nuxeo;
7
8 import java.io.File;
9 import java.io.Serializable;
10 import java.lang.reflect.Field;
11 import java.security.Principal;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.LinkedHashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Set;
21
22 import org.apache.commons.lang.StringUtils;
23 import org.collectionspace.services.common.CollectionSpaceResource;
24 import org.collectionspace.services.common.NuxeoBasedResource;
25 import org.collectionspace.services.common.StoredValuesUriTemplate;
26 import org.collectionspace.services.common.UriTemplateFactory;
27 import org.collectionspace.services.common.UriTemplateRegistryKey;
28 import org.collectionspace.services.common.invocable.InvocationContext.ListCSIDs;
29 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
30 import org.collectionspace.services.common.invocable.InvocationResults;
31 import org.collectionspace.services.common.vocabulary.AuthorityResource;
32 import org.collectionspace.services.nuxeo.util.ReindexFulltextRoot.ReindexInfo;
33 import org.nuxeo.ecm.core.api.AbstractSession;
34 import org.nuxeo.ecm.core.api.CoreSession;
35 import org.nuxeo.ecm.core.api.IterableQueryResult;
36 import org.nuxeo.ecm.core.api.NuxeoException;
37 import org.nuxeo.ecm.core.api.NuxeoPrincipal;
38 import org.nuxeo.ecm.core.event.EventService;
39 import org.nuxeo.ecm.core.query.QueryFilter;
40 import org.nuxeo.ecm.core.query.sql.NXQL;
41 import org.nuxeo.ecm.core.storage.FulltextConfiguration;
42 import org.nuxeo.ecm.core.storage.sql.Model;
43 import org.nuxeo.ecm.core.storage.sql.Node;
44 import org.nuxeo.ecm.core.storage.sql.Session;
45 import org.nuxeo.ecm.core.storage.sql.SimpleProperty;
46 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLFulltextExtractorWork;
47 import org.nuxeo.ecm.core.storage.sql.coremodel.SQLSession;
48 import org.nuxeo.ecm.core.work.api.Work;
49 import org.nuxeo.ecm.core.work.api.WorkManager;
50 import org.nuxeo.ecm.core.work.api.WorkManager.Scheduling;
51 import org.nuxeo.runtime.api.Framework;
52 import org.nuxeo.runtime.transaction.TransactionHelper;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 public class ReindexFullTextBatchJob extends AbstractBatchJob {
57         final Logger log = LoggerFactory.getLogger(ReindexFullTextBatchJob.class);
58
59         public static final String DC_TITLE = "dc:title";
60         public static final int DEFAULT_BATCH_SIZE = 1000;
61         public static final int DEFAULT_START_BATCH = 0;
62         public static final int DEFAULT_END_BATCH = 0;
63         public static final int DEFAULT_BATCH_PAUSE = 0;
64         public static final String BATCH_STOP_FILE = "stopBatch";
65         public static final String DOCTYPE_STOP_FILE = "stopDocType";
66         
67         private int batchSize = DEFAULT_BATCH_SIZE;
68         private int batchPause = DEFAULT_BATCH_PAUSE;
69         private int startBatch = DEFAULT_START_BATCH;
70         private int endBatch = DEFAULT_END_BATCH;
71         private int numAffected = 0;
72         
73         private String stopFileDirectory;
74
75         private CoreSession coreSession;
76         private Session session = null;
77     protected FulltextConfiguration fulltextConfiguration;
78         
79         private Map<String, NuxeoBasedResource> resourcesByDocType;
80
81         public ReindexFullTextBatchJob() {
82                 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_NO_CONTEXT, INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
83                 
84                 stopFileDirectory = System.getProperty("java.io.tmpdir") + File.separator + ReindexFullTextBatchJob.class.getName();
85                 
86                 log.debug("stop file directory is " + stopFileDirectory);
87         }
88         
89         @Override
90         public void run() {
91                 setCompletionStatus(STATUS_MIN_PROGRESS);
92                 
93                 numAffected = 0;
94                 
95                 // This is needed so that resource calls (which start transactions)
96                 // will work. Otherwise, a javax.transaction.NotSupportedException 
97                 // ("Nested transactions are not supported") is thrown.
98                 
99                 boolean isTransactionActive = TransactionHelper.isTransactionActive();
100                 
101                 if (isTransactionActive) {
102                         TransactionHelper.commitOrRollbackTransaction();
103                 }
104                 
105                 try {
106                         coreSession = getRepoSession().getCoreSession();
107
108                         if (requestIsForInvocationModeSingle()) {
109                                 String csid = getInvocationContext().getSingleCSID();
110                                 
111                                 if (csid == null) {
112                                         throw new Exception("No singleCSID was supplied in invocation context.");
113                                 }
114
115                                 String docType = getInvocationContext().getDocType();
116                                 
117                                 if (StringUtils.isEmpty(docType)) {
118                                         throw new Exception("No docType was supplied in invocation context.");
119                                 }
120
121                                 log.debug("Reindexing " + docType + " record with csid: " + csid);
122                                 
123                                 reindexDocument(docType, csid);
124                         }
125                         else if (requestIsForInvocationModeList()) {
126                                 ListCSIDs list = getInvocationContext().getListCSIDs();
127                                 List<String> csids = list.getCsid();
128                                 
129                                 if (csids == null || csids.size() == 0) {
130                                         throw new Exception("no listCSIDs were supplied");
131                                 }
132
133                                 String docType = getInvocationContext().getDocType();
134
135                                 if (StringUtils.isEmpty(docType)) {
136                                         throw new Exception("No docType was supplied in invocation context.");
137                                 }
138
139                                 log.debug("Reindexing " + csids.size() + " " + docType + " records with csids: " + csids.get(0) + ", ...");
140                                 
141                                 if (log.isTraceEnabled()) {
142                                         log.trace(StringUtils.join(csids, ", "));
143                                 }
144                                 
145                                 reindexDocuments(docType, csids);
146                         }
147                         else if (requestIsForInvocationModeNoContext()) {
148                                 Set<String> docTypes = new LinkedHashSet<String>();
149                                 String docType;
150                                 
151                                 docType = getInvocationContext().getDocType();
152
153                                 if (StringUtils.isNotEmpty(docType)) {
154                                         docTypes.add(docType);
155                                 }
156                                 
157                                 // Read batch size, start and end batches, pause, and additional doctypes from params.
158                                 
159                                 for (Param param : this.getParams()) {
160                                         if (param.getKey().equals("batchSize")) {
161                                                 batchSize = Integer.parseInt(param.getValue());
162                                         }
163                                         else if (param.getKey().equals("startBatch")) {
164                                                 startBatch = Integer.parseInt(param.getValue());
165                                         }
166                                         else if (param.getKey().equals("endBatch")) {
167                                                 endBatch = Integer.parseInt(param.getValue());
168                                         }
169                                         else if (param.getKey().equals("batchPause")) {
170                                                 batchPause = Integer.parseInt(param.getValue());
171                                         }
172                                         else if (param.getKey().equals("docType")) {
173                                                 docType = param.getValue();
174                                                 
175                                                 if (StringUtils.isNotEmpty(docType)) {
176                                                         docTypes.add(docType);
177                                                 }
178                                         }
179                                 }
180                                 
181                                 initResourceMap();
182                                 reindexDocuments(docTypes);
183                         }
184                         
185                         log.debug("reindexing complete");
186                         
187                         InvocationResults results = new InvocationResults();
188                         results.setNumAffected(numAffected);
189                         results.setUserNote("reindexed " + numAffected + " records");
190                         
191                         setResults(results);
192                         setCompletionStatus(STATUS_COMPLETE);
193                 }
194                 catch(StoppedException e) {
195                         log.debug("reindexing terminated by stop file");
196                         
197                         InvocationResults results = new InvocationResults();
198                         results.setNumAffected(numAffected);
199                         results.setUserNote("reindexing terminated by stop file");
200                         
201                         setResults(results);
202                         setCompletionStatus(STATUS_COMPLETE);
203                 }
204                 catch(Exception e) {
205                         setErrorResult(e.getMessage());
206                 }       
207                 finally {               
208                         // This is needed so that when the session is released after this
209                         // batch job exits (in BatchDocumentModelHandler), there isn't an exception.
210                         // Otherwise, a "Session invoked in a container without a transaction active"
211                         // error is thrown from RepositoryJavaClientImpl.releaseRepositorySession.
212
213                         if (isTransactionActive) {
214                                 TransactionHelper.startTransaction();
215                         }
216                 }
217         }
218         
219         private void initResourceMap() {
220                 resourcesByDocType = new HashMap<String, NuxeoBasedResource>();
221
222                 for (CollectionSpaceResource<?, ?> resource : getResourceMap().values()) {
223                         Map<UriTemplateRegistryKey, StoredValuesUriTemplate> entries = resource.getUriRegistryEntries();
224                         
225                         for (UriTemplateRegistryKey key : entries.keySet()) {
226                                 String docType = key.getDocType();
227                                 String tenantId = key.getTenantId();
228                                 
229                                 if (getTenantId().equals(tenantId)) {
230                                         if (resourcesByDocType.containsKey(docType)) {
231                                                 log.warn("multiple resources found for docType " + docType);
232                                                 
233                                                 NuxeoBasedResource currentResource = resourcesByDocType.get(docType);
234                                                 NuxeoBasedResource candidateResource = (NuxeoBasedResource) resource;
235                                                 
236                                                 // Favor the resource that isn't an AuthorityResource. This
237                                                 // is really just to deal with Contacts, which are handled
238                                                 // by ContactResource, PersonAuthorityResource, and
239                                                 // OrgAuthorityResource. We want to use ContactResource.
240                                                 
241                                                 if (!(candidateResource instanceof AuthorityResource) && (currentResource instanceof AuthorityResource)) {
242                                                         resourcesByDocType.put(docType, candidateResource);
243                                                 }
244                                                 
245                                                 log.warn("using " + resourcesByDocType.get(docType));
246                                         }
247                                         else {
248                                                 resourcesByDocType.put(docType, (NuxeoBasedResource) resource);
249                                         }
250                                 }                               
251                         }
252                 }
253         }
254         
255         private void reindexDocuments(Set<String> docTypes) throws Exception {
256                 if (docTypes == null) {
257                         docTypes = new LinkedHashSet<String>();
258                 }
259
260                 // If no types are specified, do them all.
261                 
262                 if (docTypes.size() == 0) {
263                         docTypes.addAll(getAllDocTypes());
264                 }
265                                 
266                 for (String docType : docTypes) {
267                         reindexDocuments(docType);
268                 }
269         }
270         
271         private List<String> getAllDocTypes() {
272                 List<String> docTypes = new ArrayList<String>(resourcesByDocType.keySet());
273                 Collections.sort(docTypes);
274
275                 log.debug("Call to getAllDocTypes() method found: " + StringUtils.join(docTypes, ", "));
276                 
277                 return docTypes;
278         }
279         
280         private void reindexDocuments(String docType) throws Exception {
281                 // Check for a stop file before reindexing the docType.
282                 
283                 if (batchStopFileExists() || docTypeStopFileExists()) {
284                         throw new StoppedException();
285                 }
286                 
287                 log.debug("reindexing docType " + docType);
288                 
289                 NuxeoBasedResource resource = resourcesByDocType.get(docType);
290                 
291                 if (resource == null) {
292                         log.warn("No service resource found for docType " + docType);
293                 }
294                 
295                 boolean isAuthorityItem = false;
296                 
297                 if (resource instanceof AuthorityResource) {
298                         UriTemplateRegistryKey key = new UriTemplateRegistryKey(getTenantId(), docType);
299                         StoredValuesUriTemplate uriTemplate = resource.getUriRegistryEntries().get(key);
300                         
301                         log.debug("uriTemplateType=" + uriTemplate.getUriTemplateType());
302                         
303                         if (uriTemplate.getUriTemplateType() == UriTemplateFactory.ITEM) {
304                                 isAuthorityItem = true;
305                         }
306                 }
307         
308                 int pageSize = batchSize;
309                 
310                 // The supplied start and end batch numbers start with 1, but the page number starts with 0.
311                 int startPage = (startBatch > 0) ? startBatch - 1 : 0; 
312                 int endPage = (endBatch > 0) ? endBatch - 1 : Integer.MAX_VALUE;
313
314                 if (isAuthorityItem) {
315                         List<String> vocabularyCsids = getVocabularyCsids((AuthorityResource<?, ?>) resource);
316
317                         for (String vocabularyCsid : vocabularyCsids) {
318                                 int pageNum = startPage;
319                                 List<String> csids = null;
320
321                                 log.debug("Reindexing vocabulary of " + docType + " with csid " + vocabularyCsid);
322                                 
323                                 do {
324                                         // Check for a stop file before reindexing the batch.
325                                         
326                                         if (batchStopFileExists()) {
327                                                 throw new StoppedException();
328                                         }
329
330                                         csids = findAllAuthorityItems((AuthorityResource<?, ?>) resource, vocabularyCsid, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
331                                         
332                                         if (csids.size() > 0) {
333                                                 log.debug("reindexing vocabulary of " + docType +" with csid " + vocabularyCsid + ", batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
334                                                 
335                                                 // Pause for the configured amount of time.
336                                                 
337                                                 if (batchPause > 0) {
338                                                         log.trace("pausing " + batchPause + " ms");
339                                                         
340                                                         Thread.sleep(batchPause);
341                                                 }
342                                                 
343                                                 reindexDocuments(docType, csids);
344                                         }
345                                         
346                                         pageNum++;
347                                 }
348                                 while(csids.size() == pageSize && pageNum <= endPage);
349                         }
350                 } else {
351                         int pageNum = startPage;
352                         List<String> csids = null;
353
354                         do {
355                                 // Check for a stop file before reindexing the batch.
356                                 
357                                 if (batchStopFileExists()) {
358                                         throw new StoppedException();
359                                 }
360                                 
361                                 csids = findAll(resource, pageSize, pageNum, "collectionspace_core:createdAt, ecm:name");
362                                 
363                                 if (csids.size() > 0) {
364                                         log.debug("reindexing " + docType +" batch " + (pageNum + 1) + ": " + csids.size() + " records starting with " + csids.get(0));
365                                         
366                                         // Pause for the configured amount of time.
367                                         
368                                         if (batchPause > 0) {
369                                                 log.trace("pausing " + batchPause + " ms");
370                                                 
371                                                 Thread.sleep(batchPause);
372                                         }
373
374                                         reindexDocuments(docType, csids);
375                                 }
376                                 
377                                 pageNum++;
378                         }
379                         while(csids.size() == pageSize && pageNum <= endPage);
380                 }
381         }
382         
383         private void reindexDocument(String docType, String csid) throws Exception {
384                 reindexDocuments(docType, Arrays.asList(csid));
385         }
386         
387         private void reindexDocuments(String docType, List<String> csids) throws Exception {
388                 // Convert the csids to structs of nuxeo id and type, as expected
389                 // by doBatch.
390
391                 if (csids == null || csids.size() == 0) {
392                         return;
393                 }
394                 
395                 getLowLevelSession();           
396                 List<ReindexInfo> infos = new ArrayList<ReindexInfo>();
397
398                 String query = "SELECT ecm:uuid, ecm:primaryType FROM Document " +
399                                            "WHERE ecm:name IN (" + StringUtils.join(quoteList(csids), ',') + ") " +
400                                            "AND ecm:primaryType LIKE '" + docType + "%' " +
401                                            "AND ecm:isCheckedInVersion = 0 AND ecm:isProxy = 0";
402                 IterableQueryResult result = session.queryAndFetch(query, NXQL.NXQL, QueryFilter.EMPTY);
403                 
404                 try {
405                         for (Map<String, Serializable> map : result) {
406                                 String id = (String) map.get(NXQL.ECM_UUID);
407                                 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
408                                 infos.add(new ReindexInfo(id, type));
409                         }
410                 } finally {
411                         result.close();
412                 }
413                 
414                 if (csids.size() != infos.size()) {
415                         log.warn("didn't find info for all the supplied csids: expected " + csids.size() + ", found " + infos.size());
416                 }
417                 
418                 if (log.isTraceEnabled()) {
419                         for (ReindexInfo info : infos) {
420                                 log.trace(info.type + " " + info.id);
421                         }
422                 }
423                 
424                 doBatch(infos);
425                 
426                 numAffected += infos.size();
427         }
428         
429         private List<String> quoteList(List<String> values) {
430                 List<String> quoted = new ArrayList<String>(values.size());
431                 
432                 for (String value : values) {
433                         quoted.add("'" + value + "'");
434                 }
435                 
436                 return quoted;
437         }
438         
439         private boolean batchStopFileExists() {
440                 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + BATCH_STOP_FILE).isFile());
441         }
442
443         private boolean docTypeStopFileExists() {
444                 return (stopFileDirectory != null && new File(stopFileDirectory + File.separator + DOCTYPE_STOP_FILE).isFile());
445         }
446         
447         private static class StoppedException extends Exception {
448                 private static final long serialVersionUID = 8813189331855935939L;
449
450                 public StoppedException() {
451                         
452                 }
453         }
454         
455         /*
456          * The code below this comment is copied from the nuxeo-reindex-fulltext
457          * module. The original copyright is below.
458          */
459         
460         /*
461          * (C) Copyright 2012 Nuxeo SA (http://nuxeo.com/) and contributors.
462          *
463          * All rights reserved. This program and the accompanying materials
464          * are made available under the terms of the GNU Lesser General Public License
465          * (LGPL) version 2.1 which accompanies this distribution, and is available at
466          * http://www.gnu.org/licenses/lgpl.html
467          *
468          * This library is distributed in the hope that it will be useful,
469          * but WITHOUT ANY WARRANTY; without even the implied warranty of
470          * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
471          * Lesser General Public License for more details.
472          *
473          * Contributors:
474          *       Florent Guillaume
475          */
476                 
477         /**
478          * Launches a fulltext reindexing of the database.
479          *
480          * @param batchSize the batch size, defaults to 100
481          * @param batch if present, the batch number to process instead of all
482          *                      batches; starts at 1
483          * @return when done, ok + the total number of docs
484          */
485         public String reindexFulltext(int batchSize, int batch, String query) throws Exception {
486         Principal principal = coreSession.getPrincipal();
487         if (!(principal instanceof NuxeoPrincipal)) {
488             return "unauthorized";
489         }
490         NuxeoPrincipal nuxeoPrincipal = (NuxeoPrincipal) principal;
491         if (!nuxeoPrincipal.isAdministrator()) {
492             return "unauthorized";
493         }
494
495         log("Reindexing starting");
496         if (batchSize <= 0) {
497             batchSize = DEFAULT_BATCH_SIZE;
498         }
499         
500         //
501         // A default query that gets ALL the documents
502         //
503         if (query == null) {
504                 query = "SELECT ecm:uuid, ecm:primaryType FROM Document"
505                         + " WHERE ecm:isProxy = 0"
506                         + " AND ecm:currentLifeCycleState <> 'deleted'"
507                         + " ORDER BY ecm:uuid";
508         }
509
510         List<ReindexInfo> infos = getInfos(query);
511         int size = infos.size();
512         int numBatches = (size + batchSize - 1) / batchSize;
513         if (batch < 0 || batch > numBatches) {
514             batch = 0; // all
515         }
516         batch--;
517
518         log("Reindexing of %s documents, batch size: %s, number of batches: %s",
519                 size, batchSize, numBatches);
520         if (batch >= 0) {
521             log("Reindexing limited to batch: %s", batch + 1);
522         }
523
524         //
525         // Commit and close the transaction that was started by our standard request lifecycle.
526         //
527         boolean tx = TransactionHelper.isTransactionActive();
528         if (tx) {
529             TransactionHelper.commitOrRollbackTransaction();
530         }
531
532         int n = 0;
533         int errs = 0;
534         for (int i = 0; i < numBatches; i++) {
535             if (batch >= 0 && batch != i) {
536                 continue;
537             }
538             int pos = i * batchSize;
539             int end = pos + batchSize;
540             if (end > size) {
541                 end = size;
542             }
543             List<ReindexInfo> batchInfos = infos.subList(pos, end);
544             log("Reindexing batch %s/%s, first id: %s", i + 1, numBatches,
545                     batchInfos.get(0).id);
546             try {
547                 doBatch(batchInfos);
548             } catch (NuxeoException e) {
549                 log.error("Error processing batch " + i + 1, e);
550                 errs++;
551             }
552             n += end - pos;
553         }
554
555         log("Reindexing done");
556         //
557         // Start a new transaction so our standard request lifecycle can complete.
558         //
559         if (tx) {
560             TransactionHelper.startTransaction();
561         }
562         return "done: " + n + " total: " + size + " batch_errors: " + errs;
563     }
564
565         protected void log(String format, Object... args) {
566                 log.warn(String.format(format, args));
567         }
568
569         /**
570          * This has to be called once the transaction has been started.
571          */
572         protected void getLowLevelSession() throws Exception {
573         try {
574             SQLSession s = (SQLSession) ((AbstractSession) coreSession).getSession();
575             Field f2 = SQLSession.class.getDeclaredField("session");
576             f2.setAccessible(true);
577             session = (Session) f2.get(s);
578             fulltextConfiguration = session.getModel().getFulltextConfiguration();
579         } catch (ReflectiveOperationException e) {
580             throw new NuxeoException(e);
581         }
582     }
583
584         protected List<ReindexInfo> getInfos(String query) throws Exception {
585         getLowLevelSession();
586         List<ReindexInfo> infos = new ArrayList<ReindexInfo>();
587         IterableQueryResult it = session.queryAndFetch(query, NXQL.NXQL,
588                 QueryFilter.EMPTY);
589         try {
590             for (Map<String, Serializable> map : it) {
591                 Serializable id = map.get(NXQL.ECM_UUID);
592                 String type = (String) map.get(NXQL.ECM_PRIMARYTYPE);
593                 infos.add(new ReindexInfo(id, type));
594             }
595         } finally {
596             it.close();
597         }
598         return infos;
599     }
600
601         protected void doBatch(List<ReindexInfo> infos) throws Exception {
602         boolean tx;
603         boolean ok;
604
605         // transaction for the sync batch
606         tx = TransactionHelper.startTransaction();
607
608         getLowLevelSession(); // for fulltextInfo
609         List<Serializable> ids = new ArrayList<Serializable>(infos.size());
610         Set<String> asyncIds = new HashSet<String>();
611         Model model = session.getModel();
612         for (ReindexInfo info : infos) {
613             ids.add(info.id);
614             if (fulltextConfiguration.isFulltextIndexable(info.type)) {
615                 asyncIds.add(model.idToString(info.id));
616             }
617         }
618         ok = false;
619         try {
620             runSyncBatch(ids, asyncIds);
621             ok = true;
622         } finally {
623             if (tx) {
624                 if (!ok) {
625                     TransactionHelper.setTransactionRollbackOnly();
626                     log.error("Rolling back sync");
627                 }
628                 TransactionHelper.commitOrRollbackTransaction();
629             }
630         }
631
632         runAsyncBatch(asyncIds);
633
634         // wait for async completion after transaction commit
635         Framework.getLocalService(EventService.class).waitForAsyncCompletion();
636     }
637
638         /*
639          * Do this at the low-level session level because we may have to modify
640          * things like versions which aren't usually modifiable, and it's also good
641          * to bypass all listeners.
642          */
643         protected void runSyncBatch(List<Serializable> ids, Set<String> asyncIds) throws Exception {
644         getLowLevelSession();
645
646         session.getNodesByIds(ids); // batch fetch
647
648         Map<Serializable, String> titles = new HashMap<Serializable, String>();
649         for (Serializable id : ids) {
650             Node node = session.getNodeById(id);
651             if (asyncIds.contains(id)) {
652                 node.setSimpleProperty(Model.FULLTEXT_JOBID_PROP, id);
653             }
654             SimpleProperty prop;
655             try {
656                 prop = node.getSimpleProperty(DC_TITLE);
657             } catch (IllegalArgumentException e) {
658                 continue;
659             }
660             String title = (String) prop.getValue();
661             titles.put(id, title);
662             prop.setValue(title + " ");
663         }
664         session.save();
665
666         for (Serializable id : ids) {
667             Node node = session.getNodeById(id);
668             SimpleProperty prop;
669             try {
670                 prop = node.getSimpleProperty(DC_TITLE);
671             } catch (IllegalArgumentException e) {
672                 continue;
673             }
674             prop.setValue(titles.get(id));
675         }
676         session.save();
677     }
678
679         protected void runAsyncBatch(Set<String> asyncIds) {
680         if (asyncIds.isEmpty()) {
681             return;
682         }
683         String repositoryName = coreSession.getRepositoryName();
684         WorkManager workManager = Framework.getLocalService(WorkManager.class);
685         for (String id : asyncIds) {
686             Work work = new SQLFulltextExtractorWork(repositoryName, id);
687             // schedule immediately, we're outside a transaction
688             workManager.schedule(work, Scheduling.IF_NOT_SCHEDULED, false);
689         }
690     }
691 }