import subprocess | |
import time | |
import xml.dom.minidom | |
import os.path | |
import os | |
import datetime | |
import xml.etree.ElementTree as ET | |
import glob | |
import random | |
def clearXmlSaveLocation(clearXmlSave_location):
    """Optionally empty the XML save location to give a clean start.

    When ``clearXmlSave_location`` is truthy, every regular file whose path
    matches ``XmlSave_location + "*"`` is deleted. Doing so slows the run
    down, because nothing cached is left to reuse. When the flag is falsy
    nothing is touched.

    Reads the module-level ``XmlSave_location`` prefix.
    """
    if not clearXmlSave_location:
        return
    for xmlFile in glob.glob(XmlSave_location + "*"):
        # The glob also matches sub-directories; os.remove() raises on those,
        # so only plain files are deleted.
        if os.path.isfile(xmlFile):
            os.remove(xmlFile)
def generateSearchQuery(object_type, search_terms, return_fields, search_type):
    """Build the query part of a search URL for the API helper executable.

    The returned string is appended to the project URL by the callers and
    handed to ``subprocess.call``, which saves the XML result.

    Parameters:
        object_type: "document", "inbox" or "outbox". Any other value prints
            a warning and contributes an empty prefix.
        search_terms: sequence of search expressions, e.g.
            ["attribute3:P027c*", "(docno:tbs* OR docno:tsb*)"]; they are
            combined with " AND ". An empty sequence gives an empty query.
        return_fields: sequence of field names to return, e.g.
            ["docno", "title"]; joined with commas.
        search_type: "Full", or a string containing
            "Number_Limited&search_result_size" (passed through verbatim).
            Any other value prints a warning and contributes nothing.

    Returns:
        The concatenated query string.
    """
    object_prefixes = {
        "document": "register?&",
        "inbox": "mail?mail_box=Inbox&",
        "outbox": "mail?mail_box=Sentbox&",
    }
    object_type_text = object_prefixes.get(object_type)
    if object_type_text is None:
        object_type_text = ""
        print("Invalid object type: must be either 'document' or 'inbox' or 'outbox'.")

    # join() covers the zero-, one- and many-term cases without indexing [0]
    search_text = "search_query=" + " AND ".join(search_terms) + "&"

    return_fields_text = "return_fields=" + ",".join(return_fields) + "&"

    if search_type == "Full":
        search_type_text = "search_type=Full"
    elif "Number_Limited&search_result_size" in search_type:
        search_type_text = "search_type=" + search_type
    else:
        # Previously left unbound, which crashed on the return line below.
        search_type_text = ""
        print("Invalid search type entered, must be 'Full'.")

    return object_type_text + search_text + return_fields_text + search_type_text
# Rules applied, in order, to the text of an "Update" event. Each entry is
# (marker, marker that cancels the rule or None, status, reason or None).
# Later matches override earlier ones; a reason of None leaves the reason as is.
_UPDATE_STATUS_RULES = (
    ("Status A", None, "Approved", "Updated to Status A"),
    ("Code 1", None, "Approved", "Updated to Code 1"),
    ("Code 5", None, "Approved", None),
    ("Status B", None, "Not approved", "Updated to Status B"),
    ("Status C", "Status Changed", "Not approved", "Updated to Status c"),
    ("Code 2", None, "Not approved", "Updated to code 2"),
    ("Code 3", None, "Not approved", "Updated to Code 3"),
    ("Draft", None, "Not approved", "Updated to Draft"),
)


def _getOrAddChild(parent, tag):
    """Return the first child of ``parent`` named ``tag``, creating it if absent."""
    child = parent.find(tag)
    if child is None:
        child = ET.SubElement(parent, tag)
    return child


def generateDocumentMetadata(DocumentID):
    """Download (if needed), trim and annotate a document's event-log XML.

    The event log is saved as ``XmlSave_location + <id> + "documentID.xml"``;
    an existing file is reused rather than downloaded again. Every
    ``CDEventLog`` element then gets:

    * ``ApprovalStatus`` / ``ApprovalStatusReason`` children, defaulting to
      "Unknown" / "None" and overridden when the event text carries a clear
      signal (e.g. an update to "Status A");
    * a ``TransmitNumber`` child when the event text contains "TRANSMIT" and
      a parenthesised number.

    Events whose ``EventType`` is "View" are removed. The altered XML is
    written back to the same file.

    Parameters:
        DocumentID: document tracking number, expected to be 19 digits,
            e.g. 1348828088510064790 (str or int). Invalid values only
            produce a printed warning.

    Returns:
        Path of the XML file.
    """
    document_id = str(DocumentID)
    if len(document_id) != 19:
        print("Document tracking number " + document_id + " is not valid, not 19 characters.")
    if not document_id.isdigit():
        print("Document tracking number " + document_id + " is not valid, not numeric.")

    # Use the parameter throughout; the original mixed in a module-level
    # variable (DocumentId) that only matched by coincidence.
    xml_name = document_id + "documentID.xml"
    xml_path = XmlSave_location + xml_name
    if not os.path.isfile(xml_path):
        SearchQuery = "register/" + document_id + "/eventlog"
        call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, xml_name]
        subprocess.call(call_command)

    tree = ET.parse(xml_path)
    root = tree.getroot()
    # ElementTree has no parent pointers; map them so events can be removed
    # from wherever they sit in the tree.
    parent_of = dict((child, parent) for parent in root.iter() for child in parent)

    for item in root.findall('.//CDEventLog'):
        # get-or-create keeps the file stable when it is processed again
        status_tag = _getOrAddChild(item, 'ApprovalStatus')
        status_tag.text = "Unknown"
        status_tag_reason = _getOrAddChild(item, 'ApprovalStatusReason')
        status_tag_reason.text = "None"

        event_type = item.get('EventType')
        detailed_event_type = item.findtext('Event') or ""

        if "TRANSMIT" in detailed_event_type:
            # the number is the text between the first pair of parentheses
            parts = detailed_event_type.replace("(", ")").split(")")
            if len(parts) > 1:
                _getOrAddChild(item, 'TransmitNumber').text = parts[1]

        if event_type == "View":
            # views are not interesting, strip them from the log
            parent_of[item].remove(item)
        elif event_type == "Send":
            if "REJCTN" in detailed_event_type:
                status_tag.text = "Not approved"
                status_tag_reason.text = "REJCTN mail type"
        elif event_type == "Update":
            for marker, unless, status, reason in _UPDATE_STATUS_RULES:
                if marker not in detailed_event_type:
                    continue
                if unless is not None and unless in detailed_event_type:
                    continue
                status_tag.text = status
                if reason is not None:
                    status_tag_reason.text = reason
        else:
            print("Unknown event type...")

    # save the altered xml file
    tree.write(xml_path)
    return xml_path
def generateMailMetadata(MailID):
    """Download (if needed) a mail item's metadata XML and return its path.

    The file is saved as ``XmlSave_location + <id> + "mailid.xml"``. An
    existing file is left in place rather than downloaded again, because
    other parts of the program add data to it. Any ``ApprovalStatus``
    element directly under the root is removed: per the original author it
    refers to Aconex processing the mail approval, not the document
    approval, and is confusing later.

    Parameters:
        MailID: mail identifier, expected to be 9 digits, e.g. 290640151
            (str or int). ``None`` and the error sentinel ``-1`` are
            rejected; other invalid values only print a warning.

    Returns:
        Path of the XML file, or "" when MailID is None or -1.
    """
    if MailID is None:
        print('Mail ID provided is none')
        return ""
    if MailID == -1:
        print("Mail ID has remained unchanged at -1, see the function that called this function")
        # -1 is an error marker, not an id; there is nothing to download
        return ""

    mail_id = str(MailID)
    if len(mail_id) != 9:
        print("MailID number " + mail_id + " is not valid, not 9 characters.")
    if not mail_id.isdigit():
        print("MailID number " + mail_id + " is not valid, not numeric.")

    xml_name = mail_id + "mailid.xml"
    xml_path = XmlSave_location + xml_name
    if not os.path.isfile(xml_path):
        call_command = [AconexTestAPI1_location, AconexProject_location + "mail/" + mail_id, XmlSave_location, xml_name]
        subprocess.call(call_command)

    tree = ET.parse(xml_path)
    root = tree.getroot()
    aconex_status = root.find('ApprovalStatus')
    # The element is already gone when a cached file is processed again;
    # removing None would raise.
    # NOTE(review): this also strips an ApprovalStatus that other code may
    # have added to the cached file - confirm that is intended.
    if aconex_status is not None:
        root.remove(aconex_status)
        tree.write(xml_path)
    return xml_path
def generateMailIDfromMailNumber(MailNumber):
    """Look up the mail ID that belongs to a mail number.

    The search result is stored as ``XmlSave_location + MailNumber + ".xml"``.
    If that file already exists it is reused. Otherwise the inbox is
    searched and, when that gives no results, the outbox. After a fresh
    search with exactly one hit, the metadata XML of that mail is downloaded
    as a side effect.

    Parameters:
        MailNumber: mail number, e.g. "CS JV-TRANSMIT-000054".

    Returns:
        The ``MailId`` attribute of the single matching ``Mail`` element, or
        -1 when the search did not give exactly one result.
    """
    results_name = MailNumber + ".xml"
    results_path = XmlSave_location + results_name
    freshly_searched = not os.path.isfile(results_path)

    if freshly_searched:
        # Aconex has issues processing spaces; a single-character wildcard
        # is used as a workaround.
        MailNumber_nospaces = MailNumber.replace(' ', '?')
        root = None
        # the outbox is only needed when the inbox has nothing, which is rare
        for mail_box in ("inbox", "outbox"):
            SearchQuery = generateSearchQuery(mail_box, ['docno:' + MailNumber_nospaces], ['docno'], "Full")
            call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, results_name]
            subprocess.call(call_command)
            root = ET.parse(results_path).getroot()
            if int(root.get('TotalResults')) != 0:
                break
    else:
        root = ET.parse(results_path).getroot()

    numberofMailResults = int(root.get('TotalResults'))
    mail_id = -1  # gives a bad result if errors occur
    if numberofMailResults == 0:
        print("Searching mail number " + MailNumber + " produced 0 results, some kind of error. No results returned")
    elif numberofMailResults >= 2:
        print("Searching mail number " + MailNumber + " produced " + str(numberofMailResults) + " results, some kind of error")
    elif numberofMailResults == 1:
        # MailId is an attribute of the Mail element, not of SearchResults;
        # the cached branch used to read it from the wrong element.
        mail_element = root.find('.//Mail')
        if mail_element is not None:
            mail_id = mail_element.get('MailId')
            if freshly_searched:
                generateMailMetadata(mail_id)
    else:
        print("Searching mail number " + MailNumber + " produced " + str(numberofMailResults) + " results, some kind of error.")
    return mail_id
def generateRelatedMailIDs(MailID):
    """Return the mail IDs of every mail item on the same thread as MailID.

    Expects a mail ID (described by the original author as a 9 digit number,
    e.g. 290640151). As a side effect the thread XML is downloaded, the
    metadata XML of each mail on the thread is downloaded, and each of those
    files gets ``NumberOfReplies`` and ``MailType`` elements taken from the
    thread data.
    """
    # the thread id is stored in the metadata of the starting mail
    start_root = ET.parse(generateMailMetadata(MailID)).getroot()
    threadID = start_root.find('ThreadId').text

    # the thread query is simple enough not to need generateSearchQuery
    thread_file = threadID + "threadID.xml"
    subprocess.call([
        AconexTestAPI1_location,
        AconexProject_location + "mail/" + threadID + "/thread",
        XmlSave_location,
        thread_file,
    ])
    thread_root = ET.parse(XmlSave_location + thread_file).getroot()

    # every Mail element in the thread contributes its id
    relatedMailIDs = [mail.get('MailId') for mail in thread_root.iter('Mail')]

    for related_id in relatedMailIDs:
        # make sure the metadata file for this mail exists
        generateMailMetadata(related_id)

        # locate this mail inside the thread to read its reply data
        thread_entry = thread_root.find('.//Mail[@MailId="' + related_id + '"]')
        reply_type = thread_entry.find('ReplyType').text
        # iter() yields the entry itself too, hence the minus one
        reply_count = sum(1 for _ in thread_entry.iter('Mail')) - 1

        # copy the reply data into the mail's own xml for later use
        mail_path = XmlSave_location + related_id + "mailid.xml"
        mail_tree = ET.parse(mail_path)
        mail_root = mail_tree.getroot()

        replies_element = mail_root.find('NumberOfReplies')
        if replies_element is None:
            replies_element = ET.SubElement(mail_root, 'NumberOfReplies')
        type_element = mail_root.find('MailType')
        if type_element is None:
            type_element = ET.SubElement(mail_root, 'MailType')

        replies_element.text = str(reply_count)
        type_element.text = reply_type
        mail_tree.write(mail_path)

    return relatedMailIDs
def findAllRelatedMail(documentMetadataPath, DocumentID, DocumentNumber):
    """Collect the IDs of all mail related to a document.

    Two sources are combined:

    * inbox and outbox mail whose subject or body (``corrdata``) contains
      the document number; the threads of those mails are deliberately not
      followed;
    * for every ``TransmitNumber`` in the document metadata XML, all mail on
      the thread of that transmittal.

    As a side effect the search results and any outstanding mail metadata
    XML files are downloaded.

    Parameters:
        documentMetadataPath: path to an existing document metadata XML file.
        DocumentID: document ID string (19 digit number), used to name the
            search result files.
        DocumentNumber: document number, e.g. 1EW02-CSJ-DS-DES-S003-000110.

    Returns:
        List of mail IDs without duplicates, in order of first appearance.
    """
    relatedMailIDslist = []
    search_term = '(subject:' + DocumentNumber + ' OR corrdata:' + DocumentNumber + ')'

    for mail_box in ("inbox", "outbox"):
        SearchQuery = generateSearchQuery(mail_box, [search_term], ['docno'], "Full")
        results_name = DocumentID + "relatedmail" + mail_box + ".xml"
        call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, results_name]
        subprocess.call(call_command)
        search_results = ET.parse(XmlSave_location + results_name).getroot().find('SearchResults')
        if search_results is None:
            continue
        for item in search_results.findall('Mail'):
            relatedMailIDslist.append(item.get('MailId'))

    # download the metadata xml for each directly related mail if needed
    for mailID in relatedMailIDslist:
        generateMailMetadata(mailID)

    document_root = ET.parse(documentMetadataPath).getroot()
    for transmit_number in document_root.findall('.//TransmitNumber'):
        MailID = generateMailIDfromMailNumber(transmit_number.text)
        # -1 / None mean the lookup failed (already reported); following the
        # thread of a non-existent mail would only crash.
        if MailID is None or MailID == -1:
            continue
        relatedMailIDslist.extend(generateRelatedMailIDs(MailID))

    # remove duplicates while keeping a stable order
    seen = set()
    uniqueMailIDs = []
    for mailID in relatedMailIDslist:
        if mailID not in seen:
            seen.add(mailID)
            uniqueMailIDs.append(mailID)
    return uniqueMailIDs
def classifyMailItem(mailID, DocumentId, DocumentNumber):
    """Copy the approval verdict of a transmittal into the mail's XML.

    The mail number is read from the mail's metadata XML and looked up among
    the ``CDEventLog`` entries of the document's metadata XML (matching on
    ``TransmitNumber``). When exactly one entry matches, its
    ``ApprovalStatus`` and ``ApprovalStatusReason`` are transferred to the
    mail XML: a missing ``ApprovalStatus`` is inserted, and the reason is
    inserted or appended to an existing reason. The mail XML is then saved.

    Parameters:
        mailID: mail ID string (9 digit number).
        DocumentId: document ID string (19 digit number).
        DocumentNumber: document number; only printed at present.

    Returns:
        Path to the mail's XML file.
    """
    print(mailID)
    print(DocumentId)
    print(DocumentNumber)

    # get the mail number from the mail's own xml
    mail_path = XmlSave_location + mailID + "mailid.xml"
    mail_tree = ET.parse(mail_path)
    mail_root = mail_tree.getroot()
    mailNumber = mail_root.find('MailNo').text

    # search the document xml for that mail number
    # N.B. do not leave spaces around the = sign in the predicate
    document_root = ET.parse(XmlSave_location + DocumentId + "documentID.xml").getroot()
    matches = document_root.findall('.//CDEventLog[TransmitNumber="' + mailNumber + '"]')

    if len(matches) == 1:
        event = matches[0]
        ApprovalStatus = event.find('ApprovalStatus').text
        ApprovalStatusReason = event.find('ApprovalStatusReason').text
        print("approval status" + ApprovalStatus)
        print("approval status reason" + ApprovalStatusReason)

        if mail_root.find('ApprovalStatus') is None:
            ET.SubElement(mail_root, 'ApprovalStatus').text = ApprovalStatus

        # Append to the existing reason element itself; the original reused
        # a variable that pointed at a different element here.
        reason_element = mail_root.find('ApprovalStatusReason')
        if reason_element is None:
            ET.SubElement(mail_root, 'ApprovalStatusReason').text = ApprovalStatusReason
        else:
            reason_element.text = (reason_element.text or "") + ApprovalStatusReason

        # persist the tags; without this the changes above were discarded
        mail_tree.write(mail_path)
    elif len(matches) > 1:
        print(str(len(matches)) + "mails found with mail number " + str(mailNumber) + ", very unexpected.")
    else:
        print("element has length " + str(len(matches)) + ", reasonably expected.")
    print("")

    # ------------------------------------------------------------------
    # Design notes for future classification signals (not implemented).
    # Idea: keep an approval confidence score starting at 0; above 60 means
    # approved, below -60 means not approved.
    #
    # signal 1: do the changes to the document include a 'Status 1' or
    #           'Approved' status?
    #           import any existing approval status tags from the document
    #           id xml file; this is pretty final, set confidence +100
    # signal 2: is the mail type a rejection? - but to what documents
    #           exactly does it respond?
    #           import any existing approval status tags from the document
    #           id xml file; this is pretty final, can set confidence -100
    # signal 2: who sent the document?
    #           build into a ML model
    #           if sent by CSJV +10, if not sent by CSJV -10
    # signal 3: has the mail been marked 'closed out'?
    #           pull out of the 'Status' tag of the mail
    #           build into a ML model, but ignore for now
    #           not sure about the exact logic that can be applied here
    # signal 3: does the response contain the word 'accepted' or 'approved'
    #           in its text? then likely to be approved
    #           need a robust way of separating the first mail out of the chain
    #           then an ML model or something less sophisticated?
    #           reasonably strong evidence, +30 if true
    # signal 3b: does the filename of the attached document contain the word
    #           'accepted' or 'approved' or 'signed'? then likely approved
    #           fairly strong evidence, +40 if true
    #
    # Draft of the text heuristic (disabled in the original as well):
    #
    #   x = root.find('MailData').text  # the text in the response
    #   y = root.find('.//FileName')    # not all mails have an attachment
    #   if y == None:
    #       y = "abc"  # short term fudge, to fix
    #   else:
    #       y = y.text
    #   if x != None:
    #       if 'from' in x:
    #           x = x.split('From:')[0]
    #       if (('approv' in x or 'accept' in x or 'Approv' in x or 'Accept' in x
    #            or 'approv' in y or 'accept' in y) and ('for approv' not in x)):
    #           print "Approval very probably granted on " + root.find('MailNo').text
    #       # signal 4: does the response contain the word 'amend' in its
    #       # text, unlikely to be approved
    #       elif ('marked' in x or 'marked' in y or 'to be addressed' in x):
    #           print "Very probably no approval on " + root.find('MailNo').text
    #       else:
    #           print "Probably no approval on " + root.find('MailNo').text
    #
    # signal 4: from the thread information, has the message been responded
    #           to? if it is the last piece of mail, some evidence of
    #           approval, +10
    # signal 5: does the message look like document control responding to a
    #           DNR? i.e. from document control, containing the document
    #           number in the body of the message, DNR is in the thread
    #           (especially a message following a DNR)
    # signal 6: does the message look like document control signalling an
    #           upload to eB? a mail number with INT is in the body of the
    #           text, the mail number an INT, does the person who sent it
    #           have <JobTitle> "Document Controller"?
    # signal 7: does the message look like document control coming back from
    #           eB? does the transmit have both the document and a comments
    #           sheet, does the sender have <JobTitle> "Document Controller"?
    # ------------------------------------------------------------------
    return mail_path
#key inputs for the program to function
# paths to the helper executables that the script shells out to
AconexTestAPI1_location = r"C:/Users/david/Documents/Visual Studio 2015/Projects/AxonexTestAPI1/AxonexTestAPI1/bin/Debug/AxonexTestAPI1.exe"
AconexTestAPI2_location = r"c:/users/david/documents/visual studio 2015/Projects/AconexAPIGetFile/AconexAPIGetFile/bin/Debug/AconexAPIGetFile.exe"
AconexProject_location = "https://uk1.aconex.co.uk/api/projects/268441385/"
SearchQuery = ""
XmlSave_location = "D:/AllAconexDocuments/"
clearXmlSave_location = False #clears the folder of any existing xml files to give a fresh start, otherwise, they stay
clearXmlSaveLocation(clearXmlSave_location)

#this is the overarching xml - where the numbers of every document returned in the search is stored
XmlSaveFileName = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + ".xml"

#this is the overarching search query 'e.g. find every TBS in package 27c' or 'e.g. find TSB-000180'
#alternative queries kept for reference:
#SearchQuery = generateSearchQuery("document",["attribute3:P027c*","(docno:TBS* OR docno:TSB*)"],["docno","title","statusid"],"Number_Limited&search_result_size=25")
#SearchQuery = generateSearchQuery("document",["attribute3:P027c*","(docno:TBS-000152)"],["docno","title","statusid"],"Number_Limited&search_result_size=Full")
SearchQuery = generateSearchQuery("document",["attribute3:P027c*"],["docno","title","statusid","Filename","versionnumber"],"Full")
call_command = [AconexTestAPI1_location,AconexProject_location+SearchQuery,XmlSave_location,XmlSaveFileName]
subprocess.call(call_command)

#read the search results and work out which documents still need downloading
tree1 = ET.parse(XmlSave_location+XmlSaveFileName)
root1 = tree1.getroot()
searchResults = root1.find('SearchResults')
if searchResults is None:
    searchResults = []

pendingDownloads = []
for child in searchResults:
    DocumentFilename = child.find('Filename').text
    if DocumentFilename is None:
        # no file attached, so there is no extension and nothing to download
        continue
    DocumentId = child.attrib['DocumentId']
    DocumentNumber = child.find('DocumentNumber').text
    # read per document; the download loop used to reuse the value left
    # over from the last document of the counting loop
    DocumentVersionNumber = child.find('VersionNumber').text
    DocumentExtension = os.path.splitext(DocumentFilename)[1]
    targetFileName = DocumentNumber+"_"+DocumentVersionNumber+DocumentExtension
    if not os.path.exists(XmlSave_location+targetFileName):
        pendingDownloads.append((DocumentId, DocumentNumber, DocumentExtension, targetFileName))

#the counter used to start at 1, which overstated the total by one
y1 = len(pendingDownloads)
print("Total number of documents is " + str(y1))

for y2, (DocumentId, DocumentNumber, DocumentExtension, targetFileName) in enumerate(pendingDownloads, 1):
    print(str(y2) + " of " + str(y1) + " has been downloaded")
    print(DocumentId)
    print(DocumentNumber)
    print(DocumentExtension)
    print("")
    call_command = [AconexTestAPI2_location,DocumentId,XmlSave_location,targetFileName]
    subprocess.call(call_command)
    #wait a random 3-10 seconds between downloads
    time.sleep(random.randint(3,10))
Hi, lines 396 and 397 reference two EXE files. Could you please share them with me?
Snk
Muy buen trabajo. ¿Me podrías indicar si esto permite bajar todos los correos de un proyecto en Aconex? Y si quiero bajar todos los documentos cargados, ¿qué código me puede ayudar?
desde ya muchas gracias.
hey, afraid this was done a long time ago, so I don't really remember the details (and Aconex already had much better ways of doing this, just they were disabled on my project so I was working around those limitations). The .exe files were compiled C# files - which are the other gists on the page. If I remember correctly it was the easiest way to run a C# script from python, which is what Aconex had provided documentation for. A bit more detail is here https://davidjohnhewlett.co.uk/?p=458.
Sorry not to be more helpful, but I'm a structural engineer rather than a developer - so I basically make code until it works for my purpose, rather than it being any good!
Muy buen trabajo. ¿Me podrías indicar si esto permite bajar todos los correos de un proyecto en Aconex? Y si quiero bajar todos los documentos cargados, ¿qué código me puede ayudar?
desde ya muchas gracias.