1 package org.collectionspace.services.batch.nuxeo;
3 import java.net.URISyntaxException;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6 import java.util.Collections;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.util.Iterator;
10 import java.util.LinkedHashMap;
11 import java.util.LinkedHashSet;
12 import java.util.List;
16 import org.apache.commons.lang.StringUtils;
18 import org.collectionspace.services.client.PayloadOutputPart;
19 import org.collectionspace.services.client.PoxPayloadOut;
20 import org.collectionspace.services.client.RelationClient;
21 import org.collectionspace.services.client.workflow.WorkflowClient;
22 import org.collectionspace.services.common.NuxeoBasedResource;
23 import org.collectionspace.services.common.api.RefNameUtils;
24 import org.collectionspace.services.common.api.RefNameUtils.AuthorityTermInfo;
25 import org.collectionspace.services.common.authorityref.AuthorityRefDocList;
26 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
27 import org.collectionspace.services.common.invocable.InvocationResults;
28 import org.collectionspace.services.common.relation.RelationResource;
29 import org.collectionspace.services.common.vocabulary.AuthorityResource;
30 import org.collectionspace.services.relation.RelationsCommonList;
32 import org.dom4j.Document;
33 import org.dom4j.DocumentException;
34 import org.dom4j.DocumentHelper;
35 import org.dom4j.Element;
36 import org.dom4j.Node;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * A batch job that merges authority items. The single and list contexts are supported.
45 * The merge target is a record into which one or more source records will be
46 * merged. A merge source is a record that will be merged into the target, as
47 * follows: Each term in a source record is added to the target as a non-
48 * preferred term, if that term does not already exist in the target. If a term
49 * in the source already exists in the target, each non-blank term field is
50 * copied to the target, if that field is empty in the target. If the field is
51 * non-empty in the target, and differs from the source field, a warning is
52 * emitted and no action is taken. If a source is successfully merged into the
53 * target, all references to the source are transferred to the target, and the
54 * source record is soft-deleted.
56 * The context (singleCSID or listCSIDs) of the batch invocation payload
57 * specifies the source record(s).
59 * The following parameters are allowed:
61 * targetCSID: The csid of the target record. Only one target may be supplied.
65 public class MergeAuthorityItemsBatchJob extends AbstractBatchJob {
66 final Logger logger = LoggerFactory.getLogger(MergeAuthorityItemsBatchJob.class);
/**
 * Creates the batch job and declares the invocation modes it supports:
 * the single-record mode and the list mode.
 */
public MergeAuthorityItemsBatchJob() {
    this.setSupportedInvocationModes(
        Arrays.asList(
            INVOCATION_MODE_SINGLE,
            INVOCATION_MODE_LIST));
}
// Body of the job's entry point. The method header, the opening of the try
// block and the declaration of `target` are not visible in this excerpt.
//
// Gathers the merge target and the source csids from the invocation context
// and the invocation parameters, runs the merge, and records the outcome in
// the completion status.
74 setCompletionStatus(STATUS_MIN_PROGRESS);
// A LinkedHashSet keeps the source csids in insertion order and collapses
// duplicates (e.g. a csid given both in the context and as a parameter).
78 Set<String> sourceCsids = new LinkedHashSet<String>();
79 String docType = this.getDocType();
// Seed the sources from the invocation context: the single csid in single
// mode (if present), or every listed csid in list mode.
81 if (this.requestIsForInvocationModeSingle()) {
82 String singleCsid = this.getSingleCsid();
84 if (singleCsid != null) {
85 sourceCsids.add(singleCsid);
87 } else if (this.requestIsForInvocationModeList()) {
88 sourceCsids.addAll(this.getListCsids());
// Parameters can override the docType, name the target, and add sources.
91 for (Param param : this.getParams()) {
92 String key = param.getKey();
94 // I don't want this batch job to appear in the UI, since it won't run successfully without parameters.
95 // That means it can't be registered with any docType. But if the invocation payload contains a docType,
96 // it will be checked against the null registered docType, and will fail. So docType should be passed as a
99 if (key.equals("docType")) {
100 docType = param.getValue();
// "target" and "targetCSID" both assign the same variable; whichever
// parameter is seen last wins.
102 else if (key.equals("target")) {
103 target = param.getValue();
105 else if (key.equals("targetCSID")) {
106 target = param.getValue();
// Each sourceCSID parameter adds one more source to the set.
108 else if (key.equals("sourceCSID")) {
109 sourceCsids.add(param.getValue());
// A target is mandatory: reject a missing or empty value.
113 if (target == null || target.equals("")) {
114 throw new Exception("a target or targetCSID parameter must be supplied");
// At least one source is mandatory.
117 if (sourceCsids.size() == 0) {
118 throw new Exception("a source csid must be supplied");
121 InvocationResults results = merge(docType, target, sourceCsids);
// NOTE(review): what is done with `results` before the status is set is not
// visible in this excerpt.
124 setCompletionStatus(STATUS_COMPLETE);
// Any failure, including the two validation errors thrown above, ends up
// here and is reported through the completion status and error info.
// NOTE(review): only e.getMessage() is kept; the exception is not logged
// here, so its type and stack trace are lost -- consider logging it.
126 catch (Exception e) {
127 setCompletionStatus(STATUS_ERROR);
128 setErrorInfo(new InvocationError(INT_ERROR_STATUS, e.getMessage()));
132 public InvocationResults merge(String docType, String target, String sourceCsid) throws URISyntaxException, DocumentException {
133 return merge(docType, target, new LinkedHashSet<String>(Arrays.asList(sourceCsid)));
136 public InvocationResults merge(String docType, String target, Set<String> sourceCsids) throws URISyntaxException, DocumentException {
137 logger.debug("Merging docType=" + docType + " target=" + target + " sourceCsids=" + StringUtils.join(sourceCsids, ","));
139 String serviceName = getAuthorityServiceNameForDocType(docType);
141 PoxPayloadOut targetItemPayload = RefNameUtils.isTermRefname(target)
142 ? findAuthorityItemByRefName(serviceName, target)
143 : findAuthorityItemByCsid(serviceName, target);
145 List<PoxPayloadOut> sourceItemPayloads = new ArrayList<PoxPayloadOut>();
147 for (String sourceCsid : sourceCsids) {
148 sourceItemPayloads.add(findAuthorityItemByCsid(serviceName, sourceCsid));
151 return merge(docType, targetItemPayload, sourceItemPayloads);
// Merges already-loaded source item payloads into an already-loaded target
// item payload, in three passes: (1) merge all source term groups into a copy
// of the target's term group list and update the target, (2) repoint
// references from each source to the target, (3) delete each source.
// User notes from every step are joined with newlines into the result.
//
// NOTE(review): the declaration of `numAffected`, any statement that may sit
// between the "Updated the target record" note and the serviceName lookup,
// and the final return are not visible in this excerpt.
154 private InvocationResults merge(String docType, PoxPayloadOut targetItemPayload, List<PoxPayloadOut> sourceItemPayloads) throws URISyntaxException, DocumentException {
156 List<String> userNotes = new ArrayList<String>();
// Merging is done on a copy, so the target's original term group list is left
// untouched; both the original and the merged copy are later handed to
// getUpdatePayload.
// NOTE(review): getTermGroupListElement returns null when no common part or
// no term group list is found, in which case createCopy() would throw a
// NullPointerException -- confirm this cannot happen for valid items.
158 Element targetTermGroupListElement = getTermGroupListElement(targetItemPayload);
159 Element mergedTermGroupListElement = targetTermGroupListElement.createCopy();
161 String targetCsid = getCsid(targetItemPayload);
162 String targetRefName = getRefName(targetItemPayload);
163 String inAuthority = getFieldValue(targetItemPayload, "inAuthority");
165 logger.debug("Merging term groups");
// Pass 1: every source is merged into the copy before any record is updated,
// so a merge conflict in any source aborts the job before anything is
// written.
167 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
168 String sourceCsid = getCsid(sourceItemPayload);
169 Element sourceTermGroupListElement = getTermGroupListElement(sourceItemPayload);
171 logger.debug("Merging term groups from source " + sourceCsid + " into target " + targetCsid);
174 mergeTermGroupLists(mergedTermGroupListElement, sourceTermGroupListElement);
// Re-throw with the csids of both records added to the message, keeping the
// original exception as the cause.
176 catch(RuntimeException e) {
177 throw new RuntimeException("Error merging source record " + sourceCsid + " into target record " + targetCsid + ": " + e.getMessage(), e);
181 logger.debug("Updating target: docType=" + docType + " inAuthority=" + inAuthority + " targetCsid=" + targetCsid);
// The update payload is built from the original list (for its position in
// the document) and the merged copy (for its content).
183 updateAuthorityItem(docType, inAuthority, targetCsid, getUpdatePayload(targetTermGroupListElement, mergedTermGroupListElement));
185 String targetDisplayName = RefNameUtils.getDisplayName(targetRefName);
187 userNotes.add("Updated the target record, " + targetDisplayName + ".");
190 String serviceName = getAuthorityServiceNameForDocType(docType);
192 logger.debug("Updating references");
// Pass 2: repoint records that reference each source so they reference the
// target instead. The target's inAuthority is passed for every source.
194 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
195 String sourceCsid = getCsid(sourceItemPayload);
196 String sourceRefName = getRefName(sourceItemPayload);
198 InvocationResults results = updateReferences(serviceName, inAuthority, sourceCsid, sourceRefName, targetRefName);
200 userNotes.add(results.getUserNote());
201 numAffected += results.getNumAffected();
204 logger.debug("Deleting source items");
// Pass 3: delete each source. Here the source's own inAuthority value is
// read from its payload rather than reusing the target's.
206 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
207 String sourceCsid = getCsid(sourceItemPayload);
208 String sourceRefName = getRefName(sourceItemPayload);
210 InvocationResults results = deleteAuthorityItem(docType, getFieldValue(sourceItemPayload, "inAuthority"), sourceCsid, sourceRefName);
212 userNotes.add(results.getUserNote());
213 numAffected += results.getNumAffected();
// Combine the count and the notes of all three passes into one result.
216 InvocationResults results = new InvocationResults();
217 results.setNumAffected(numAffected);
218 results.setUserNote(StringUtils.join(userNotes, "\n"));
// Finds the records that reference the source item and rewrites those
// references to point at the target refname. Returns the number of records
// updated and a user note summarizing the outcome.
//
// NOTE(review): the declarations of pageNum, pageSize, loopCount and
// numUpdated, the opening of the loop, and part of the "Updated ..." user
// note expression are not visible in this excerpt.
223 private InvocationResults updateReferences(String serviceName, String inAuthority, String sourceCsid, String sourceRefName, String targetRefName) throws URISyntaxException, DocumentException {
224 logger.debug("Updating references: serviceName=" + serviceName + " inAuthority=" + inAuthority + " sourceCsid=" + sourceCsid + " sourceRefName=" + sourceRefName + " targetRefName=" + targetRefName);
226 String sourceDisplayName = RefNameUtils.getDisplayName(sourceRefName);
230 List<AuthorityRefDocList.AuthorityRefDocItem> items;
235 logger.debug("Looping with pageSize=" + pageSize);
240 // The pageNum/pageSize parameters don't work properly for refobj requests!
241 // It should be safe to repeatedly fetch page 0 for a large-ish page size,
242 // and update that page, until no references are left.
244 items = findReferencingFields(serviceName, inAuthority, sourceCsid, null, pageNum, pageSize);
// Group the returned items by the csid of the referencing record, keeping
// the order in which records were first seen.
245 Map<String, ReferencingRecord> referencingRecordsByCsid = new LinkedHashMap<String, ReferencingRecord>();
247 logger.debug("Loop " + loopCount + ": " + items.size() + " items found");
249 for (AuthorityRefDocList.AuthorityRefDocItem item : items) {
250 // If a record contains a reference to the record multiple times, multiple items are returned,
251 // but only the first has a non-null workflow state. A bug?
253 String itemCsid = item.getDocId();
254 ReferencingRecord record = referencingRecordsByCsid.get(itemCsid);
// A record entry is created only from an item that carries a workflow state
// other than deleted. Items for soft-deleted records, and items with a null
// state that have no earlier entry, leave `record` null and are skipped.
256 if (record == null) {
257 if (item.getWorkflowState() != null && !item.getWorkflowState().equals(WorkflowClient.WORKFLOWSTATE_DELETED)) {
258 record = new ReferencingRecord(item.getUri());
259 referencingRecordsByCsid.put(itemCsid, record);
263 if (record != null) {
// The source field is split on ":" into a part name and a field name.
// NOTE(review): a source field without ":" would make index 1 fail with an
// ArrayIndexOutOfBoundsException -- confirm the format is guaranteed.
264 String[] sourceFieldElements = item.getSourceField().split(":");
265 String partName = sourceFieldElements[0];
266 String fieldName = sourceFieldElements[1];
// Record the field under its part, creating the per-part set on first use.
268 Map<String, Set<String>> fields = record.getFields();
269 Set<String> fieldsInPart = fields.get(partName);
271 if (fieldsInPart == null) {
272 fieldsInPart = new HashSet<String>();
273 fields.put(partName, fieldsInPart);
276 fieldsInPart.add(fieldName);
280 List<ReferencingRecord> referencingRecords = new ArrayList<ReferencingRecord>(referencingRecordsByCsid.values());
282 logger.debug("Loop " + loopCount + ": updating " + referencingRecords.size() + " records");
// Each grouped record is updated once, covering all of its referencing
// fields.
284 for (ReferencingRecord record : referencingRecords) {
285 InvocationResults results = updateReferencingRecord(record, sourceRefName, targetRefName);
286 numUpdated += results.getNumAffected();
// The loop repeats until a fetch returns no items at all.
// NOTE(review): items that are skipped above (e.g. references held by
// soft-deleted records) are never updated; if the service keeps returning
// them, this condition may never become false -- confirm termination.
289 while (items.size() > 0);
291 InvocationResults results = new InvocationResults();
292 results.setNumAffected(numUpdated);
// The user note uses "record" or "records" to match the count, or states
// that nothing referenced the source.
294 if (numUpdated > 0) {
298 + (numUpdated == 1 ? " record " : " records ")
299 + "that referenced the source record, "
300 + sourceDisplayName + "."
303 results.setUserNote("No records referenced the source record, " + sourceDisplayName + ".");
309 private InvocationResults updateReferencingRecord(ReferencingRecord record, String fromRefName, String toRefName) throws URISyntaxException, DocumentException {
310 String fromRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(fromRefName);
311 // String toRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(toRefName);
313 logger.debug("Updating references: record.uri=" + record.getUri() + " fromRefName=" + fromRefName + " toRefName=" + toRefName);
315 Map<String, Set<String>> fields = record.getFields();
317 PoxPayloadOut recordPayload = findByUri(record.getUri());
318 Document recordDocument = recordPayload.getDOMDocument();
319 Document newDocument = (Document) recordDocument.clone();
320 Element rootElement = newDocument.getRootElement();
322 for (Element partElement : (List<Element>) rootElement.elements()) {
323 String partName = partElement.getName();
325 if (fields.containsKey(partName)) {
326 for (String fieldName : fields.get(partName)) {
327 List<Node> nodes = partElement.selectNodes("descendant::" + fieldName);
329 for (Node node : nodes) {
330 String text = node.getText();
331 String refNameStem = null;
334 refNameStem = RefNameUtils.stripAuthorityTermDisplayName(text);
336 catch(IllegalArgumentException e) {}
338 if (refNameStem != null && refNameStem.equals(fromRefNameStem)) {
339 AuthorityTermInfo termInfo = RefNameUtils.parseAuthorityTermInfo(text);
340 // String newRefName = toRefNameStem + "'" + termInfo.displayName + "'";
341 String newRefName = toRefName;
343 node.setText(newRefName);
349 rootElement.remove(partElement);
353 String payload = newDocument.asXML();
355 return updateUri(record.getUri(), payload);
358 private InvocationResults updateUri(String uri, String payload) throws URISyntaxException {
359 String[] uriParts = uri.split("/");
361 if (uriParts.length == 3) {
362 String serviceName = uriParts[1];
363 String csid = uriParts[2];
365 NuxeoBasedResource resource = (NuxeoBasedResource) getResourceMap().get(serviceName);
367 resource.update(getResourceMap(), createUriInfo(), csid, payload);
369 else if (uriParts.length == 5) {
370 String serviceName = uriParts[1];
371 String vocabularyCsid = uriParts[2];
372 String items = uriParts[3];
373 String csid = uriParts[4];
375 if (items.equals("items")) {
376 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
378 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), vocabularyCsid, csid, payload);
382 throw new IllegalArgumentException("Invalid uri " + uri);
385 logger.debug("Updated referencing record " + uri);
387 InvocationResults results = new InvocationResults();
388 results.setNumAffected(1);
389 results.setUserNote("Updated referencing record " + uri);
394 private void updateAuthorityItem(String docType, String inAuthority, String csid, String payload) throws URISyntaxException {
395 String serviceName = getAuthorityServiceNameForDocType(docType);
396 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
398 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), inAuthority, csid, payload);
// Soft-deletes a merged source item, unless it is the broader context of
// other items. Before deleting, any "hasBroader" relations found for the
// item are deleted. Returns a count and newline-joined user notes
// describing what was done.
//
// NOTE(review): the declaration of `numAffected`, any increments of it, the
// else branch that separates the two cases, and the final return are not
// visible in this excerpt.
401 private InvocationResults deleteAuthorityItem(String docType, String inAuthority, String csid, String refName) throws URISyntaxException {
403 List<String> userNotes = new ArrayList<String>();
404 String displayName = RefNameUtils.getDisplayName(refName);
406 // If the item is the broader context of any items, warn and do nothing.
408 List<String> narrowerItemCsids = findNarrower(csid);
410 if (narrowerItemCsids.size() > 0) {
411 logger.debug("Item " + csid + " has narrower items -- not deleting");
413 userNotes.add("The source record, " + displayName + ", was not deleted because it has narrower items in its hierarchy.");
// NOTE(review): presumably everything from here to the soft delete runs only
// when no narrower items were found -- confirm against the full source.
416 // If the item has a broader context, delete the relation.
// The related items are first copied into a local list; the relations are
// only deleted afterwards, while walking that copy.
418 List<RelationsCommonList.RelationListItem> relationItems = new ArrayList<RelationsCommonList.RelationListItem>();
420 for (RelationsCommonList.RelationListItem item : findRelated(csid, null, "hasBroader", null, null)) {
421 relationItems.add(item);
424 if (relationItems.size() > 0) {
425 RelationResource relationResource = (RelationResource) getResourceMap().get(RelationClient.SERVICE_NAME);
427 for (RelationsCommonList.RelationListItem item : relationItems) {
428 String relationCsid = item.getCsid();
// Display names of both ends are taken from their refnames, for the user
// note only.
430 String subjectRefName = item.getSubject().getRefName();
431 String subjectDisplayName = RefNameUtils.getDisplayName(subjectRefName);
433 String objectRefName = item.getObject().getRefName();
434 String objectDisplayName = RefNameUtils.getDisplayName(objectRefName);
436 logger.debug("Deleting hasBroader relation " + relationCsid);
438 relationResource.delete(relationCsid);
440 userNotes.add("Deleted the \"has broader\" relation from " + subjectDisplayName + " to " + objectDisplayName + ".");
445 String serviceName = getAuthorityServiceNameForDocType(docType);
446 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
448 logger.debug("Soft deleting: docType=" + docType + " inAuthority=" + inAuthority + " csid=" + csid);
// The item is not removed outright: the "delete" workflow transition is
// applied to it.
450 resource.updateItemWorkflowWithTransition(null, inAuthority, csid, "delete");
452 userNotes.add("Deleted the source record, " + displayName + ".");
456 InvocationResults results = new InvocationResults();
457 results.setNumAffected(numAffected);
458 results.setUserNote(StringUtils.join(userNotes, "\n"));
464 * @param Returns a map of the term groups in term group list, keyed by display name.
465 * If multiple groups have the same display name, an exception is thrown.
466 * @return The term groups.
468 private Map<String, Element> getTermGroups(Element termGroupListElement) {
469 Map<String, Element> termGroups = new LinkedHashMap<String, Element>();
470 Iterator<Element> childIterator = termGroupListElement.elementIterator();
472 while (childIterator.hasNext()) {
473 Element termGroupElement = childIterator.next();
474 String displayName = getDisplayName(termGroupElement);
476 if (termGroups.containsKey(displayName)) {
477 // Two term groups in the same item have identical display names.
479 throw new RuntimeException("multiple terms have display name \"" + displayName + "\"");
482 termGroups.put(displayName, termGroupElement);
489 private String getDisplayName(Element termGroupElement) {
490 Node displayNameNode = termGroupElement.selectSingleNode("termDisplayName");
491 String displayName = (displayNameNode == null) ? "" : displayNameNode.getText();
496 private Element getTermGroupListElement(PoxPayloadOut itemPayload) {
497 Element termGroupListElement = null;
498 Element commonPartElement = findCommonPartElement(itemPayload);
500 if (commonPartElement != null) {
501 termGroupListElement = findTermGroupListElement(commonPartElement);
504 return termGroupListElement;
507 private Element findCommonPartElement(PoxPayloadOut itemPayload) {
508 Element commonPartElement = null;
510 for (PayloadOutputPart candidatePart : itemPayload.getParts()) {
511 Element candidatePartElement = candidatePart.asElement();
513 if (candidatePartElement.getName().endsWith("_common")) {
514 commonPartElement = candidatePartElement;
519 return commonPartElement;
522 private Element findTermGroupListElement(Element contextElement) {
523 Element termGroupListElement = null;
524 Iterator<Element> childIterator = contextElement.elementIterator();
526 while (childIterator.hasNext()) {
527 Element candidateElement = childIterator.next();
529 if (candidateElement.getName().endsWith("TermGroupList")) {
530 termGroupListElement = candidateElement;
535 return termGroupListElement;
538 private void mergeTermGroupLists(Element targetTermGroupListElement, Element sourceTermGroupListElement) {
539 Map<String, Element> sourceTermGroups;
542 sourceTermGroups = getTermGroups(sourceTermGroupListElement);
544 catch(RuntimeException e) {
545 throw new RuntimeException("a problem was found in the source record: " + e.getMessage(), e);
548 for (Element targetTermGroupElement : (List<Element>) targetTermGroupListElement.elements()) {
549 String displayName = getDisplayName(targetTermGroupElement);
551 if (sourceTermGroups.containsKey(displayName)) {
552 logger.debug("Merging in existing term \"" + displayName + "\"");
555 mergeTermGroups(targetTermGroupElement, sourceTermGroups.get(displayName));
557 catch(RuntimeException e) {
558 throw new RuntimeException("could not merge term groups with display name \"" + displayName + "\": " + e.getMessage(), e);
561 sourceTermGroups.remove(displayName);
565 for (Element sourceTermGroupElement : sourceTermGroups.values()) {
566 logger.debug("Adding new term \"" + getDisplayName(sourceTermGroupElement) + "\"");
568 targetTermGroupListElement.add(sourceTermGroupElement.createCopy());
// Merges the fields of a source term group into a target term group,
// modifying the target in place. Each non-empty source field is copied to
// the target when the target has no such field or has it empty; a non-empty
// target field that differs from the source is a conflict and raises a
// RuntimeException naming the field and both values.
//
// NOTE(review): the statements inside the two null checks below are not
// visible in this excerpt, so how a null value is handled cannot be told
// from here.
572 private void mergeTermGroups(Element targetTermGroupElement, Element sourceTermGroupElement) {
573 // This function assumes there are no nested repeating groups.
575 for (Element sourceChildElement : (List<Element>) sourceTermGroupElement.elements()) {
576 String sourceValue = sourceChildElement.getText();
578 if (sourceValue == null) {
// Empty source fields contribute nothing to the target.
582 if (sourceValue.length() > 0) {
// The target field is looked up by the source field's element name.
583 String name = sourceChildElement.getName();
584 Element targetChildElement = targetTermGroupElement.element(name);
// Target has no such field: add a copy of the source field.
586 if (targetChildElement == null) {
587 targetTermGroupElement.add(sourceChildElement.createCopy());
590 String targetValue = targetChildElement.getText();
592 if (targetValue == null) {
// Equal values need no action; only differing values are examined.
596 if (!targetValue.equals(sourceValue)) {
597 if (targetValue.length() > 0) {
598 throw new RuntimeException("merge conflict in field " + name + ": source value \"" + sourceValue + "\" differs from target value \"" + targetValue +"\"");
// The target field is empty at this point: replace it with a copy of the
// source field.
601 targetTermGroupElement.remove(targetChildElement);
602 targetTermGroupElement.add(sourceChildElement.createCopy());
609 private String getUpdatePayload(Element originalTermGroupListElement, Element updatedTermGroupListElement) {
610 List<Element> parents = new ArrayList<Element>();
612 for (Element e = originalTermGroupListElement; e != null; e = e.getParent()) {
616 Collections.reverse(parents);
618 // Remove the original termGroupList element
619 parents.remove(parents.size() - 1);
622 Element rootElement = parents.remove(0);
624 // Copy the root to a new document
625 Document document = DocumentHelper.createDocument(copyElement(rootElement));
626 Element current = document.getRootElement();
628 // Copy the remaining parents
629 for (Element parent : parents) {
630 Element parentCopy = copyElement(parent);
632 current.add(parentCopy);
633 current = parentCopy;
636 // Add the updated termGroupList element
638 current.add(updatedTermGroupListElement);
640 String payload = document.asXML();
645 private Element copyElement(Element element) {
646 Element copy = DocumentHelper.createElement(element.getQName());
647 copy.appendAttributes(element);
652 private class ReferencingRecord {
654 private Map<String, Set<String>> fields;
656 public ReferencingRecord(String uri) {
658 this.fields = new HashMap<String, Set<String>>();
661 public String getUri() {
665 public void setUri(String uri) {
669 public Map<String, Set<String>> getFields() {