1 package org.collectionspace.services.batch.nuxeo;
3 import java.net.URISyntaxException;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6 import java.util.Collections;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.util.Iterator;
10 import java.util.LinkedHashMap;
11 import java.util.List;
15 import org.apache.commons.lang.StringUtils;
17 import org.collectionspace.services.client.PayloadOutputPart;
18 import org.collectionspace.services.client.PoxPayloadOut;
19 import org.collectionspace.services.client.RelationClient;
20 import org.collectionspace.services.client.workflow.WorkflowClient;
21 import org.collectionspace.services.common.NuxeoBasedResource;
22 import org.collectionspace.services.common.api.RefNameUtils;
23 import org.collectionspace.services.common.api.RefNameUtils.AuthorityTermInfo;
24 import org.collectionspace.services.common.authorityref.AuthorityRefDocList;
25 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
26 import org.collectionspace.services.common.invocable.InvocationResults;
27 import org.collectionspace.services.common.relation.RelationResource;
28 import org.collectionspace.services.common.vocabulary.AuthorityResource;
29 import org.collectionspace.services.relation.RelationsCommonList;
31 import org.dom4j.Document;
32 import org.dom4j.DocumentException;
33 import org.dom4j.DocumentHelper;
34 import org.dom4j.Element;
35 import org.dom4j.Node;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
41 * A batch job that merges authority items. The single and list contexts are
44 * The merge target is a record into which one or more source records will be
45 * merged. A merge source is a record that will be merged into the target, as
46 * follows: Each term in a source record is added to the target as a non-
47 * preferred term, if that term does not already exist in the target. If a term
48 * in the source already exists in the target, each non-blank term field is
49 * copied to the target, if that field is empty in the target. If the field is
50 * non-empty in the target, and differs from the source field, a warning is
51 * emitted and no action is taken. If a source is successfully merged into the
52 * target, all references to the source are transferred to the target, and the
53 * source record is soft-deleted.
55 * The context (singleCSID or listCSIDs) of the batch invocation payload
56 * specifies the source record(s).
58 * The following parameters are allowed:
60 * targetCSID: The csid of the target record. Only one target may be supplied.
64 public class MergeAuthorityItemsBatchJob extends AbstractBatchJob {
65 final Logger logger = LoggerFactory.getLogger(MergeAuthorityItemsBatchJob.class);
67 public MergeAuthorityItemsBatchJob() {
68 setSupportedInvocationModes(Arrays.asList(INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST));
73 setCompletionStatus(STATUS_MIN_PROGRESS);
76 String docType = null;
77 String targetCsid = null;
78 List<String> sourceCsids = new ArrayList<String>();
80 for (Param param : this.getParams()) {
81 String key = param.getKey();
83 // I don't want this batch job to appear in the UI, since it won't run successfully without parameters.
84 // That means it can't be registered with any docType. But if the invocation payload contains a docType,
85 // it will be checked against the null registered docType, and will fail. So docType should be passed as a
88 if (key.equals("docType")) {
89 docType = param.getValue();
91 else if (key.equals("targetCSID")) {
92 targetCsid = param.getValue();
94 else if (key.equals("sourceCSID")) {
95 sourceCsids.add(param.getValue());
99 if (docType == null || docType.equals("")) {
100 throw new Exception("a docType must be supplied");
103 if (targetCsid == null || targetCsid.equals("")) {
104 throw new Exception("a target csid parameter (targetCSID) must be supplied");
107 if (sourceCsids.size() == 0) {
108 throw new Exception("a source csid must be supplied");
111 InvocationResults results = merge(docType, targetCsid, sourceCsids);
114 setCompletionStatus(STATUS_COMPLETE);
116 catch (Exception e) {
117 setCompletionStatus(STATUS_ERROR);
118 setErrorInfo(new InvocationError(INT_ERROR_STATUS, e.getMessage()));
122 public InvocationResults merge(String docType, String targetCsid, String sourceCsid) throws URISyntaxException, DocumentException {
123 return merge(docType, targetCsid, Arrays.asList(sourceCsid));
126 public InvocationResults merge(String docType, String targetCsid, List<String> sourceCsids) throws URISyntaxException, DocumentException {
127 logger.debug("Merging docType=" + docType + " targetCsid=" + targetCsid + " sourceCsids=" + StringUtils.join(sourceCsids, ","));
129 String serviceName = getAuthorityServiceNameForDocType(docType);
131 PoxPayloadOut targetItemPayload = findAuthorityItemByCsid(serviceName, targetCsid);
132 List<PoxPayloadOut> sourceItemPayloads = new ArrayList<PoxPayloadOut>();
134 for (String sourceCsid : sourceCsids) {
135 sourceItemPayloads.add(findAuthorityItemByCsid(serviceName, sourceCsid));
138 return merge(docType, targetItemPayload, sourceItemPayloads);
141 private InvocationResults merge(String docType, PoxPayloadOut targetItemPayload, List<PoxPayloadOut> sourceItemPayloads) throws URISyntaxException, DocumentException {
143 List<String> userNotes = new ArrayList<String>();
145 Element targetTermGroupListElement = getTermGroupListElement(targetItemPayload);
146 Element mergedTermGroupListElement = targetTermGroupListElement.createCopy();
148 String targetCsid = getCsid(targetItemPayload);
149 String targetRefName = getRefName(targetItemPayload);
150 String inAuthority = getFieldValue(targetItemPayload, "inAuthority");
152 logger.debug("Merging term groups");
154 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
155 String sourceCsid = getCsid(sourceItemPayload);
156 Element sourceTermGroupListElement = getTermGroupListElement(sourceItemPayload);
158 logger.debug("Merging term groups from source " + sourceCsid + " into target " + targetCsid);
161 mergeTermGroupLists(mergedTermGroupListElement, sourceTermGroupListElement);
163 catch(RuntimeException e) {
164 throw new RuntimeException("Error merging source record " + sourceCsid + " into target record " + targetCsid + ": " + e.getMessage(), e);
168 logger.debug("Updating target: docType=" + docType + " inAuthority=" + inAuthority + " targetCsid=" + targetCsid);
170 updateAuthorityItem(docType, inAuthority, targetCsid, getUpdatePayload(targetTermGroupListElement, mergedTermGroupListElement));
172 userNotes.add("The target record with CSID " + targetCsid + " (" + targetRefName + ") was updated.");
175 String serviceName = getAuthorityServiceNameForDocType(docType);
177 logger.debug("Updating references");
179 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
180 String sourceCsid = getCsid(sourceItemPayload);
181 String sourceRefName = getRefName(sourceItemPayload);
183 InvocationResults results = updateReferences(serviceName, inAuthority, sourceCsid, sourceRefName, targetRefName);
185 userNotes.add(results.getUserNote());
186 numAffected += results.getNumAffected();
189 logger.debug("Deleting source items");
191 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
192 String sourceCsid = getCsid(sourceItemPayload);
193 String sourceRefName = getRefName(sourceItemPayload);
195 InvocationResults results = deleteAuthorityItem(docType, getFieldValue(sourceItemPayload, "inAuthority"), sourceCsid);
197 userNotes.add(results.getUserNote());
198 numAffected += results.getNumAffected();
201 InvocationResults results = new InvocationResults();
202 results.setNumAffected(numAffected);
203 results.setUserNote(StringUtils.join(userNotes, "\n"));
208 private InvocationResults updateReferences(String serviceName, String inAuthority, String sourceCsid, String sourceRefName, String targetRefName) throws URISyntaxException, DocumentException {
209 logger.debug("Updating references: serviceName=" + serviceName + " inAuthority=" + inAuthority + " sourceCsid=" + sourceCsid + " sourceRefName=" + sourceRefName + " targetRefName=" + targetRefName);
213 List<AuthorityRefDocList.AuthorityRefDocItem> items;
218 logger.debug("Looping with pageSize=" + pageSize);
223 // The pageNum/pageSize parameters don't work properly for refobj requests!
224 // It should be safe to repeatedly fetch page 0 for a large-ish page size,
225 // and update that page, until no references are left.
227 items = findReferencingFields(serviceName, inAuthority, sourceCsid, null, pageNum, pageSize);
228 Map<String, ReferencingRecord> referencingRecordsByCsid = new LinkedHashMap<String, ReferencingRecord>();
230 logger.debug("Loop " + loopCount + ": " + items.size() + " items found");
232 for (AuthorityRefDocList.AuthorityRefDocItem item : items) {
233 // If a record contains a reference to the record multiple times, multiple items are returned,
234 // but only the first has a non-null workflow state. A bug?
236 String itemCsid = item.getDocId();
237 ReferencingRecord record = referencingRecordsByCsid.get(itemCsid);
239 if (record == null) {
240 if (item.getWorkflowState() != null && !item.getWorkflowState().equals(WorkflowClient.WORKFLOWSTATE_DELETED)) {
241 record = new ReferencingRecord(item.getUri());
242 referencingRecordsByCsid.put(itemCsid, record);
246 if (record != null) {
247 String[] sourceFieldElements = item.getSourceField().split(":");
248 String partName = sourceFieldElements[0];
249 String fieldName = sourceFieldElements[1];
251 Map<String, Set<String>> fields = record.getFields();
252 Set<String> fieldsInPart = fields.get(partName);
254 if (fieldsInPart == null) {
255 fieldsInPart = new HashSet<String>();
256 fields.put(partName, fieldsInPart);
259 fieldsInPart.add(fieldName);
263 List<ReferencingRecord> referencingRecords = new ArrayList<ReferencingRecord>(referencingRecordsByCsid.values());
265 logger.debug("Loop " + loopCount + ": updating " + referencingRecords.size() + " records");
267 for (ReferencingRecord record : referencingRecords) {
268 InvocationResults results = updateReferencingRecord(record, sourceRefName, targetRefName);
269 numUpdated += results.getNumAffected();
272 while (items.size() > 0);
274 InvocationResults results = new InvocationResults();
275 results.setNumAffected(numUpdated);
276 results.setUserNote(numUpdated > 0 ?
277 numUpdated + " records that referenced the source record with CSID " + sourceCsid + " were updated." :
278 "No records referenced the source record with CSID " + sourceCsid + ".");
283 private InvocationResults updateReferencingRecord(ReferencingRecord record, String fromRefName, String toRefName) throws URISyntaxException, DocumentException {
284 String fromRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(fromRefName);
285 // String toRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(toRefName);
287 logger.debug("Updating references: record.uri=" + record.getUri() + " fromRefName=" + fromRefName + " toRefName=" + toRefName);
289 Map<String, Set<String>> fields = record.getFields();
291 PoxPayloadOut recordPayload = findByUri(record.getUri());
292 Document recordDocument = recordPayload.getDOMDocument();
293 Document newDocument = (Document) recordDocument.clone();
294 Element rootElement = newDocument.getRootElement();
296 for (Element partElement : (List<Element>) rootElement.elements()) {
297 String partName = partElement.getName();
299 if (fields.containsKey(partName)) {
300 for (String fieldName : fields.get(partName)) {
301 List<Node> nodes = partElement.selectNodes("descendant::" + fieldName);
303 for (Node node : nodes) {
304 String text = node.getText();
305 String refNameStem = null;
308 refNameStem = RefNameUtils.stripAuthorityTermDisplayName(text);
310 catch(IllegalArgumentException e) {}
312 if (refNameStem != null && refNameStem.equals(fromRefNameStem)) {
313 AuthorityTermInfo termInfo = RefNameUtils.parseAuthorityTermInfo(text);
314 // String newRefName = toRefNameStem + "'" + termInfo.displayName + "'";
315 String newRefName = toRefName;
317 node.setText(newRefName);
323 rootElement.remove(partElement);
327 String payload = newDocument.asXML();
329 return updateUri(record.getUri(), payload);
332 private InvocationResults updateUri(String uri, String payload) throws URISyntaxException {
333 String[] uriParts = uri.split("/");
335 if (uriParts.length == 3) {
336 String serviceName = uriParts[1];
337 String csid = uriParts[2];
339 NuxeoBasedResource resource = (NuxeoBasedResource) getResourceMap().get(serviceName);
341 resource.update(getResourceMap(), createUriInfo(), csid, payload);
343 else if (uriParts.length == 5) {
344 String serviceName = uriParts[1];
345 String vocabularyCsid = uriParts[2];
346 String items = uriParts[3];
347 String csid = uriParts[4];
349 if (items.equals("items")) {
350 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
352 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), vocabularyCsid, csid, payload);
356 throw new IllegalArgumentException("Invalid uri " + uri);
359 logger.debug("Updated referencing record " + uri);
361 InvocationResults results = new InvocationResults();
362 results.setNumAffected(1);
363 results.setUserNote("Updated referencing record " + uri);
368 private void updateAuthorityItem(String docType, String inAuthority, String csid, String payload) throws URISyntaxException {
369 String serviceName = getAuthorityServiceNameForDocType(docType);
370 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
372 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), inAuthority, csid, payload);
375 private InvocationResults deleteAuthorityItem(String docType, String inAuthority, String csid) throws URISyntaxException {
377 List<String> userNotes = new ArrayList<String>();
379 // If the item is the broader context of any items, warn and do nothing.
381 List<String> narrowerItemCsids = findNarrower(csid);
383 if (narrowerItemCsids.size() > 0) {
384 logger.debug("Item " + csid + " has narrower items -- not deleting");
386 userNotes.add("The source record with CSID " + csid + " was not deleted because it has narrower context items.");
389 // If the item has a broader context, delete the relation.
391 List<String> relationCsids = new ArrayList<String>();
393 for (RelationsCommonList.RelationListItem item : findRelated(csid, null, "hasBroader", null, null)) {
394 relationCsids.add(item.getCsid());
397 if (relationCsids.size() > 0) {
398 RelationResource relationResource = (RelationResource) getResourceMap().get(RelationClient.SERVICE_NAME);
400 for (String relationCsid : relationCsids) {
401 logger.debug("Deleting hasBroader relation " + relationCsid);
403 relationResource.delete(relationCsid);
405 userNotes.add("The broader relation with CSID " + relationCsid + " was deleted.");
410 String serviceName = getAuthorityServiceNameForDocType(docType);
411 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
413 logger.debug("Soft deleting: docType=" + docType + " inAuthority=" + inAuthority + " csid=" + csid);
415 resource.updateItemWorkflowWithTransition(null, inAuthority, csid, "delete");
417 userNotes.add("The source record with CSID " + csid + " was soft deleted.");
421 InvocationResults results = new InvocationResults();
422 results.setNumAffected(numAffected);
423 results.setUserNote(StringUtils.join(userNotes, "\n"));
429 * @param Returns a map of the term groups in term group list, keyed by display name.
430 * If multiple groups have the same display name, an exception is thrown.
431 * @return The term groups.
433 private Map<String, Element> getTermGroups(Element termGroupListElement) {
434 Map<String, Element> termGroups = new LinkedHashMap<String, Element>();
435 Iterator<Element> childIterator = termGroupListElement.elementIterator();
437 while (childIterator.hasNext()) {
438 Element termGroupElement = childIterator.next();
439 String displayName = getDisplayName(termGroupElement);
441 if (termGroups.containsKey(displayName)) {
442 // Two term groups in the same item have identical display names.
444 throw new RuntimeException("multiple terms have display name \"" + displayName + "\"");
447 termGroups.put(displayName, termGroupElement);
454 private String getDisplayName(Element termGroupElement) {
455 Node displayNameNode = termGroupElement.selectSingleNode("termDisplayName");
456 String displayName = (displayNameNode == null) ? "" : displayNameNode.getText();
461 private Element getTermGroupListElement(PoxPayloadOut itemPayload) {
462 Element termGroupListElement = null;
463 Element commonPartElement = findCommonPartElement(itemPayload);
465 if (commonPartElement != null) {
466 termGroupListElement = findTermGroupListElement(commonPartElement);
469 return termGroupListElement;
472 private Element findCommonPartElement(PoxPayloadOut itemPayload) {
473 Element commonPartElement = null;
475 for (PayloadOutputPart candidatePart : itemPayload.getParts()) {
476 Element candidatePartElement = candidatePart.asElement();
478 if (candidatePartElement.getName().endsWith("_common")) {
479 commonPartElement = candidatePartElement;
484 return commonPartElement;
487 private Element findTermGroupListElement(Element contextElement) {
488 Element termGroupListElement = null;
489 Iterator<Element> childIterator = contextElement.elementIterator();
491 while (childIterator.hasNext()) {
492 Element candidateElement = childIterator.next();
494 if (candidateElement.getName().endsWith("TermGroupList")) {
495 termGroupListElement = candidateElement;
500 return termGroupListElement;
503 private void mergeTermGroupLists(Element targetTermGroupListElement, Element sourceTermGroupListElement) {
504 Map<String, Element> sourceTermGroups;
507 sourceTermGroups = getTermGroups(sourceTermGroupListElement);
509 catch(RuntimeException e) {
510 throw new RuntimeException("a problem was found in the source record: " + e.getMessage(), e);
513 for (Element targetTermGroupElement : (List<Element>) targetTermGroupListElement.elements()) {
514 String displayName = getDisplayName(targetTermGroupElement);
516 if (sourceTermGroups.containsKey(displayName)) {
517 logger.debug("Merging in existing term \"" + displayName + "\"");
520 mergeTermGroups(targetTermGroupElement, sourceTermGroups.get(displayName));
522 catch(RuntimeException e) {
523 throw new RuntimeException("could not merge term groups with display name \"" + displayName + "\": " + e.getMessage(), e);
526 sourceTermGroups.remove(displayName);
530 for (Element sourceTermGroupElement : sourceTermGroups.values()) {
531 logger.debug("Adding new term \"" + getDisplayName(sourceTermGroupElement) + "\"");
533 targetTermGroupListElement.add(sourceTermGroupElement.createCopy());
537 private void mergeTermGroups(Element targetTermGroupElement, Element sourceTermGroupElement) {
538 // This function assumes there are no nested repeating groups.
540 for (Element sourceChildElement : (List<Element>) sourceTermGroupElement.elements()) {
541 String sourceValue = sourceChildElement.getText();
543 if (sourceValue == null) {
547 if (sourceValue.length() > 0) {
548 String name = sourceChildElement.getName();
549 Element targetChildElement = targetTermGroupElement.element(name);
551 if (targetChildElement == null) {
552 targetTermGroupElement.add(sourceChildElement.createCopy());
555 String targetValue = targetChildElement.getText();
557 if (targetValue == null) {
561 if (!targetValue.equals(sourceValue)) {
562 if (targetValue.length() > 0) {
563 throw new RuntimeException("merge conflict in field " + name + ": source value \"" + sourceValue + "\" differs from target value \"" + targetValue +"\"");
566 targetTermGroupElement.remove(targetChildElement);
567 targetTermGroupElement.add(sourceChildElement.createCopy());
574 private String getUpdatePayload(Element originalTermGroupListElement, Element updatedTermGroupListElement) {
575 List<Element> parents = new ArrayList<Element>();
577 for (Element e = originalTermGroupListElement; e != null; e = e.getParent()) {
581 Collections.reverse(parents);
583 // Remove the original termGroupList element
584 parents.remove(parents.size() - 1);
587 Element rootElement = parents.remove(0);
589 // Copy the root to a new document
590 Document document = DocumentHelper.createDocument(copyElement(rootElement));
591 Element current = document.getRootElement();
593 // Copy the remaining parents
594 for (Element parent : parents) {
595 Element parentCopy = copyElement(parent);
597 current.add(parentCopy);
598 current = parentCopy;
601 // Add the updated termGroupList element
603 current.add(updatedTermGroupListElement);
605 String payload = document.asXML();
610 private Element copyElement(Element element) {
611 Element copy = DocumentHelper.createElement(element.getQName());
612 copy.appendAttributes(element);
617 private class ReferencingRecord {
619 private Map<String, Set<String>> fields;
621 public ReferencingRecord(String uri) {
623 this.fields = new HashMap<String, Set<String>>();
626 public String getUri() {
630 public void setUri(String uri) {
634 public Map<String, Set<String>> getFields() {