1 package org.collectionspace.services.batch.nuxeo;
3 import java.net.URISyntaxException;
4 import java.util.ArrayList;
5 import java.util.Arrays;
6 import java.util.Collections;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.util.Iterator;
10 import java.util.LinkedHashMap;
11 import java.util.List;
15 import org.collectionspace.services.client.PayloadOutputPart;
16 import org.collectionspace.services.client.PoxPayloadOut;
17 import org.collectionspace.services.client.RelationClient;
18 import org.collectionspace.services.client.workflow.WorkflowClient;
19 import org.collectionspace.services.common.NuxeoBasedResource;
20 import org.collectionspace.services.common.api.RefNameUtils;
21 import org.collectionspace.services.common.api.RefNameUtils.AuthorityTermInfo;
22 import org.collectionspace.services.common.authorityref.AuthorityRefDocList;
23 import org.collectionspace.services.common.invocable.InvocationContext.Params.Param;
24 import org.collectionspace.services.common.invocable.InvocationResults;
25 import org.collectionspace.services.common.relation.RelationResource;
26 import org.collectionspace.services.common.vocabulary.AuthorityResource;
27 import org.collectionspace.services.relation.RelationsCommonList;
28 import org.collectionspace.services.relation.RelationsCommonList.RelationListItem;
29 import org.dom4j.Document;
30 import org.dom4j.DocumentException;
31 import org.dom4j.DocumentHelper;
32 import org.dom4j.Element;
33 import org.dom4j.Node;
34 import org.nuxeo.common.utils.StringUtils;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * A batch job that merges authority items. The single and list contexts are
42 * The merge target is a record into which one or more source records will be
43 * merged. A merge source is a record that will be merged into the target, as
44 * follows: Each term in a source record is added to the target as a non-
45 * preferred term, if that term does not already exist in the target. If a term
46 * in the source already exists in the target, each non-blank term field is
47 * copied to the target, if that field is empty in the target. If the field is
48 * non-empty in the target, and differs from the source field, a warning is
49 * emitted and no action is taken. If a source is successfully merged into the
50 * target, all references to the source are transferred to the target, and the
51 * source record is soft-deleted.
53 * The source record(s) are specified by sourceCSID parameters (see below), rather than
54 * by the context (singleCSID or listCSIDs) of the batch invocation payload.
56 * The following parameters are allowed:
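58 * targetCSID: The csid of the target record. Only one target may be supplied.
 * sourceCSID: The csid of a source record. May be supplied more than once; at least one is required.
 * docType: The document type of the authority items being merged. Required.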
62 public class MergeAuthorityItemsBatchJob extends AbstractBatchJob {
// Debug logger used to trace each step of the merge.
63 final Logger logger = LoggerFactory.getLogger(MergeAuthorityItemsBatchJob.class);
/**
 * Creates the batch job and declares the invocation modes (single record and list) it accepts.
 */
public MergeAuthorityItemsBatchJob() {
    List<String> supportedModes = Arrays.asList(INVOCATION_MODE_SINGLE, INVOCATION_MODE_LIST);
    setSupportedInvocationModes(supportedModes);
}
// Body of the job's entry point (the method header is not visible here -- presumably run()).
// Reads the docType, targetCSID and sourceCSID invocation parameters, validates them, runs the
// merge, and records a completion or error status on this job.
71 setCompletionStatus(STATUS_MIN_PROGRESS);
74 String docType = null;
75 String targetCsid = null;
76 List<String> sourceCsids = new ArrayList<String>();
// Collect parameters. sourceCSID may be repeated; a repeated docType or targetCSID keeps the
// last value seen. Unrecognized keys are ignored.
78 for (Param param : this.getParams()) {
79 String key = param.getKey();
81 // I don't want this batch job to appear in the UI, since it won't run successfully without parameters.
82 // That means it can't be registered with any docType. But if the invocation payload contains a docType,
83 // it will be checked against the null registered docType, and will fail. So docType should be passed as a
// parameter (read below) instead of in the invocation payload.
86 if (key.equals("docType")) {
87 docType = param.getValue();
89 else if (key.equals("targetCSID")) {
90 targetCsid = param.getValue();
92 else if (key.equals("sourceCSID")) {
93 sourceCsids.add(param.getValue());
// All three parameters are required. These exceptions are reported through the catch below.
97 if (docType == null || docType.equals("")) {
98 throw new Exception("a docType must be supplied");
101 if (targetCsid == null || targetCsid.equals("")) {
102 throw new Exception("a target csid parameter (targetCSID) must be supplied");
105 if (sourceCsids.size() == 0) {
106 throw new Exception("a source csid must be supplied");
// NOTE(review): `results` is not used in the lines visible here; confirm it is handed to the
// job (e.g. via setResults) before the status is set to complete.
109 InvocationResults results = merge(docType, targetCsid, sourceCsids);
112 setCompletionStatus(STATUS_COMPLETE);
114 catch (Exception e) {
// Any exception, including the validation failures above, marks the job as errored and
// reports the exception's message.
115 setCompletionStatus(STATUS_ERROR);
116 setErrorInfo(new InvocationError(INT_ERROR_STATUS, e.getMessage()));
120 public InvocationResults merge(String docType, String targetCsid, String sourceCsid) throws URISyntaxException, DocumentException {
121 return merge(docType, targetCsid, Arrays.asList(sourceCsid));
124 public InvocationResults merge(String docType, String targetCsid, List<String> sourceCsids) throws URISyntaxException, DocumentException {
125 logger.debug("Merging docType=" + docType + " targetCsid=" + targetCsid + " sourceCsids=" + StringUtils.join(sourceCsids, ","));
127 String serviceName = getAuthorityServiceNameForDocType(docType);
129 PoxPayloadOut targetItemPayload = findAuthorityItemByCsid(serviceName, targetCsid);
130 List<PoxPayloadOut> sourceItemPayloads = new ArrayList<PoxPayloadOut>();
132 for (String sourceCsid : sourceCsids) {
133 sourceItemPayloads.add(findAuthorityItemByCsid(serviceName, sourceCsid));
136 return merge(docType, targetItemPayload, sourceItemPayloads);
/**
 * Merges the source items into the target item in three phases:
 * 1. The term groups of every source are merged into a copy of the target's term group list,
 *    and the target is updated with the result.
 * 2. References to each source are repointed to the target's refName.
 * 3. Each source is passed to deleteAuthorityItem.
 *
 * All term-group merging happens before the target is updated, so a merge conflict with any
 * source aborts before anything is written.
 *
 * @param docType the document type of the authority items
 * @param targetItemPayload the payload of the item to merge into
 * @param sourceItemPayloads the payloads of the items to merge from
 * @return results whose affected count sums the reference-update and delete results, and whose
 *         user note joins all step notes with newlines
 * @throws RuntimeException if a source's term groups cannot be merged into the target
 */
139 private InvocationResults merge(String docType, PoxPayloadOut targetItemPayload, List<PoxPayloadOut> sourceItemPayloads) throws URISyntaxException, DocumentException {
// NOTE(review): the declaration of numAffected is not visible in this view.
141 List<String> userNotes = new ArrayList<String>();
// NOTE(review): getTermGroupListElement returns null when no term group list is found, which
// would throw a NullPointerException on createCopy() below.
143 Element targetTermGroupListElement = getTermGroupListElement(targetItemPayload);
// Merges are applied to a detached copy; the original list is kept to locate its ancestors when
// building the update payload.
144 Element mergedTermGroupListElement = targetTermGroupListElement.createCopy();
146 String targetCsid = getCsid(targetItemPayload);
147 String targetRefName = getRefName(targetItemPayload);
148 String inAuthority = getFieldValue(targetItemPayload, "inAuthority");
150 logger.debug("Merging term groups");
// Phase 1: merge each source's term groups into the copy.
152 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
153 String sourceCsid = getCsid(sourceItemPayload);
154 Element sourceTermGroupListElement = getTermGroupListElement(sourceItemPayload);
156 logger.debug("Merging term groups from source " + sourceCsid + " into target " + targetCsid);
159 mergeTermGroupLists(mergedTermGroupListElement, sourceTermGroupListElement);
161 catch(RuntimeException e) {
// Add the source and target csids to the error so the user can tell which record failed.
162 throw new RuntimeException("Error merging source record " + sourceCsid + " into target record " + targetCsid + ": " + e.getMessage(), e);
166 logger.debug("Updating target: docType=" + docType + " inAuthority=" + inAuthority + " targetCsid=" + targetCsid);
168 updateAuthorityItem(docType, inAuthority, targetCsid, getUpdatePayload(targetTermGroupListElement, mergedTermGroupListElement));
170 userNotes.add("The target record with CSID " + targetCsid + " (" + targetRefName + ") was updated.");
173 String serviceName = getAuthorityServiceNameForDocType(docType);
175 logger.debug("Updating references");
// Phase 2: repoint references from each source to the target.
// NOTE(review): the target's inAuthority is passed when looking up references to each source,
// while the delete phase below uses each source's own inAuthority -- confirm this is correct
// when source and target belong to different vocabularies.
177 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
178 String sourceCsid = getCsid(sourceItemPayload);
179 String sourceRefName = getRefName(sourceItemPayload);
181 InvocationResults results = updateReferences(serviceName, inAuthority, sourceCsid, sourceRefName, targetRefName);
183 userNotes.add(results.getUserNote());
184 numAffected += results.getNumAffected();
187 logger.debug("Deleting source items");
// Phase 3: delete each source (deleteAuthorityItem may decline; see its user note).
189 for (PoxPayloadOut sourceItemPayload : sourceItemPayloads) {
190 String sourceCsid = getCsid(sourceItemPayload);
// NOTE(review): sourceRefName is not used in this loop.
191 String sourceRefName = getRefName(sourceItemPayload);
193 InvocationResults results = deleteAuthorityItem(docType, getFieldValue(sourceItemPayload, "inAuthority"), sourceCsid);
195 userNotes.add(results.getUserNote());
196 numAffected += results.getNumAffected();
199 InvocationResults results = new InvocationResults();
200 results.setNumAffected(numAffected);
201 results.setUserNote(StringUtils.join(userNotes, "\n"));
/**
 * Replaces references to a source authority item with the target's refName in every record that
 * references the source.
 *
 * Repeatedly fetches the same page of referencing fields. It groups the returned items by
 * referencing document csid, updates each grouped record, and stops when a fetch returns no
 * items.
 *
 * @param serviceName the authority service name
 * @param inAuthority the authority passed to the reference lookup
 * @param sourceCsid the csid of the item whose references are replaced
 * @param sourceRefName the refName being replaced
 * @param targetRefName the refName written in its place
 * @return results counting the updated records, with a user note describing them
 */
206 private InvocationResults updateReferences(String serviceName, String inAuthority, String sourceCsid, String sourceRefName, String targetRefName) throws URISyntaxException, DocumentException {
207 logger.debug("Updating references: serviceName=" + serviceName + " inAuthority=" + inAuthority + " sourceCsid=" + sourceCsid + " sourceRefName=" + sourceRefName + " targetRefName=" + targetRefName);
// NOTE(review): numUpdated, pageNum, pageSize and loopCount are declared, and the loop's `do`
// opens, on lines not visible in this view.
211 List<AuthorityRefDocList.AuthorityRefDocItem> items;
216 logger.debug("Looping with pageSize=" + pageSize);
221 // The pageNum/pageSize parameters don't work properly for refobj requests!
222 // It should be safe to repeatedly fetch page 0 for a large-ish page size,
223 // and update that page, until no references are left.
225 items = findReferencingFields(serviceName, inAuthority, sourceCsid, null, pageNum, pageSize);
// Referencing records keyed by document csid, in the order first seen.
226 Map<String, ReferencingRecord> referencingRecordsByCsid = new LinkedHashMap<String, ReferencingRecord>();
228 logger.debug("Loop " + loopCount + ": " + items.size() + " items found");
230 for (AuthorityRefDocList.AuthorityRefDocItem item : items) {
231 // If a record contains a reference to the record multiple times, multiple items are returned,
232 // but only the first has a non-null workflow state. A bug?
234 String itemCsid = item.getDocId();
235 ReferencingRecord record = referencingRecordsByCsid.get(itemCsid);
// A record is started only from an item with a known, non-deleted workflow state. Later items
// for the same csid just add fields to it; items for csids never started are ignored.
237 if (record == null) {
238 if (item.getWorkflowState() != null && !item.getWorkflowState().equals(WorkflowClient.WORKFLOWSTATE_DELETED)) {
239 record = new ReferencingRecord(item.getUri());
240 referencingRecordsByCsid.put(itemCsid, record);
244 if (record != null) {
// Assumes sourceField has the form "partName:fieldName" -- TODO confirm; a value without ":"
// would throw ArrayIndexOutOfBoundsException below.
245 String[] sourceFieldElements = item.getSourceField().split(":");
246 String partName = sourceFieldElements[0];
247 String fieldName = sourceFieldElements[1];
249 Map<String, Set<String>> fields = record.getFields();
250 Set<String> fieldsInPart = fields.get(partName);
252 if (fieldsInPart == null) {
253 fieldsInPart = new HashSet<String>();
254 fields.put(partName, fieldsInPart);
257 fieldsInPart.add(fieldName);
261 List<ReferencingRecord> referencingRecords = new ArrayList<ReferencingRecord>(referencingRecordsByCsid.values());
263 logger.debug("Loop " + loopCount + ": updating " + referencingRecords.size() + " records");
265 for (ReferencingRecord record : referencingRecords) {
266 InvocationResults results = updateReferencingRecord(record, sourceRefName, targetRefName);
267 numUpdated += results.getNumAffected();
// NOTE(review): termination relies on each pass removing every reference it returned. If an
// item is skipped above (e.g. a deleted referencing record) or a record is not actually updated,
// and findReferencingFields keeps returning it, this loop never ends -- confirm, or bound loopCount.
270 while (items.size() > 0);
272 InvocationResults results = new InvocationResults();
273 results.setNumAffected(numUpdated);
274 results.setUserNote(numUpdated > 0 ?
275 numUpdated + " records that referenced the source record with CSID " + sourceCsid + " were updated." :
276 "No records referenced the source record with CSID " + sourceCsid + ".");
/**
 * Rewrites the references in one referencing record and saves it.
 *
 * Loads the record from its URI and works on a clone of its document. In the listed fields of
 * each listed part, every node holds a refName. When that refName, with its display name
 * stripped, equals fromRefName's stem, the node's whole text is replaced with toRefName. Parts
 * with no listed fields appear to be removed from the payload (the enclosing branch is not fully
 * visible here). The resulting payload is sent with updateUri.
 *
 * @param record the referencing record's URI and the part/field names that hold the reference
 * @param fromRefName the refName being replaced
 * @param toRefName the refName written in its place (including its own display name)
 * @return the results of updateUri
 */
281 private InvocationResults updateReferencingRecord(ReferencingRecord record, String fromRefName, String toRefName) throws URISyntaxException, DocumentException {
// Compare refNames without display names, so references match regardless of display name.
282 String fromRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(fromRefName);
283 // String toRefNameStem = RefNameUtils.stripAuthorityTermDisplayName(toRefName);
285 logger.debug("Updating references: record.uri=" + record.getUri() + " fromRefName=" + fromRefName + " toRefName=" + toRefName);
287 Map<String, Set<String>> fields = record.getFields();
289 PoxPayloadOut recordPayload = findByUri(record.getUri());
290 Document recordDocument = recordPayload.getDOMDocument();
291 Document newDocument = (Document) recordDocument.clone();
292 Element rootElement = newDocument.getRootElement();
294 for (Element partElement : (List<Element>) rootElement.elements()) {
295 String partName = partElement.getName();
297 if (fields.containsKey(partName)) {
298 for (String fieldName : fields.get(partName)) {
// Matches the field at any depth within the part (e.g. inside repeating groups).
299 List<Node> nodes = partElement.selectNodes("descendant::" + fieldName);
301 for (Node node : nodes) {
302 String text = node.getText();
303 String refNameStem = null;
306 refNameStem = RefNameUtils.stripAuthorityTermDisplayName(text);
// Text that cannot be parsed as a refName leaves refNameStem null, so the node is left unchanged.
308 catch(IllegalArgumentException e) {}
310 if (refNameStem != null && refNameStem.equals(fromRefNameStem)) {
// NOTE(review): termInfo is only used by the commented-out line below; the parse call could be
// removed once it is confirmed it cannot throw for text that stripAuthorityTermDisplayName accepted.
311 AuthorityTermInfo termInfo = RefNameUtils.parseAuthorityTermInfo(text);
312 // String newRefName = toRefNameStem + "'" + termInfo.displayName + "'";
// The referencing record's original display name is not preserved; the target's refName is
// used verbatim.
313 String newRefName = toRefName;
315 node.setText(newRefName);
321 rootElement.remove(partElement);
325 String payload = newDocument.asXML();
327 return updateUri(record.getUri(), payload);
330 private InvocationResults updateUri(String uri, String payload) throws URISyntaxException {
331 String[] uriParts = uri.split("/");
333 if (uriParts.length == 3) {
334 String serviceName = uriParts[1];
335 String csid = uriParts[2];
337 NuxeoBasedResource resource = (NuxeoBasedResource) getResourceMap().get(serviceName);
339 resource.update(getResourceMap(), createUriInfo(), csid, payload);
341 else if (uriParts.length == 5) {
342 String serviceName = uriParts[1];
343 String vocabularyCsid = uriParts[2];
344 String items = uriParts[3];
345 String csid = uriParts[4];
347 if (items.equals("items")) {
348 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
350 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), vocabularyCsid, csid, payload);
354 throw new IllegalArgumentException("Invalid uri " + uri);
357 logger.debug("Updated referencing record " + uri);
359 InvocationResults results = new InvocationResults();
360 results.setNumAffected(1);
361 results.setUserNote("Updated referencing record " + uri);
366 private void updateAuthorityItem(String docType, String inAuthority, String csid, String payload) throws URISyntaxException {
367 String serviceName = getAuthorityServiceNameForDocType(docType);
368 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
370 resource.updateAuthorityItem(getResourceMap(), createUriInfo(), inAuthority, csid, payload);
/**
 * Soft-deletes a merged source authority item, unless other items have it as their broader
 * context.
 *
 * If findNarrower reports any narrower items, the item is left in place and a note says so.
 * Otherwise (the else branch is not fully visible here), any hasBroader relations found for the
 * item are deleted, and the item is moved through the "delete" workflow transition.
 *
 * @param docType the document type of the item, used to look up the authority service
 * @param inAuthority the authority (vocabulary) containing the item
 * @param csid the csid of the item to delete
 * @return results whose user note lists each action taken, joined with newlines
 */
373 private InvocationResults deleteAuthorityItem(String docType, String inAuthority, String csid) throws URISyntaxException {
// NOTE(review): numAffected is declared (and presumably incremented on deletion) on lines not
// visible in this view.
375 List<String> userNotes = new ArrayList<String>();
377 // If the item is the broader context of any items, warn and do nothing.
379 List<String> narrowerItemCsids = findNarrower(csid);
381 if (narrowerItemCsids.size() > 0) {
382 logger.debug("Item " + csid + " has narrower items -- not deleting");
384 userNotes.add("The source record with CSID " + csid + " was not deleted because it has narrower context items.");
387 // If the item has a broader context, delete the relation.
389 List<String> relationCsids = new ArrayList<String>();
// Presumably finds relations with this item as subject and predicate "hasBroader" -- confirm
// findRelated's parameter order.
391 for (RelationsCommonList.RelationListItem item : findRelated(csid, null, "hasBroader", null, null)) {
392 relationCsids.add(item.getCsid());
395 if (relationCsids.size() > 0) {
396 RelationResource relationResource = (RelationResource) getResourceMap().get(RelationClient.SERVICE_NAME);
398 for (String relationCsid : relationCsids) {
399 logger.debug("Deleting hasBroader relation " + relationCsid);
401 relationResource.delete(relationCsid);
403 userNotes.add("The broader relation with CSID " + relationCsid + " was deleted.");
408 String serviceName = getAuthorityServiceNameForDocType(docType);
409 AuthorityResource<?, ?> resource = (AuthorityResource<?, ?>) getResourceMap().get(serviceName);
411 logger.debug("Soft deleting: docType=" + docType + " inAuthority=" + inAuthority + " csid=" + csid);
// Soft delete: a workflow transition rather than removal. The first argument is passed as null
// -- confirm the resource accepts that.
413 resource.updateItemWorkflowWithTransition(null, inAuthority, csid, "delete");
415 userNotes.add("The source record with CSID " + csid + " was soft deleted.");
419 InvocationResults results = new InvocationResults();
420 results.setNumAffected(numAffected);
421 results.setUserNote(StringUtils.join(userNotes, "\n"));
427 * @param Returns a map of the term groups in term group list, keyed by display name.
428 * If multiple groups have the same display name, an exception is thrown.
429 * @return The term groups.
431 private Map<String, Element> getTermGroups(Element termGroupListElement) {
432 Map<String, Element> termGroups = new LinkedHashMap<String, Element>();
433 Iterator<Element> childIterator = termGroupListElement.elementIterator();
435 while (childIterator.hasNext()) {
436 Element termGroupElement = childIterator.next();
437 String displayName = getDisplayName(termGroupElement);
439 if (termGroups.containsKey(displayName)) {
440 // Two term groups in the same item have identical display names.
442 throw new RuntimeException("multiple terms have display name \"" + displayName + "\"");
445 termGroups.put(displayName, termGroupElement);
452 private String getDisplayName(Element termGroupElement) {
453 Node displayNameNode = termGroupElement.selectSingleNode("termDisplayName");
454 String displayName = (displayNameNode == null) ? "" : displayNameNode.getText();
459 private Element getTermGroupListElement(PoxPayloadOut itemPayload) {
460 Element termGroupListElement = null;
461 Element commonPartElement = findCommonPartElement(itemPayload);
463 if (commonPartElement != null) {
464 termGroupListElement = findTermGroupListElement(commonPartElement);
467 return termGroupListElement;
470 private Element findCommonPartElement(PoxPayloadOut itemPayload) {
471 Element commonPartElement = null;
473 for (PayloadOutputPart candidatePart : itemPayload.getParts()) {
474 Element candidatePartElement = candidatePart.asElement();
476 if (candidatePartElement.getName().endsWith("_common")) {
477 commonPartElement = candidatePartElement;
482 return commonPartElement;
485 private Element findTermGroupListElement(Element contextElement) {
486 Element termGroupListElement = null;
487 Iterator<Element> childIterator = contextElement.elementIterator();
489 while (childIterator.hasNext()) {
490 Element candidateElement = childIterator.next();
492 if (candidateElement.getName().endsWith("TermGroupList")) {
493 termGroupListElement = candidateElement;
498 return termGroupListElement;
501 private void mergeTermGroupLists(Element targetTermGroupListElement, Element sourceTermGroupListElement) {
502 Map<String, Element> sourceTermGroups;
505 sourceTermGroups = getTermGroups(sourceTermGroupListElement);
507 catch(RuntimeException e) {
508 throw new RuntimeException("a problem was found in the source record: " + e.getMessage(), e);
511 for (Element targetTermGroupElement : (List<Element>) targetTermGroupListElement.elements()) {
512 String displayName = getDisplayName(targetTermGroupElement);
514 if (sourceTermGroups.containsKey(displayName)) {
515 logger.debug("Merging in existing term \"" + displayName + "\"");
518 mergeTermGroups(targetTermGroupElement, sourceTermGroups.get(displayName));
520 catch(RuntimeException e) {
521 throw new RuntimeException("could not merge term groups with display name \"" + displayName + "\": " + e.getMessage(), e);
524 sourceTermGroups.remove(displayName);
528 for (Element sourceTermGroupElement : sourceTermGroups.values()) {
529 logger.debug("Adding new term \"" + getDisplayName(sourceTermGroupElement) + "\"");
531 targetTermGroupListElement.add(sourceTermGroupElement.createCopy());
/**
 * Merges the fields of a source term group into a matching target term group, in place.
 *
 * For each child of the source group with non-empty text:
 * - if the target has no child with that name, a copy of the source child is added;
 * - if the target child is empty, it is replaced by a copy of the source child (the copy is
 *   appended, so the field moves to the end of the group);
 * - if the target child has different non-empty text, a RuntimeException reports the conflict.
 *
 * @param targetTermGroupElement the term group to merge into (modified)
 * @param sourceTermGroupElement the term group to merge from (read only)
 * @throws RuntimeException on a conflicting non-empty field value
 */
535 private void mergeTermGroups(Element targetTermGroupElement, Element sourceTermGroupElement) {
536 // This function assumes there are no nested repeating groups.
538 for (Element sourceChildElement : (List<Element>) sourceTermGroupElement.elements()) {
539 String sourceValue = sourceChildElement.getText();
541 if (sourceValue == null) {
// Empty source fields contribute nothing.
545 if (sourceValue.length() > 0) {
546 String name = sourceChildElement.getName();
// element(name) returns only the first target child with this name.
547 Element targetChildElement = targetTermGroupElement.element(name);
549 if (targetChildElement == null) {
550 targetTermGroupElement.add(sourceChildElement.createCopy());
553 String targetValue = targetChildElement.getText();
555 if (targetValue == null) {
559 if (!targetValue.equals(sourceValue)) {
560 if (targetValue.length() > 0) {
561 throw new RuntimeException("merge conflict in field " + name + ": source value \"" + sourceValue + "\" differs from target value \"" + targetValue +"\"");
// Reached only when the target field is empty: swap in the source's copy.
564 targetTermGroupElement.remove(targetChildElement);
565 targetTermGroupElement.add(sourceChildElement.createCopy());
/**
 * Builds an update payload that contains only the updated term group list.
 *
 * The ancestors of the original term group list (root first) are shallow-copied with
 * copyElement, so they keep their names and attributes but no other content. The updated term
 * group list is placed inside the innermost copy, and the new document is serialized.
 *
 * @param originalTermGroupListElement the term group list in the original document, used only
 *        to find its ancestors
 * @param updatedTermGroupListElement the term group list to send; it is attached to the new
 *        document, so it presumably must be detached (callers pass a createCopy())
 * @return the serialized update payload
 */
572 private String getUpdatePayload(Element originalTermGroupListElement, Element updatedTermGroupListElement) {
573 List<Element> parents = new ArrayList<Element>();
// Walk from the term group list up to the root (the loop body, presumably parents.add(e), is not
// visible in this view).
575 for (Element e = originalTermGroupListElement; e != null; e = e.getParent()) {
// Order the chain from root down to the term group list.
579 Collections.reverse(parents);
581 // Remove the original termGroupList element
582 parents.remove(parents.size() - 1);
// NOTE(review): if the original term group list has no parent, parents is empty here and
// remove(0) throws IndexOutOfBoundsException.
585 Element rootElement = parents.remove(0);
587 // Copy the root to a new document
588 Document document = DocumentHelper.createDocument(copyElement(rootElement));
589 Element current = document.getRootElement();
591 // Copy the remaining parents
592 for (Element parent : parents) {
593 Element parentCopy = copyElement(parent);
595 current.add(parentCopy);
596 current = parentCopy;
599 // Add the updated termGroupList element
601 current.add(updatedTermGroupListElement);
603 String payload = document.asXML();
608 private Element copyElement(Element element) {
609 Element copy = DocumentHelper.createElement(element.getQName());
610 copy.appendAttributes(element);
615 private class ReferencingRecord {
617 private Map<String, Set<String>> fields;
619 public ReferencingRecord(String uri) {
621 this.fields = new HashMap<String, Set<String>>();
624 public String getUri() {
628 public void setUri(String uri) {
632 public Map<String, Set<String>> getFields() {