-
-
Save davidjohnhewlett/bc70ca6e76470dd9310bf10f68c1e434 to your computer and use it in GitHub Desktop.
import subprocess | |
import time | |
import xml.dom.minidom | |
import os.path | |
import os | |
import datetime | |
import xml.etree.ElementTree as ET | |
import glob | |
import random | |
def clearXmlSaveLocation(clearXmlSave_location):
    """Optionally empty the XML save location to give a clean start.

    When ``clearXmlSave_location`` is truthy, every regular file whose path
    matches ``XmlSave_location + "*"`` is deleted; otherwise nothing is
    touched. Clearing means later steps have to fetch everything again, which
    slows the run down.
    """
    if not clearXmlSave_location:
        return
    for savedPath in glob.glob(XmlSave_location + "*"):
        # os.remove() raises on a directory, which would abort the clean-up
        # half way through, so only plain files are deleted.
        if os.path.isfile(savedPath):
            os.remove(savedPath)
def generateSearchQuery(object_type, search_terms, return_fields, search_type):
    """Build the path-and-query text that is appended to the project URL.

    The returned string is handed to the external API executable, which saves
    the XML answer to disk.

    object_type   -- "document", "inbox" or "outbox". Anything else prints a
                     warning and contributes an empty prefix.
    search_terms  -- list of search expressions, e.g.
                     ["attribute3:P027c*", "(docno:tbs* OR docno:tsb*)"];
                     they are combined with " AND ".
    return_fields -- list of field names, joined with commas.
    search_type   -- "Full", or a string containing
                     "Number_Limited&search_result_size" (used verbatim).
                     Anything else prints a warning and contributes nothing.

    Returns the concatenated query string.
    """
    endpoint_prefixes = {
        "document": "register?&",
        "inbox": "mail?mail_box=Inbox&",
        "outbox": "mail?mail_box=Sentbox&",
    }
    if object_type in endpoint_prefixes:
        object_type_text = endpoint_prefixes[object_type]
    else:
        object_type_text = ""
        print("Invalid object type: must be either 'document' or 'inbox' or 'outbox'.")

    # join() also copes with an empty list, where indexing [0] used to raise.
    search_text = "search_query=" + " AND ".join(search_terms) + "&"
    return_fields_text = "return_fields=" + ",".join(return_fields) + "&"

    if search_type == "Full":
        search_type_text = "search_type=Full"
    elif "Number_Limited&search_result_size" in search_type:
        search_type_text = "search_type=" + search_type
    else:
        # Previously this name stayed unbound and the return statement raised
        # UnboundLocalError; now it mirrors the invalid object_type handling.
        search_type_text = ""
        print("Invalid search type entered, must be 'Full' or 'Number_Limited&search_result_size=N'.")

    return object_type_text + search_text + return_fields_text + search_type_text
# Ordered (marker, status, reason) rules applied to the text of an "Update"
# event. Every matching rule is applied in turn, so a later match overrides an
# earlier one. A reason of None leaves the current reason untouched.
_DOCUMENT_UPDATE_RULES = (
    ("Status A", "Approved", "Updated to Status A"),
    ("Code 1", "Approved", "Updated to Code 1"),
    ("Code 5", "Approved", None),
    ("Status B", "Not approved", "Updated to Status B"),
    ("Status C", "Not approved", "Updated to Status c"),
    ("Code 2", "Not approved", "Updated to code 2"),
    ("Code 3", "Not approved", "Updated to Code 3"),
    ("Draft", "Not approved", "Updated to Draft"),
)


def _ensureEventChild(parent, tag):
    """Return the first child of *parent* named *tag*, creating it if absent."""
    child = parent.find(tag)
    if child is None:
        child = ET.SubElement(parent, tag)
    return child


def generateDocumentMetadata(DocumentID):
    """Download (if needed) and annotate the event log of one document.

    DocumentID is expected to be a 19 digit tracking number, e.g.
    1348828088510064790; an int or a string is accepted. An ID that does not
    look valid only produces a printed warning.

    The event log is saved as <XmlSave_location><DocumentID>documentID.xml.
    "View" events are removed, and every remaining CDEventLog entry gets:
      * ApprovalStatus / ApprovalStatusReason ("Unknown" / "None" unless the
        event text gives a clear signal such as "Status A" or "REJCTN"),
      * TransmitNumber when the event text mentions "TRANSMIT" and carries a
        number in parentheses.

    Returns the path of the annotated xml file.
    """
    document_id = str(DocumentID)  # callers may pass an int
    if len(document_id) != 19:
        print("Document tracking number " + document_id + " is not valid, not 19 characters.")
    if not document_id.isdigit():
        print("Document tracking number " + document_id + " is not valid, not numeric.")

    xml_name = document_id + "documentID.xml"
    xml_path = XmlSave_location + xml_name
    if not os.path.isfile(xml_path):
        call_command = [AconexTestAPI1_location,
                        AconexProject_location + "register/" + document_id + "/eventlog",
                        XmlSave_location,
                        xml_name]
        subprocess.call(call_command)

    tree = ET.parse(xml_path)
    root = tree.getroot()
    # ElementTree has no parent pointers; build a lookup so that an entry can
    # be removed from whichever element actually contains it.
    parent_of = dict((child, parent) for parent in root.iter() for child in parent)

    for item in root.findall('.//CDEventLog'):
        event_type = item.get('EventType')
        if event_type == "View":
            parent_of[item].remove(item)  # element not interesting, delete
            continue

        # Reuse existing tags so that re-processing a cached file does not
        # pile up duplicate children.
        status_tag = _ensureEventChild(item, 'ApprovalStatus')
        status_tag.text = "Unknown"
        status_tag_reason = _ensureEventChild(item, 'ApprovalStatusReason')
        status_tag_reason.text = "None"

        event_node = item.find('Event')
        detailed_event_type = (event_node.text if event_node is not None else None) or ""

        if "TRANSMIT" in detailed_event_type:
            # The number is the text between the first pair of parentheses.
            pieces = detailed_event_type.replace("(", ")").split(")")
            if len(pieces) > 1:
                _ensureEventChild(item, 'TransmitNumber').text = pieces[1]

        if event_type == "Send":
            if "REJCTN" in detailed_event_type:
                status_tag.text = "Not approved"
                status_tag_reason.text = "REJCTN mail type"
        elif event_type == "Update":
            for marker, status, reason in _DOCUMENT_UPDATE_RULES:
                if marker not in detailed_event_type:
                    continue
                # "Status C" is also a prefix of "Status Changed".
                if marker == "Status C" and "Status Changed" in detailed_event_type:
                    continue
                status_tag.text = status
                if reason is not None:
                    status_tag_reason.text = reason
        else:
            print("Unknown event type...")

    # save the altered xml file
    tree.write(xml_path)
    return xml_path
def generateMailMetadata(MailID):
    """Return the path of the xml metadata file for one mail item.

    MailID is expected to be a 9 digit number, e.g. 290640151 (int or string).
    None returns "" straight away. Other values that do not look valid,
    including the -1 failure sentinel, only produce printed warnings.

    The file <XmlSave_location><MailID>mailid.xml is downloaded only when it
    does not exist yet, because other functions add data to it afterwards.
    Directly after a download the 'ApprovalStatus' tag is removed: per the
    original note it describes Aconex processing the mail approval, not the
    document approval, and is confusing later.
    """
    if MailID is None:
        print('Mail ID provided is none')
        return ""
    mail_id = str(MailID)  # the -1 sentinel arrives as an int
    if MailID == -1:
        print("Mail ID has remained unchanged at -1, see the function that called this function")
    if len(mail_id) != 9:
        print("MailID number " + mail_id + " is not valid, not 9 characters.")
    if not mail_id.isdigit():
        print("MailID number " + mail_id + " is not valid, not numeric.")

    xml_name = mail_id + "mailid.xml"
    xml_path = XmlSave_location + xml_name
    if not os.path.isfile(xml_path):
        call_command = [AconexTestAPI1_location,
                        AconexProject_location + "mail/" + mail_id,
                        XmlSave_location,
                        xml_name]
        subprocess.call(call_command)
        # Strip the tag only on a fresh download. NOTE(review): the original
        # indentation is lost; stripping on every call would fail once the tag
        # is gone and would also delete an ApprovalStatus written later by the
        # classification step -- confirm this matches the intended flow.
        tree = ET.parse(xml_path)
        root = tree.getroot()
        mail_approval = root.find('ApprovalStatus')
        if mail_approval is not None:
            root.remove(mail_approval)
            tree.write(xml_path)
    return xml_path
def _searchMailBoxForMailNumber(mail_box, MailNumber):
    """Search one mail box for MailNumber, save the answer and return its root."""
    # Aconex has issues processing the spaces, replace with a wildcard,
    # this is a workaround
    MailNumber_nospaces = MailNumber.replace(' ', '?')
    SearchQuery = generateSearchQuery(mail_box, ['docno:' + MailNumber_nospaces], ['docno'], "Full")
    call_command = [AconexTestAPI1_location,
                    AconexProject_location + SearchQuery,
                    XmlSave_location,
                    MailNumber + ".xml"]
    subprocess.call(call_command)
    return ET.parse(XmlSave_location + MailNumber + '.xml').getroot()


def generateMailIDfromMailNumber(MailNumber):
    """Translate a mail number into a mail ID.

    MailNumber is a mail number such as "CS JV-TRANSMIT-000054". The search
    answer is stored as <XmlSave_location><MailNumber>.xml and reused on later
    calls. A fresh lookup searches the inbox first and falls back to the
    outbox when the inbox has no hit; on success it also downloads the mail's
    xml metadata as a side effect.

    Returns the MailId attribute of the single matching Mail element, or -1
    when the search did not produce exactly one result.
    """
    result_path = XmlSave_location + MailNumber + ".xml"
    freshly_searched = not os.path.isfile(result_path)
    if freshly_searched:
        root = _searchMailBoxForMailNumber("inbox", MailNumber)
        # if there are no results, try the outbox, this is fairly rare
        if int(root.get('TotalResults')) == 0:
            root = _searchMailBoxForMailNumber("outbox", MailNumber)
    else:
        # Reuse the saved answer. The ID lives on the Mail element, not on
        # the SearchResults container, so it is read the same way as for a
        # fresh search.
        root = ET.parse(result_path).getroot()

    numberofMailResults = int(root.get('TotalResults'))
    mail_id = -1  # gives a bad result if errors occur
    if numberofMailResults == 1:
        mail_element = root.find('.//Mail')
        if mail_element is None:
            print("Searching mail number " + MailNumber + " reported 1 result but no Mail element was found.")
        else:
            mail_id = mail_element.get('MailId')
            if freshly_searched:
                generateMailMetadata(mail_id)
    elif numberofMailResults == 0:
        print("Searching mail number " + MailNumber + " produced 0 results, some kind of error. No results returned")
    else:
        print("Searching mail number " + MailNumber + " produced " + str(numberofMailResults) + " results, some kind of error")
    return mail_id
def generateRelatedMailIDs(MailID):
    """Return the mail IDs of every mail item in the thread of MailID.

    MailID is expected to be a 9 digit number, e.g. 290640151.

    Side effects: the thread listing is downloaded to
    <XmlSave_location><threadID>threadID.xml, the metadata of each mail in the
    thread is fetched via generateMailMetadata, and each mail's xml file gets
    its 'NumberOfReplies' and 'MailType' tags filled in from the thread
    listing, for use later in determining approval status.
    """
    # The thread ID is read from the metadata of the starting mail.
    seed_root = ET.parse(generateMailMetadata(MailID)).getroot()
    threadID = seed_root.find('ThreadId').text

    # The thread request is simple enough not to need generateSearchQuery.
    thread_file = threadID + "threadID.xml"
    subprocess.call([AconexTestAPI1_location,
                     AconexProject_location + "mail/" + threadID + "/thread",
                     XmlSave_location,
                     thread_file])
    thread_root = ET.parse(XmlSave_location + thread_file).getroot()

    relatedMailIDs = [node.get('MailId') for node in thread_root.iter('Mail')]

    for thread_mail_id in relatedMailIDs:
        generateMailMetadata(thread_mail_id)

        # Locate this mail inside the thread listing.
        thread_entry = thread_root.find('.//Mail[@MailId="' + thread_mail_id + '"]')
        reply_kind = thread_entry.find('ReplyType').text
        # iter() also yields the entry itself, hence the -1.
        reply_count = sum(1 for _ in thread_entry.iter('Mail')) - 1

        mail_path = XmlSave_location + thread_mail_id + "mailid.xml"
        mail_tree = ET.parse(mail_path)
        mail_root = mail_tree.getroot()
        for tag in ('NumberOfReplies', 'MailType'):
            if mail_root.find(tag) is None:
                ET.SubElement(mail_root, tag)
        mail_root.find('NumberOfReplies').text = str(reply_count)
        mail_root.find('MailType').text = reply_kind
        mail_tree.write(mail_path)

    return relatedMailIDs
def _searchMailBoxForDocumentNumber(mail_box, DocumentID, DocumentNumber):
    """Return the mail IDs in one mail box whose subject or body names the document."""
    SearchQuery = generateSearchQuery(
        mail_box,
        ['(subject:' + DocumentNumber + ' OR corrdata:' + DocumentNumber + ')'],
        ['docno'],
        "Full")
    # Produces <DocumentID>relatedmailinbox.xml / <DocumentID>relatedmailoutbox.xml
    file_name = str(DocumentID) + "relatedmail" + mail_box + ".xml"
    subprocess.call([AconexTestAPI1_location,
                     AconexProject_location + SearchQuery,
                     XmlSave_location,
                     file_name])
    results = ET.parse(XmlSave_location + file_name).getroot().find('SearchResults')
    if results is None:
        return []
    return [mail.get('MailId') for mail in results.findall('Mail')]


def findAllRelatedMail(documentMetadataPath, DocumentID, DocumentNumber):
    """Collect the IDs of all mail related to one document.

    Two sources are combined:
      * inbox and outbox mail that includes the document number in the
        subject or body (the threads of those mails are deliberately not
        followed, they were assumed not to be of much value);
      * every mail in the thread of each transmittal listed as a
        TransmitNumber in the document metadata xml.

    documentMetadataPath -- path to an existing document metadata xml file
    DocumentID           -- document ID (19 digit number)
    DocumentNumber       -- e.g. 1EW02-CSJ-DS-DES-S003-000110

    Returns a list of mail IDs without duplicates, in first-seen order.
    As a side effect, downloads any outstanding mail xmls.
    """
    relatedMailIDslist = []
    for mail_box in ("inbox", "outbox"):
        relatedMailIDslist.extend(
            _searchMailBoxForDocumentNumber(mail_box, DocumentID, DocumentNumber))

    # download all the xml files for these mails if needed
    for mailID in relatedMailIDslist:
        generateMailMetadata(mailID)

    document_root = ET.parse(documentMetadataPath).getroot()
    for transmit_number in document_root.findall('.//TransmitNumber'):
        MailID = generateMailIDfromMailNumber(transmit_number.text)
        # -1 (or None) signals a failed lookup; there is no thread to follow
        # and passing it on would only crash further down.
        if MailID is None or MailID == -1:
            print("No mail ID found for transmit number " + str(transmit_number.text) + ", skipping its thread.")
            continue
        relatedMailIDslist.extend(generateRelatedMailIDs(MailID))

    # remove duplicates while keeping the order in which IDs were found
    seen = set()
    uniqueMailIDs = []
    for mailID in relatedMailIDslist:
        if mailID not in seen:
            seen.add(mailID)
            uniqueMailIDs.append(mailID)
    return uniqueMailIDs
def classifyMailItem(mailID, DocumentId, DocumentNumber):
    """Copy a document's approval signal onto the xml of one mail item.

    mailID         -- a single mail ID (9 digit number, as a string); its
                      <mailID>mailid.xml must already be in XmlSave_location
    DocumentId     -- ID of the document whose <DocumentId>documentID.xml
                      event log is searched
    DocumentNumber -- only printed at the moment

    If exactly one event-log entry of the document has a TransmitNumber equal
    to the mail's MailNo, that entry's ApprovalStatus and ApprovalStatusReason
    are written to the mail xml: the status is set, the reason is appended to
    any reason already present. The mail xml is then saved.

    Returns the path to the xml file for the mail.
    """
    print(mailID)
    print(DocumentId)
    print(DocumentNumber)

    # get the mail number from the mailID file
    mail_path = XmlSave_location + mailID + "mailid.xml"
    mail_tree = ET.parse(mail_path)
    mail_root = mail_tree.getroot()
    mailNumber = mail_root.find('MailNo').text

    # search the document xml for that mail number
    document_root = ET.parse(XmlSave_location + DocumentId + "documentID.xml").getroot()
    # N.B. do not leave spaces around the = sign
    matches = document_root.findall('.//CDEventLog[TransmitNumber="' + mailNumber + '"]')

    if len(matches) == 1:
        event = matches[0]
        # Defaults match what the document annotation step writes when it
        # has no clear signal.
        ApprovalStatus = event.findtext('ApprovalStatus', "Unknown")
        ApprovalStatusReason = event.findtext('ApprovalStatusReason', "None")
        print("approval status" + ApprovalStatus)
        print("approval status reason" + ApprovalStatusReason)

        status_node = mail_root.find('ApprovalStatus')
        if status_node is None:
            status_node = ET.SubElement(mail_root, 'ApprovalStatus')
        status_node.text = ApprovalStatus

        # Append to the mail's own reason element (not to whichever element
        # happened to be bound last), and do not repeat a reason that is
        # already recorded.
        reason_node = mail_root.find('ApprovalStatusReason')
        if reason_node is None:
            reason_node = ET.SubElement(mail_root, 'ApprovalStatusReason')
            reason_node.text = ApprovalStatusReason
        elif not reason_node.text:
            reason_node.text = ApprovalStatusReason
        elif ApprovalStatusReason not in reason_node.text:
            reason_node.text = reason_node.text + "; " + ApprovalStatusReason

        # Without saving, none of the changes above would reach the file.
        mail_tree.write(mail_path)
    elif len(matches) > 1:
        print(str(len(matches)) + "mails found with mail number " + str(mailNumber) + ", very unexpected.")
    else:
        print("element has length " + str(len(matches)) + ", reasonably expected.")
    print("")

    # ---- design notes for further signals (not implemented) ----
    # Idea: keep an approval status confidence; if it ends at more than 60,
    # say approved, less than -60, no approval.
    # signal 1: do the changes to the document include a 'Status 1' or 'Approved' status?
    #   import any existing Approval status tags from the document Id xml file
    #   this is pretty final, set confidence +100
    # signal 2: is the mail type a rejection? - but to what documents exactly does it respond?
    #   import any existing Approval status tags from the document Id xml file
    #   this is pretty final, can set ApprovalStatus -100
    # signal 2: who sent the document?
    #   build into a ML model
    #   if sent by CSJV +10
    #   if not sent by CSJV -10
    # signal 3: has the mail been marked 'closed out'?
    #   pull out of 'Status tag' of the mail
    #   build into a ML model, but ignore for now
    #   not sure about the exact logic that can be applied here, leave for now
    # signal 3: does the response contain the word 'accepted' or 'approved' in its text, then likely to be approved
    #   need a robust way of separating the first mail out of the chain
    #   then an ML model or something less sophisticated?
    #   reasonably strong evidence, +30 if true
    # signal 3b: does the filename of the attached document contain the word 'accepted' or 'approved' or 'signed', then likely to be approved
    #   fairly strong evidence, +40 if it's true
    #
    # Unused prototype for signals 3 / 3b / 4, kept for reference:
    #   x = root.find('MailData').text #the text in the response
    #   y = root.find('.//FileName') #the filename - not all mails will have an attached document
    #   if y == None:
    #       y = "abc" #This is a short term fudge, to fix
    #   else:
    #       y = y.text
    #   if x != None:
    #       if 'from' in x:
    #           x = x.split('From:')[0]
    #       if (('approv' in x or 'accept' in x or 'Approv' in x or 'Accept' in x or 'approv' in y or 'accept' in y) and ('for approv' not in x)):
    #           print "Approval very probably granted on " + root.find('MailNo').text
    #       #signal 4: does the response contain the word 'amend' in its text, unlikely to be approved
    #       elif ('marked' in x or 'marked' in y or 'to be addressed' in x):
    #           print "Very probably no approval on " + root.find('MailNo').text
    #       else:
    #           print "Probably no approval on " + root.find('MailNo').text
    #
    # signal 4: from the thread information, has the message been responded to?
    #   if the last piece of mail, then some evidence of approval, +10
    # signal 5:
    #   does the message look like document control responding to a DNR?
    #   i.e. from document control, containing the document number in the body of the message, DNR is in the thread (especially message following a DNR)
    # signal 6:
    #   does the message look like document control signalling an upload to eB?
    #   a mail number with INT is in the body of the text, the mail number an INT, does person who sent it have <JobTitle> "Document Controller"?
    # signal 7:
    #   does the message look like document control coming back from eB?
    #   does the transmit have both the document and a comments sheet, does the person who sent it have <JobTitle> "Document Controller"?
    return mail_path
#key inputs for the program to function
AconexTestAPI1_location = r"C:/Users/david/Documents/Visual Studio 2015/Projects/AxonexTestAPI1/AxonexTestAPI1/bin/Debug/AxonexTestAPI1.exe"
AconexTestAPI2_location = r"c:/users/david/documents/visual studio 2015/Projects/AconexAPIGetFile/AconexAPIGetFile/bin/Debug/AconexAPIGetFile.exe"
AconexProject_location = "https://uk1.aconex.co.uk/api/projects/268441385/"
SearchQuery = ""
XmlSave_location = "D:/AllAconexDocuments/"
clearXmlSave_location = False #clears the folder of any existing xml files to give a fresh start, otherwise, they stay
clearXmlSaveLocation(clearXmlSave_location)

#this is the overarching xml - where the numbers of every document returned in the search is stored
XmlSaveFileName = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + ".xml"

#this is the overarching search query 'e.g. find every TBS in package 27c' or 'e.g. find TSB-000180'
#SearchQuery = generateSearchQuery("document",["attribute3:P027c*","(docno:TBS* OR docno:TSB*)"],["docno","title","statusid"],"Number_Limited&search_result_size=25")
SearchQuery = generateSearchQuery("document",["attribute3:P027c*"],["docno","title","statusid","Filename","versionnumber"],"Full")
#SearchQuery = generateSearchQuery("document",["attribute3:P027c*","(docno:TBS-000152)"],["docno","title","statusid"],"Number_Limited&search_result_size=Full")
call_command = [AconexTestAPI1_location,AconexProject_location+SearchQuery,XmlSave_location,XmlSaveFileName]
subprocess.call(call_command)

#read the search result xml to pull out some key information
tree1 = ET.parse(XmlSave_location+XmlSaveFileName)
root1 = tree1.getroot()
searchResults = root1.find('SearchResults')

#first pass: work out which documents still have to be downloaded.
#Each document keeps its OWN version number and extension; the earlier
#two-loop version reused whatever values the previous loop left behind.
pendingDownloads = []
queuedFileNames = set()
for child in searchResults:
    DocumentId = child.attrib['DocumentId']
    DocumentNumber = child.find('DocumentNumber').text
    DocumentTitle = child.find('Title').text
    DocumentFilename = child.find('Filename').text
    DocumentVersionNumber = child.find('VersionNumber').text
    if DocumentFilename is None:
        #no file name means no extension to save under, nothing to fetch
        continue
    DocumentFilename, DocumentExtension = os.path.splitext(DocumentFilename)
    targetFileName = DocumentNumber+"_"+DocumentVersionNumber+DocumentExtension
    if targetFileName in queuedFileNames or os.path.exists(XmlSave_location+targetFileName):
        continue
    queuedFileNames.add(targetFileName)
    pendingDownloads.append((DocumentId, DocumentNumber, DocumentExtension, targetFileName))

#counts start from zero so the total is the real number of files to fetch
y1 = len(pendingDownloads)
print("Total number of documents is " + str(y1))

#second pass: download each outstanding file
for y2, (DocumentId, DocumentNumber, DocumentExtension, targetFileName) in enumerate(pendingDownloads, 1):
    print(DocumentId)
    print(DocumentNumber)
    print(DocumentExtension)
    print("")
    call_command = [AconexTestAPI2_location,DocumentId,XmlSave_location,targetFileName]
    subprocess.call(call_command)
    #report progress only once the download call has returned
    print(str(y2) + " of " + str(y1) + " has been downloaded")
    #random pause between downloads
    x = random.randint(3,10)
    time.sleep(x)
Muy buen trabajo. ¿Me podrías indicar si esto permite bajar todos los correos de un proyecto en Aconex? Y si quiero bajar todos los documentos cargados, ¿cuál código me puede ayudar?
Desde ya, muchas gracias.
hey, afraid this was done a long time ago, so I don't really remember the details (and Aconex already had much better ways of doing this, just they were disabled on my project so I was working around those limitations). The .exe files were compiled C# files - which are the other gists on the page. If I remember correctly it was the easiest way to run a C# script from python, which is what Aconex had provided documentation for. A bit more detail is here https://davidjohnhewlett.co.uk/?p=458.
Sorry not to be more helpful, but I'm a structural engineer rather than a developer - so I basically make code until it works for my purpose, rather than it being any good!
Hi, lines 396 and 397 reference two EXE files. Could you please share them with me?
Snk