1 package org.collectionspace.services.batch.nuxeo;
3 import java.net.URISyntaxException;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6 import java.util.Collections;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.util.Iterator;
10 import java.util.LinkedHashMap;
11 import java.util.List;
15 import org.collectionspace.services.client.PayloadOutputPart;
16 import org.collectionspace.services.client.PoxPayloadOut;
17 import org.collectionspace.services.client.RelationClient;
18 import org.collectionspace.services.client.workflow.WorkflowClient;
19 import org.collectionspace.services.common.NuxeoBasedResource;
20 import org.collectionspace.services.common.api.RefNameUtils;
21 import org.collectionspace.services.common.api.RefNameUtils.AuthorityTermInfo;
22 import org.collectionspace.services.common.authorityref.AuthorityRefDocList;
23 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
24 import org.collectionspace.services.common.invocable.InvocationResults;
25 import org.collectionspace.services.common.relation.RelationResource;
26 import org.collectionspace.services.common.vocabulary.AuthorityResource;
27 import org.collectionspace.services.relation.RelationsCommonList;
29 import org.dom4j.Document;
30 import org.dom4j.DocumentException;
31 import org.dom4j.DocumentHelper;
32 import org.dom4j.Element;
33 import org.dom4j.Node;
35 import org.nuxeo.common.utils.StringUtils;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * A batch job that merges authority items. The single and list contexts are
43 * The merge target is a record into which one or more source records will be
44 * merged. A merge source is a record that will be merged into the target, as
45 * follows: Each term in a source record is added to the target as a non-
46 * preferred term, if that term does not already exist in the target. If a term
47 * in the source already exists in the target, each non-blank term field is
48 * copied to the target, if that field is empty in the target. If the field is
49 * non-empty in the target, and differs from the source field, a warning is
50 * emitted and no action is taken. If a source is successfully merged into the
51 * target, all references to the source are transferred to the target, and the
52 * source record is soft-deleted.
54 * The context (singleCSID or listCSIDs) of the batch invocation payload
55 * specifies the source record(s).
57 * The following parameters are allowed:
59 * targetCSID: The csid of the target record. Only one target may be supplied.
63 public class MergeAuthorityItemsBatchJob extends AbstractBatchJob {
// SLF4J logger for this job, named after this class. Declared as a final instance field with
// default (package) access.
64 final Logger logger = LoggerFactory.getLogger(MergeAuthorityItemsBatchJob.class);
/**
 * Creates the batch job and registers the invocation modes it supports:
 * the single mode and the list mode.
 */
public MergeAuthorityItemsBatchJob() {
	this.setSupportedInvocationModes(
		Arrays.asList(INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST)
	);
}
// Body of the job's entry point; the method signature is not visible in this view.
// The job is marked as started before any parameter handling happens.
72 setCompletionStatus(STATUS_MIN_PROGRESS);
// Values collected from the invocation parameters below.
75 String docType = null;
76 String targetCsid = null;
77 List<String> sourceCsids = new ArrayList<String>();
// Scan every invocation parameter and pick out the ones this job understands; parameters
// with any other key are ignored.
79 for (Param param : this.getParams()) {
80 String key = param.getKey();
82 // I don't want this batch job to appear in the UI, since it won't run successfully without parameters.
83 // That means it can't be registered with any docType. But if the invocation payload contains a docType,
84 // it will be checked against the null registered docType, and will fail. So docType should be passed as a
// parameter (read by the "docType" branch just below) rather than in the invocation payload.
87 if (key.equals("docType")) {
88 docType = param.getValue();
90 else if (key.equals("targetCSID")) {
// If targetCSID is supplied more than once, the last value wins.
91 targetCsid = param.getValue();
93 else if (key.equals("sourceCSID")) {
// sourceCSID may be repeated; every occurrence is appended as another merge source.
94 sourceCsids.add(param.getValue());
// All three inputs are mandatory: a missing or empty docType/targetCSID, or an empty list of
// sources, aborts the job with an exception that is handled by the catch clause below.
98 if (docType == null || docType.equals("")) {
99 throw new Exception("a docType must be supplied");
102 if (targetCsid == null || targetCsid.equals("")) {
103 throw new Exception("a target csid parameter (targetCSID) must be supplied");
106 if (sourceCsids.size() == 0) {
107 throw new Exception("a source csid must be supplied");
// Perform the merge of all sources into the target.
110 InvocationResults results = merge(docType, targetCsid, sourceCsids);
// Reached only when the merge did not throw.
113 setCompletionStatus(STATUS_COMPLETE);
// Any failure (validation or merge) is reported through the job's status and error info.
// NOTE(review): only e.getMessage() is recorded; the exception's cause and stack trace are
// neither logged nor passed on in the visible lines.
115 catch (Exception e) {
116 setCompletionStatus(STATUS_ERROR);
117 setErrorInfo(new InvocationError(INT_ERROR_STATUS, e.getMessage()));
121 public InvocationResults merge(String docType, String targetCsid, String sourceCsid) throws URISyntaxException, DocumentException {
122 return merge(docType, targetCsid, Arrays.asList(sourceCsid));
125 public InvocationResults merge(String docType, String targetCsid, List<String> sourceCsids) throws URISyntaxException, DocumentException {
126 logger.debug("Merging docType=" + docType + " targetCsid=" + targetCsid + " sourceCsids=" + StringUtils.join(sourceCsids, ","));
128 String serviceName = getAuthorityServiceNameForDocType(docType);
130 PoxPayloadOut targetItemPayload = findAuthorityItemByCsid(serviceName, targetCsid);
131 List<PoxPayloadOut> sourceItemPayloads = new ArrayList<PoxPayloadOut>();
133 for (String sourceCsid : sourceCsids) {
134 sourceItemPayloads.add(findAuthorityItemByCsid(serviceName, sourceCsid));
137 return merge(docType, targetItemPayload, sourceItemPayloads);
/**
 * Merges the given source item payloads into the target item payload.
 *
 * The visible steps are: merge each source's term groups into a copy of the target's term
 * group list, update the target item with the merged list, update references to each source
 * so they use the target's refName, and finally delete each source item. The user notes
 * produced along the way are joined with newlines into the returned results.
 *
 * NOTE(review): the declaration of numAffected is not visible in this view.
 */
140 private InvocationResults merge(String docType, PoxPayloadOut targetItemPayload, List<PoxPayloadOut> sourceItemPayloads) throws URISyntaxException, DocumentException {
142 List<String> userNotes = new ArrayList<String>();
// All merging is done on a copy, so the target's own term group list element is left as it was.
144 Element targetTermGroupListElement = getTermGroupListElement(targetItemPayload);
145 Element mergedTermGroupListElement = targetTermGroupListElement.createCopy();
147 String targetCsid = getCsid(targetItemPayload);
148 String targetRefName = getRefName(targetItemPayload);
149 String inAuthority = getFieldValue(targetItemPayload, "inAuthority");
151 logger.debug("Merging term groups");
// Step 1: fold the term groups of every source into the merged copy.
153 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
154 String sourceCsid = getCsid(sourceItemPayload);
155 Element sourceTermGroupListElement = getTermGroupListElement(sourceItemPayload);
157 logger.debug("Merging term groups from source " + sourceCsid + " into target " + targetCsid);
160 mergeTermGroupLists(mergedTermGroupListElement, sourceTermGroupListElement);
// A merge failure is rethrown with the csids of the records involved, keeping the cause.
162 catch(RuntimeException e) {
163 throw new RuntimeException("Error merging source record " + sourceCsid + " into target record " + targetCsid + ": " + e.getMessage(), e);
167 logger.debug("Updating target: docType=" + docType + " inAuthority=" + inAuthority + " targetCsid=" + targetCsid);
// Step 2: save the target. The payload is built from the original list element and the merged copy.
169 updateAuthorityItem(docType, inAuthority, targetCsid, getUpdatePayload(targetTermGroupListElement, mergedTermGroupListElement));
171 userNotes.add("The target record with CSID " + targetCsid + " (" + targetRefName + ") was updated.");
174 String serviceName = getAuthorityServiceNameForDocType(docType);
176 logger.debug("Updating references");
// Step 3: for each source, change references from the source refName to the target refName.
// The target's inAuthority value is the one passed here.
178 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
179 String sourceCsid = getCsid(sourceItemPayload);
180 String sourceRefName = getRefName(sourceItemPayload);
182 InvocationResults results = updateReferences(serviceName, inAuthority, sourceCsid, sourceRefName, targetRefName);
184 userNotes.add(results.getUserNote());
185 numAffected += results.getNumAffected();
188 logger.debug("Deleting source items");
// Step 4: delete each source item, using the inAuthority value read from the source itself.
190 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
191 String sourceCsid = getCsid(sourceItemPayload);
// NOTE(review): sourceRefName is not used in the visible lines of this loop.
192 String sourceRefName = getRefName(sourceItemPayload);
194 InvocationResults results = deleteAuthorityItem(docType, getFieldValue(sourceItemPayload, "inAuthority"), sourceCsid);
196 userNotes.add(results.getUserNote());
197 numAffected += results.getNumAffected();
// Combine the running count and all collected notes into a single result.
200 InvocationResults results = new InvocationResults();
201 results.setNumAffected(numAffected);
202 results.setUserNote(StringUtils.join(userNotes, "\n"));
/**
 * Updates the records that reference a source authority item so that they use the target
 * refName instead of the source refName.
 *
 * Referencing fields are fetched, grouped into one ReferencingRecord per referencing document,
 * and each of those records is updated. The fetch/update cycle is repeated while the fetch
 * returns any items. The returned results carry the number of updated records and a user note.
 *
 * NOTE(review): the declarations of pageNum, pageSize, loopCount and numUpdated, and the start
 * of the loop, are not visible in this view.
 */
207 private InvocationResults updateReferences(String serviceName, String inAuthority, String sourceCsid, String sourceRefName, String targetRefName) throws URISyntaxException, DocumentException {
208 logger.debug("Updating references: serviceName=" + serviceName + " inAuthority=" + inAuthority + " sourceCsid=" + sourceCsid + " sourceRefName=" + sourceRefName + " targetRefName=" + targetRefName);
212 List<AuthorityRefDocList.AuthorityRefDocItem> items;
217 logger.debug("Looping with pageSize=" + pageSize);
222 // The pageNum/pageSize parameters don't work properly for refobj requests!
223 // It should be safe to repeatedly fetch page 0 for a large-ish page size,
224 // and update that page, until no references are left.
226 items = findReferencingFields(serviceName, inAuthority, sourceCsid, null, pageNum, pageSize);
// Insertion-ordered map from referencing document id to the fields in it that need updating.
227 Map<String, ReferencingRecord> referencingRecordsByCsid = new LinkedHashMap<String, ReferencingRecord>();
229 logger.debug("Loop " + loopCount + ": " + items.size() + " items found");
231 for (AuthorityRefDocList.AuthorityRefDocItem item : items) {
232 // If a record contains a reference to the record multiple times, multiple items are returned,
233 // but only the first has a non-null workflow state. A bug?
235 String itemCsid = item.getDocId();
236 ReferencingRecord record = referencingRecordsByCsid.get(itemCsid);
// A record is only created from an item whose workflow state is non-null and not "deleted".
// Items for a document that never gets a record are skipped by the null check below.
238 if (record == null) {
239 if (item.getWorkflowState() != null && !item.getWorkflowState().equals(WorkflowClient.WORKFLOWSTATE_DELETED)) {
240 record = new ReferencingRecord(item.getUri());
241 referencingRecordsByCsid.put(itemCsid, record);
245 if (record != null) {
// The source field is split on ":" and read as <part name>:<field name>.
// NOTE(review): a value without ":" would fail on index 1 — confirm the format is guaranteed.
246 String[] sourceFieldElements = item.getSourceField().split(":");
247 String partName = sourceFieldElements[0];
248 String fieldName = sourceFieldElements[1];
// Record the field under its part, creating the set for that part on first use.
250 Map<String, Set<String>> fields = record.getFields();
251 Set<String> fieldsInPart = fields.get(partName);
253 if (fieldsInPart == null) {
254 fieldsInPart = new HashSet<String>();
255 fields.put(partName, fieldsInPart);
258 fieldsInPart.add(fieldName);
262 List<ReferencingRecord> referencingRecords = new ArrayList<ReferencingRecord>(referencingRecordsByCsid.values());
264 logger.debug("Loop " + loopCount + ": updating " + referencingRecords.size() + " records");
// Update each referencing document once, covering all of its collected fields.
266 for (ReferencingRecord record : referencingRecords) {
267 InvocationResults results = updateReferencingRecord(record, sourceRefName, targetRefName);
268 numUpdated += results.getNumAffected();
// NOTE(review): the cycle repeats while the last fetch returned any items. If a fetch keeps
// returning items that never get updated (for example, items that are all skipped above),
// this condition would stay true — confirm that findReferencingFields cannot do that.
271 while (items.size() > 0);
273 InvocationResults results = new InvocationResults();
274 results.setNumAffected(numUpdated);
275 results.setUserNote(numUpdated > 0 ?
276 numUpdated + " records that referenced the source record with CSID " + sourceCsid + " were updated." :
277 "No records referenced the source record with CSID " + sourceCsid + ".");
282 private InvocationResults updateReferencingRecord(ReferencingRecord record, String fromRefName, String toRefName) throws URISyntaxException, DocumentException {
283 String fromRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(fromRefName);
284 // String toRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(toRefName);
286 logger.debug("Updating references: record.uri=" + record.getUri() + " fromRefName=" + fromRefName + " toRefName=" + toRefName);
288 Map<String, Set<String>> fields = record.getFields();
290 PoxPayloadOut recordPayload = findByUri(record.getUri());
291 Document recordDocument = recordPayload.getDOMDocument();
292 Document newDocument = (Document) recordDocument.clone();
293 Element rootElement = newDocument.getRootElement();
295 for (Element partElement : (List<Element>) rootElement.elements()) {
296 String partName = partElement.getName();
298 if (fields.containsKey(partName)) {
299 for (String fieldName : fields.get(partName)) {
300 List<Node> nodes = partElement.selectNodes("descendant::" + fieldName);
302 for (Node node : nodes) {
303 String text = node.getText();
304 String refNameStem = null;
307 refNameStem = RefNameUtils.stripAuthorityTermDisplayName(text);
309 catch(IllegalArgumentException e) {}
311 if (refNameStem != null && refNameStem.equals(fromRefNameStem)) {
312 AuthorityTermInfo termInfo = RefNameUtils.parseAuthorityTermInfo(text);
313 // String newRefName = toRefNameStem + "'" + termInfo.displayName + "'";
314 String newRefName = toRefName;
316 node.setText(newRefName);
322 rootElement.remove(partElement);
326 String payload = newDocument.asXML();
328 return updateUri(record.getUri(), payload);
331 private InvocationResults updateUri(String uri, String payload) throws URISyntaxException {
332 String[] uriParts = uri.split("/");
334 if (uriParts.length == 3) {
335 String serviceName = uriParts[1];
336 String csid = uriParts[2];
338 NuxeoBasedResource resource = (NuxeoBasedResource) getResourceMap().get(serviceName);
340 resource.update(getResourceMap(), createUriInfo(), csid, payload);
342 else if (uriParts.length == 5) {
343 String serviceName = uriParts[1];
344 String vocabularyCsid = uriParts[2];
345 String items = uriParts[3];
346 String csid = uriParts[4];
348 if (items.equals("items")) {
349 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
351 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), vocabularyCsid, csid, payload);
355 throw new IllegalArgumentException("Invalid uri " + uri);
358 logger.debug("Updated referencing record " + uri);
360 InvocationResults results = new InvocationResults();
361 results.setNumAffected(1);
362 results.setUserNote("Updated referencing record " + uri);
367 private void updateAuthorityItem(String docType, String inAuthority, String csid, String payload) throws URISyntaxException {
368 String serviceName = getAuthorityServiceNameForDocType(docType);
369 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
371 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), inAuthority, csid, payload);
/**
 * Deletes a source authority item, unless it has narrower items.
 *
 * In the visible lines: an item that has narrower items is not deleted and a note saying so is
 * added. The remaining lines delete the item's hasBroader relations and then apply the "delete"
 * workflow transition to the item. The returned results carry the affected count and the
 * collected user notes, joined with newlines.
 *
 * NOTE(review): the declaration and updates of numAffected, and the branch structure between
 * the two cases, are not visible in this view.
 */
374 private InvocationResults deleteAuthorityItem(String docType, String inAuthority, String csid) throws URISyntaxException {
376 List<String> userNotes = new ArrayList<String>();
378 // If the item is the broader context of any items, warn and do nothing.
380 List<String> narrowerItemCsids = findNarrower(csid);
382 if (narrowerItemCsids.size() > 0) {
383 logger.debug("Item " + csid + " has narrower items -- not deleting");
385 userNotes.add("The source record with CSID " + csid + " was not deleted because it has narrower context items.");
388 // If the item has a broader context, delete the relation.
// Collect the csids of the item's hasBroader relations before deleting any of them.
390 List<String> relationCsids = new ArrayList<String>();
392 for (RelationsCommonList.RelationListItem item : findRelated(csid, null, "hasBroader", null, null)) {
393 relationCsids.add(item.getCsid());
// The relation resource is only looked up when there is at least one relation to delete.
396 if (relationCsids.size() > 0) {
397 RelationResource relationResource = (RelationResource) getResourceMap().get(RelationClient.SERVICE_NAME);
399 for (String relationCsid : relationCsids) {
400 logger.debug("Deleting hasBroader relation " + relationCsid);
402 relationResource.delete(relationCsid);
404 userNotes.add("The broader relation with CSID " + relationCsid + " was deleted.");
409 String serviceName = getAuthorityServiceNameForDocType(docType);
410 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
412 logger.debug("Soft deleting: docType=" + docType + " inAuthority=" + inAuthority + " csid=" + csid);
// The item is deleted by applying the "delete" workflow transition to it.
414 resource.updateItemWorkflowWithTransition(null, inAuthority, csid, "delete");
416 userNotes.add("The source record with CSID " + csid + " was soft deleted.");
420 InvocationResults results = new InvocationResults();
421 results.setNumAffected(numAffected);
422 results.setUserNote(StringUtils.join(userNotes, "\n"));
428 * @param Returns a map of the term groups in term group list, keyed by display name.
429 * If multiple groups have the same display name, an exception is thrown.
430 * @return The term groups.
432 private Map<String, Element> getTermGroups(Element termGroupListElement) {
433 Map<String, Element> termGroups = new LinkedHashMap<String, Element>();
434 Iterator<Element> childIterator = termGroupListElement.elementIterator();
436 while (childIterator.hasNext()) {
437 Element termGroupElement = childIterator.next();
438 String displayName = getDisplayName(termGroupElement);
440 if (termGroups.containsKey(displayName)) {
441 // Two term groups in the same item have identical display names.
443 throw new RuntimeException("multiple terms have display name \"" + displayName + "\"");
446 termGroups.put(displayName, termGroupElement);
453 private String getDisplayName(Element termGroupElement) {
454 Node displayNameNode = termGroupElement.selectSingleNode("termDisplayName");
455 String displayName = (displayNameNode == null) ? "" : displayNameNode.getText();
460 private Element getTermGroupListElement(PoxPayloadOut itemPayload) {
461 Element termGroupListElement = null;
462 Element commonPartElement = findCommonPartElement(itemPayload);
464 if (commonPartElement != null) {
465 termGroupListElement = findTermGroupListElement(commonPartElement);
468 return termGroupListElement;
471 private Element findCommonPartElement(PoxPayloadOut itemPayload) {
472 Element commonPartElement = null;
474 for (PayloadOutputPart candidatePart : itemPayload.getParts()) {
475 Element candidatePartElement = candidatePart.asElement();
477 if (candidatePartElement.getName().endsWith("_common")) {
478 commonPartElement = candidatePartElement;
483 return commonPartElement;
486 private Element findTermGroupListElement(Element contextElement) {
487 Element termGroupListElement = null;
488 Iterator<Element> childIterator = contextElement.elementIterator();
490 while (childIterator.hasNext()) {
491 Element candidateElement = childIterator.next();
493 if (candidateElement.getName().endsWith("TermGroupList")) {
494 termGroupListElement = candidateElement;
499 return termGroupListElement;
502 private void mergeTermGroupLists(Element targetTermGroupListElement, Element sourceTermGroupListElement) {
503 Map<String, Element> sourceTermGroups;
506 sourceTermGroups = getTermGroups(sourceTermGroupListElement);
508 catch(RuntimeException e) {
509 throw new RuntimeException("a problem was found in the source record: " + e.getMessage(), e);
512 for (Element targetTermGroupElement : (List<Element>) targetTermGroupListElement.elements()) {
513 String displayName = getDisplayName(targetTermGroupElement);
515 if (sourceTermGroups.containsKey(displayName)) {
516 logger.debug("Merging in existing term \"" + displayName + "\"");
519 mergeTermGroups(targetTermGroupElement, sourceTermGroups.get(displayName));
521 catch(RuntimeException e) {
522 throw new RuntimeException("could not merge term groups with display name \"" + displayName + "\": " + e.getMessage(), e);
525 sourceTermGroups.remove(displayName);
529 for (Element sourceTermGroupElement : sourceTermGroups.values()) {
530 logger.debug("Adding new term \"" + getDisplayName(sourceTermGroupElement) + "\"");
532 targetTermGroupListElement.add(sourceTermGroupElement.createCopy());
/**
 * Merges the fields of a source term group into a target term group.
 *
 * For each child element of the source that has non-empty text: if the target has no child
 * with the same name, a copy of the source child is added; if the target's child has empty
 * text, it is replaced by a copy of the source child; if the target's child has different,
 * non-empty text, a RuntimeException describing the conflict is thrown.
 *
 * NOTE(review): the bodies of the two null checks are not visible in this view.
 */
536 private void mergeTermGroups(Element targetTermGroupElement, Element sourceTermGroupElement) {
537 // This function assumes there are no nested repeating groups.
539 for (Element sourceChildElement : (List<Element>) sourceTermGroupElement.elements()) {
540 String sourceValue = sourceChildElement.getText();
542 if (sourceValue == null) {
// Source fields with empty text contribute nothing to the target.
546 if (sourceValue.length() > 0) {
547 String name = sourceChildElement.getName();
// element(name) is used for the lookup, so only one target child per name is considered.
548 Element targetChildElement = targetTermGroupElement.element(name);
// The field does not exist in the target at all: add a copy of the source field.
550 if (targetChildElement == null) {
551 targetTermGroupElement.add(sourceChildElement.createCopy());
554 String targetValue = targetChildElement.getText();
556 if (targetValue == null) {
// Equal values need no action. Differing values are a conflict when the target is non-empty.
560 if (!targetValue.equals(sourceValue)) {
561 if (targetValue.length() > 0) {
562 throw new RuntimeException("merge conflict in field " + name + ": source value \"" + sourceValue + "\" differs from target value \"" + targetValue +"\"");
// The target field has empty text: swap it for a copy of the source field.
565 targetTermGroupElement.remove(targetChildElement);
566 targetTermGroupElement.add(sourceChildElement.createCopy());
573 private String getUpdatePayload(Element originalTermGroupListElement, Element updatedTermGroupListElement) {
574 List<Element> parents = new ArrayList<Element>();
576 for (Element e = originalTermGroupListElement; e != null; e = e.getParent()) {
580 Collections.reverse(parents);
582 // Remove the original termGroupList element
583 parents.remove(parents.size() - 1);
586 Element rootElement = parents.remove(0);
588 // Copy the root to a new document
589 Document document = DocumentHelper.createDocument(copyElement(rootElement));
590 Element current = document.getRootElement();
592 // Copy the remaining parents
593 for (Element parent : parents) {
594 Element parentCopy = copyElement(parent);
596 current.add(parentCopy);
597 current = parentCopy;
600 // Add the updated termGroupList element
602 current.add(updatedTermGroupListElement);
604 String payload = document.asXML();
609 private Element copyElement(Element element) {
610 Element copy = DocumentHelper.createElement(element.getQName());
611 copy.appendAttributes(element);
616 private class ReferencingRecord {
618 private Map<String, Set<String>> fields;
620 public ReferencingRecord(String uri) {
622 this.fields = new HashMap<String, Set<String>>();
625 public String getUri() {
629 public void setUri(String uri) {
633 public Map<String, Set<String>> getFields() {