Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidjohnhewlett/bc70ca6e76470dd9310bf10f68c1e434 to your computer and use it in GitHub Desktop.
Save davidjohnhewlett/bc70ca6e76470dd9310bf10f68c1e434 to your computer and use it in GitHub Desktop.
import subprocess
import time
import xml.dom.minidom
import os.path
import os
import datetime
import xml.etree.ElementTree as ET
import glob
import random
def clearXmlSaveLocation(clearXmlSave_location):
    """Optionally empty the XML save location to give a clean start.

    When ``clearXmlSave_location`` is truthy, every regular file whose path
    matches ``XmlSave_location + "*"`` (the module-level save prefix) is
    deleted. Clearing means nothing is cached any more, so the rest of the
    run will be slower. Nothing happens when the flag is falsy.

    Sub-directories matched by the glob are skipped, because ``os.remove``
    raises when it is handed a directory.
    """
    if not clearXmlSave_location:
        return
    for xmlFile in glob.glob(XmlSave_location + "*"):
        if os.path.isfile(xmlFile):
            os.remove(xmlFile)
def generateSearchQuery(object_type, search_terms, return_fields, search_type):
    """Build the query part of a search URL for the subprocess download call.

    The result is appended to ``AconexProject_location`` by the callers and
    handed to the external executable, which saves the XML response.

    Args:
        object_type: "document", "inbox" or "outbox". Any other value prints
            a warning and contributes an empty prefix.
        search_terms: list of search expressions, e.g.
            ["attribute3:P027c*", "(docno:tbs* OR docno:tsb*)"]. They are
            combined with " AND ".
        return_fields: list of field names to return, e.g.
            ["docno", "title"]; joined with commas.
        search_type: "Full", or a string containing
            "Number_Limited&search_result_size" (for example
            "Number_Limited&search_result_size=25"). Any other value prints
            a warning and contributes no ``search_type`` parameter.

    Returns:
        The query string, e.g.
        "register?&search_query=...&return_fields=...&search_type=Full".
    """
    object_type_prefixes = {
        "document": "register?&",
        "inbox": "mail?mail_box=Inbox&",
        "outbox": "mail?mail_box=Sentbox&",
    }
    if object_type in object_type_prefixes:
        object_type_text = object_type_prefixes[object_type]
    else:
        object_type_text = ""
        print("Invalid object type: must be either 'document' or 'inbox' or 'outbox'.")

    # join() also copes with an empty list, which used to raise IndexError.
    search_text = "search_query=" + " AND ".join(search_terms) + "&"
    return_fields_text = "return_fields=" + ",".join(return_fields) + "&"

    if search_type == "Full":
        search_type_text = "search_type=Full"
    elif "Number_Limited&search_result_size" in search_type:
        search_type_text = "search_type=" + search_type
    else:
        # Previously this name stayed unbound and the return line crashed.
        search_type_text = ""
        print("Invalid search type entered, must be 'Full'.")

    return object_type_text + search_text + return_fields_text + search_type_text
# (marker, ApprovalStatus, ApprovalStatusReason) rules for "Update" events.
# They are evaluated in order and a later match overrides an earlier one.
# A reason of None leaves the current reason text untouched.
_UPDATE_STATUS_RULES = (
    ("Status A", "Approved", "Updated to Status A"),
    ("Code 1", "Approved", "Updated to Code 1"),
    ("Code 5", "Approved", None),
    ("Status B", "Not approved", "Updated to Status B"),
    ("Status C", "Not approved", "Updated to Status c"),
    ("Code 2", "Not approved", "Updated to code 2"),
    ("Code 3", "Not approved", "Updated to Code 3"),
    ("Draft", "Not approved", "Updated to Draft"),
)


def _getOrCreateChild(parent, tag):
    """Return the first direct child of ``parent`` named ``tag``, creating it if absent."""
    child = parent.find(tag)
    if child is None:
        child = ET.SubElement(parent, tag)
    return child


def generateDocumentMetadata(DocumentID):
    """Download (if needed) and annotate the event log XML of one document.

    The event log is saved as ``XmlSave_location + DocumentID +
    "documentID.xml"``. If that file is not present it is fetched by running
    the external ``AconexTestAPI1_location`` executable. Then, for every
    ``CDEventLog`` element:

    * "View" events are removed (not interesting);
    * ``ApprovalStatus`` ("Unknown" by default) and ``ApprovalStatusReason``
      ("None" by default) children are set, and updated when the event text
      carries a clear signal (e.g. an update to Status A, a REJCTN mail);
    * when the event text mentions "TRANSMIT", the text between the first
      pair of parentheses is stored in a ``TransmitNumber`` child.

    Existing annotation children are reused, so running this again on a
    cached file does not add duplicate tags.

    Args:
        DocumentID: the document id, expected to be a 19 digit number such
            as 1348828088510064790 (int or str). An id that fails the check
            only prints a warning; processing continues.

    Returns:
        Path of the annotated XML file.
    """
    DocumentID = str(DocumentID)
    if len(DocumentID) != 19:
        print("Document tracking number " + DocumentID + " is not valid, not 19 characters.")
    if not DocumentID.isdigit():
        print("Document tracking number " + DocumentID + " is not valid, not numeric.")

    documentXmlName = DocumentID + "documentID.xml"
    documentXmlPath = XmlSave_location + documentXmlName
    if not os.path.isfile(documentXmlPath):
        SearchQuery = "register/" + DocumentID + "/eventlog"
        call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, documentXmlName]
        subprocess.call(call_command)

    tree = ET.parse(documentXmlPath)
    root = tree.getroot()
    # ElementTree elements do not know their parent; build the lookup once so
    # that "View" events can be removed wherever they sit in the tree.
    parentOf = dict((child, parent) for parent in root.iter() for child in parent)

    for item in root.findall('.//CDEventLog'):
        event_type = item.get('EventType')
        if event_type == "View":
            parentOf[item].remove(item)  # element not interesting, delete
            continue

        status_tag = _getOrCreateChild(item, 'ApprovalStatus')
        status_tag.text = "Unknown"
        status_tag_reason = _getOrCreateChild(item, 'ApprovalStatusReason')
        status_tag_reason.text = "None"

        # A missing or empty <Event> is treated as an empty description.
        detailed_event_type = item.findtext('Event') or ""

        if "TRANSMIT" in detailed_event_type:
            # The transmit number is the text inside the first parentheses.
            parts = detailed_event_type.replace("(", ")").split(")")
            if len(parts) > 1:
                _getOrCreateChild(item, 'TransmitNumber').text = parts[1]

        if event_type == "Send":
            if "REJCTN" in detailed_event_type:
                status_tag.text = "Not approved"
                status_tag_reason.text = "REJCTN mail type"
        elif event_type == "Update":
            for marker, status, reason in _UPDATE_STATUS_RULES:
                if marker not in detailed_event_type:
                    continue
                # "Status Changed" contains "Status C" but is not a Status C update.
                if marker == "Status C" and "Status Changed" in detailed_event_type:
                    continue
                status_tag.text = status
                if reason is not None:
                    status_tag_reason.text = reason
        else:
            print("Unknown event type...")

    # save the altered xml file
    tree.write(documentXmlPath)
    return documentXmlPath
def generateMailMetadata(MailID):
    """Download (if needed) the metadata XML of one mail item and return its path.

    The file is saved as ``XmlSave_location + MailID + "mailid.xml"``. It is
    fetched with the external ``AconexTestAPI1_location`` executable only when
    it does not exist yet; an existing file is left completely alone because
    other parts of the program add their own tags to it.

    Straight after a fresh download the ``ApprovalStatus`` children of the
    root are removed: that tag refers to Aconex processing the mail approval,
    not the document approval, and is confusing later. Doing this only on a
    fresh download means a repeated call neither crashes on the missing tag
    nor strips an ``ApprovalStatus`` the program has added since.

    Args:
        MailID: the mail id, expected to be a 9 digit number such as
            290640151 (int or str). ``None`` and the error sentinel ``-1``
            are rejected. Any other value that fails the check only prints a
            warning; processing continues.

    Returns:
        Path of the mail XML file, or "" when ``MailID`` is ``None`` or -1.
    """
    if MailID is None:
        print('Mail ID provided is none')
        return ""
    if MailID == -1:
        print("Mail ID has remained unchanged at -1, see the function that called this function")
        return ""

    MailID = str(MailID)
    if len(MailID) != 9:
        print("MailID number " + MailID + " is not valid, not 9 characters.")
    if not MailID.isdigit():
        print("MailID number " + MailID + " is not valid, not numeric.")

    mailXmlName = MailID + "mailid.xml"
    mailXmlPath = XmlSave_location + mailXmlName
    if not os.path.isfile(mailXmlPath):
        call_command = [AconexTestAPI1_location, AconexProject_location + "mail/" + MailID, XmlSave_location, mailXmlName]
        subprocess.call(call_command)
        tree = ET.parse(mailXmlPath)
        root = tree.getroot()
        for aconexApprovalStatus in root.findall('ApprovalStatus'):
            root.remove(aconexApprovalStatus)
        tree.write(mailXmlPath)
    return mailXmlPath
def generateMailIDfromMailNumber(MailNumber):
    """Look up the mail id that belongs to a mail number.

    The search result is cached as ``XmlSave_location + MailNumber + ".xml"``.
    When that file already exists it is read instead of searching again.
    Otherwise the inbox is searched first and, if that gives no results, the
    outbox (this is fairly rare).

    As a side effect of a fresh search with exactly one hit, the metadata XML
    of that mail is downloaded through ``generateMailMetadata``.

    Args:
        MailNumber: a mail number such as "CS JV-TRANSMIT-000054".

    Returns:
        The ``MailId`` attribute of the single matching ``Mail`` element, or
        -1 when the search produced zero results or more than one.
    """
    searchResultName = MailNumber + ".xml"
    searchResultPath = XmlSave_location + searchResultName

    freshlySearched = not os.path.isfile(searchResultPath)
    if freshlySearched:
        # Aconex has issues processing the spaces; replacing them with a
        # single-character wildcard is a workaround.
        MailNumber_nospaces = MailNumber.replace(' ', '?')
        for mailBox in ("inbox", "outbox"):
            SearchQuery = generateSearchQuery(mailBox, ['docno:' + MailNumber_nospaces], ['docno'], "Full")
            call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, searchResultName]
            subprocess.call(call_command)
            root = ET.parse(searchResultPath).getroot()
            if int(root.get('TotalResults')) != 0:
                break
    else:
        root = ET.parse(searchResultPath).getroot()

    numberofMailResults = int(root.get('TotalResults'))
    mail_id = -1  # gives a bad result if errors occur
    if numberofMailResults == 1:
        # The id lives on the <Mail> element, not on <SearchResults>.
        mail_id = root.find('.//Mail').get('MailId')
        if freshlySearched:
            generateMailMetadata(mail_id)
    elif numberofMailResults == 0:
        print("Searching mail number " + MailNumber + " produced 0 results, some kind of error. No results returned")
    else:
        print("Searching mail number " + MailNumber + " produced " + str(numberofMailResults) + " results, some kind of error")
    return mail_id
def generateRelatedMailIDs(MailID):
    """Return the mail ids of every mail item on the same thread as ``MailID``.

    The thread id is read from the mail's metadata XML, then the thread is
    downloaded to ``XmlSave_location + threadID + "threadID.xml"`` with the
    external ``AconexTestAPI1_location`` executable (on every call).

    Side effects: the metadata XML of each related mail is downloaded if
    needed, and its ``NumberOfReplies`` and ``MailType`` tags are created or
    refreshed from the thread data, for later use in determining approval
    status.

    Args:
        MailID: the mail id, expected to be a 9 digit number, e.g. 290640151.

    Returns:
        List of the ``MailId`` attributes of the ``Mail`` elements found in
        the thread XML. Elements without that attribute are skipped.
    """
    MailIDXMLPath = generateMailMetadata(MailID)
    threadID = ET.parse(MailIDXMLPath).getroot().find('ThreadId').text

    # N.B. the search query is very straightforward, no need for a separate function
    threadXmlName = threadID + "threadID.xml"
    call_command = [AconexTestAPI1_location, AconexProject_location + "mail/" + threadID + "/thread", XmlSave_location, threadXmlName]
    subprocess.call(call_command)
    thread_root = ET.parse(XmlSave_location + threadXmlName).getroot()

    relatedMailIDs = []
    # Work on the elements themselves rather than looking each one up again
    # by id: a './/Mail[@MailId=...]' search cannot match the root element.
    for mailThreadItem in list(thread_root.iter('Mail')):
        relatedMailID = mailThreadItem.get('MailId')
        if relatedMailID is None:
            continue
        relatedMailIDs.append(relatedMailID)

        generateMailMetadata(relatedMailID)

        replyType = mailThreadItem.findtext('ReplyType')
        # iter() also yields the element itself, hence the -1.
        numberofreplies = len(list(mailThreadItem.iter('Mail'))) - 1

        mailXmlPath = XmlSave_location + relatedMailID + "mailid.xml"
        tree = ET.parse(mailXmlPath)
        root = tree.getroot()
        repliesTag = root.find('NumberOfReplies')
        if repliesTag is None:
            repliesTag = ET.SubElement(root, 'NumberOfReplies')
        mailTypeTag = root.find('MailType')
        if mailTypeTag is None:
            mailTypeTag = ET.SubElement(root, 'MailType')
        repliesTag.text = str(numberofreplies)
        mailTypeTag.text = replyType
        tree.write(mailXmlPath)
    return relatedMailIDs
def findAllRelatedMail(documentMetadataPath, DocumentID, DocumentNumber):
    """Collect the ids of all mail related to one document.

    Two sources are combined:

    1. Inbox and outbox mail that mentions the document number in its
       subject or body. This should capture document number requests and
       uploads. The threads of those messages are deliberately not followed
       (assumed not to be of much value).
    2. For every ``TransmitNumber`` in the document metadata XML, all mail on
       the thread of that transmittal.

    As a side effect, any outstanding mail XML files are downloaded.

    Args:
        documentMetadataPath: path to an existing document metadata XML file.
        DocumentID: the document id (19 digit number); used to name the
            search result files.
        DocumentNumber: the document number, e.g.
            "1EW02-CSJ-DS-DES-S003-000110".

    Returns:
        List of mail ids without duplicates, in first-seen order.
    """
    DocumentID = str(DocumentID)
    relatedMailIDslist = []

    searchTerm = '(subject:' + DocumentNumber + ' OR corrdata:' + DocumentNumber + ')'
    for mailBox in ("inbox", "outbox"):
        SearchQuery = generateSearchQuery(mailBox, [searchTerm], ['docno'], "Full")
        resultFileName = DocumentID + "relatedmail" + mailBox + ".xml"
        call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, resultFileName]
        subprocess.call(call_command)
        searchResults = ET.parse(XmlSave_location + resultFileName).getroot().find('SearchResults')
        if searchResults is None:
            continue
        relatedMailIDslist.extend(item.get('MailId') for item in searchResults.findall('Mail'))

    # download all the xml files for these mails if needed
    for mailID in relatedMailIDslist:
        generateMailMetadata(mailID)

    root2 = ET.parse(documentMetadataPath).getroot()
    for transmit_number in root2.findall('.//TransmitNumber'):
        MailID = generateMailIDfromMailNumber(transmit_number.text)
        # -1 / None mean the mail number could not be resolved to one mail;
        # there is no thread to follow in that case.
        if MailID is None or MailID == -1:
            continue
        relatedMailIDslist.extend(generateRelatedMailIDs(MailID))

    # remove duplicates while keeping the first occurrence of each id
    seenMailIDs = set()
    uniqueMailIDs = []
    for mailID in relatedMailIDslist:
        if mailID not in seenMailIDs:
            seenMailIDs.add(mailID)
            uniqueMailIDs.append(mailID)
    return uniqueMailIDs
def classifyMailItem(mailID, DocumentId, DocumentNumber):
    """Copy the approval verdict of a transmittal from the document XML to the mail XML.

    The mail number is read from ``XmlSave_location + mailID + "mailid.xml"``
    and looked up among the ``CDEventLog`` elements of
    ``XmlSave_location + DocumentId + "documentID.xml"`` by their
    ``TransmitNumber`` child. When exactly one event matches, its
    ``ApprovalStatus`` and ``ApprovalStatusReason`` are written to the mail
    XML:

    * ``ApprovalStatus`` is added only if the mail XML has none yet;
    * ``ApprovalStatusReason`` is added, or, if one already exists, the new
      reason is appended to it unless it is already part of the text.

    Args:
        mailID: a single mail id (9 digit number).
        DocumentId: the document id (19 digit number).
        DocumentNumber: the document number; currently only printed.

    Returns:
        Path of the mail XML file.
    """
    print(mailID)
    print(DocumentId)
    print(DocumentNumber)

    # get the mail number from the mail id file
    mailXmlPath = XmlSave_location + str(mailID) + "mailid.xml"
    mailTree = ET.parse(mailXmlPath)
    mailRoot = mailTree.getroot()
    mailNumber = mailRoot.find('MailNo').text

    # search the document xml for that mail number
    documentRoot = ET.parse(XmlSave_location + str(DocumentId) + "documentID.xml").getroot()
    # N.B. do not leave spaces around the = sign
    matches = documentRoot.findall('.//CDEventLog[TransmitNumber="' + mailNumber + '"]')

    if len(matches) == 1:
        # Fall back to the defaults generateDocumentMetadata writes.
        ApprovalStatus = matches[0].findtext('ApprovalStatus') or "Unknown"
        ApprovalStatusReason = matches[0].findtext('ApprovalStatusReason') or "None"
        print("approval status" + ApprovalStatus)
        print("approval status reason" + ApprovalStatusReason)

        if mailRoot.find('ApprovalStatus') is None:
            ET.SubElement(mailRoot, 'ApprovalStatus').text = ApprovalStatus

        reasonTag = mailRoot.find('ApprovalStatusReason')
        if reasonTag is None:
            ET.SubElement(mailRoot, 'ApprovalStatusReason').text = ApprovalStatusReason
        else:
            existingReason = reasonTag.text or ""
            # Skip the append when the reason is already recorded, so that
            # classifying the same mail twice does not duplicate it.
            if ApprovalStatusReason not in existingReason:
                reasonTag.text = existingReason + ApprovalStatusReason

        # Persist the tags; without this the update was lost.
        mailTree.write(mailXmlPath)
    elif len(matches) > 1:
        print(str(len(matches)) + "mails found with mail number " + str(mailNumber) + ", very unexpected.")
    else:
        print("element has length " + str(len(matches)) + ", reasonably expected.")
    print("")

    # ------------------------------------------------------------------
    # Design notes for future classification signals (not implemented).
    # Idea: keep an approval status confidence; if it ends above 60 say
    # approved, below -60 say not approved.
    #
    # signal 1: do the changes to the document include a 'Status 1' or
    #   'Approved' status? Import any existing approval status tags from the
    #   document id xml file. This is pretty final, set confidence +100.
    # signal 2: is the mail type a rejection? But to which documents exactly
    #   does it respond? Import any existing approval status tags from the
    #   document id xml file. This is pretty final, can set confidence -100.
    # signal 2: who sent the document? Build into an ML model.
    #   If sent by CSJV +10, if not sent by CSJV -10.
    # signal 3: has the mail been marked 'closed out'? Pull out of the
    #   'Status' tag of the mail. Build into an ML model, but ignore for now;
    #   not sure about the exact logic that can be applied here.
    # signal 3: does the response contain the word 'accepted' or 'approved'
    #   in its text? Then likely to be approved. Needs a robust way of
    #   separating the first mail out of the chain, then an ML model or
    #   something less sophisticated. Reasonably strong evidence, +30.
    # signal 3b: does the filename of the attached document contain the word
    #   'accepted', 'approved' or 'signed'? Then likely to be approved.
    #   Fairly strong evidence, +40.
    #
    # Draft of signals 3/3b/4 (disabled):
    #   x = root.find('MailData').text  # the text in the response
    #   y = root.find('.//FileName')    # not all mails have an attachment
    #   if y == None:
    #       y = "abc"  # short term fudge, to fix
    #   else:
    #       y = y.text
    #   if x != None:
    #       if 'from' in x:
    #           x = x.split('From:')[0]
    #       if (('approv' in x or 'accept' in x or 'Approv' in x or 'Accept' in x
    #            or 'approv' in y or 'accept' in y) and ('for approv' not in x)):
    #           print "Approval very probably granted on " + root.find('MailNo').text
    #       # signal 4: the response contains the word 'amend': unlikely to be approved
    #       elif ('marked' in x or 'marked' in y or 'to be addressed' in x):
    #           print "Very probably no approval on " + root.find('MailNo').text
    #       else:
    #           print "Probably no approval on " + root.find('MailNo').text
    #
    # signal 4: from the thread information, has the message been responded
    #   to? If it is the last piece of mail, some evidence of approval, +10.
    # signal 5: does the message look like document control responding to a
    #   DNR? I.e. from document control, containing the document number in
    #   the body, DNR is in the thread (especially a message following a DNR).
    # signal 6: does the message look like document control signalling an
    #   upload to eB? A mail number with INT is in the body of the text, the
    #   mail number is an INT, the sender has <JobTitle> "Document Controller".
    # signal 7: does the message look like document control coming back from
    #   eB? The transmit has both the document and a comments sheet, the
    #   sender has <JobTitle> "Document Controller".
    # ------------------------------------------------------------------
    return mailXmlPath
# key inputs for the program to function
# Paths of the two external executables run through subprocess.
AconexTestAPI1_location = r"C:/Users/david/Documents/Visual Studio 2015/Projects/AxonexTestAPI1/AxonexTestAPI1/bin/Debug/AxonexTestAPI1.exe"
AconexTestAPI2_location = r"c:/users/david/documents/visual studio 2015/Projects/AconexAPIGetFile/AconexAPIGetFile/bin/Debug/AconexAPIGetFile.exe"
AconexProject_location = "https://uk1.aconex.co.uk/api/projects/268441385/"
SearchQuery = ""
XmlSave_location = "D:/AllAconexDocuments/"
clearXmlSave_location = False  # clears the folder of any existing xml files to give a fresh start, otherwise, they stay
clearXmlSaveLocation(clearXmlSave_location)

# this is the overarching xml - where the numbers of every document returned in the search are stored
XmlSaveFileName = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S') + ".xml"

# this is the overarching search query, e.g. 'find every TBS in package 27c' or 'find TSB-000180'
#SearchQuery = generateSearchQuery("document",["attribute3:P027c*","(docno:TBS* OR docno:TSB*)"],["docno","title","statusid"],"Number_Limited&search_result_size=25")
SearchQuery = generateSearchQuery("document", ["attribute3:P027c*"], ["docno", "title", "statusid", "Filename", "versionnumber"], "Full")
#SearchQuery = generateSearchQuery("document",["attribute3:P027c*","(docno:TBS-000152)"],["docno","title","statusid"],"Number_Limited&search_result_size=Full")
call_command = [AconexTestAPI1_location, AconexProject_location + SearchQuery, XmlSave_location, XmlSaveFileName]
subprocess.call(call_command)

# read the search result to pull out some key information
tree1 = ET.parse(XmlSave_location + XmlSaveFileName)
root1 = tree1.getroot()
searchResults = root1.find('SearchResults')
if searchResults is None:
    searchResults = []

# First pass: work out which documents still have to be downloaded. Every
# value is read per document, so the second pass never sees a version number
# or extension left over from another document.
pendingDownloads = []
for child in searchResults:
    DocumentId = child.attrib['DocumentId']
    DocumentNumber = child.find('DocumentNumber').text
    DocumentFilename = child.find('Filename').text
    DocumentVersionNumber = child.find('VersionNumber').text
    if DocumentFilename is None:
        # no file attached to this register entry, nothing to download
        continue
    DocumentFilename, DocumentExtension = os.path.splitext(DocumentFilename)
    targetFileName = DocumentNumber + "_" + DocumentVersionNumber + DocumentExtension
    if not os.path.exists(XmlSave_location + targetFileName):
        pendingDownloads.append((DocumentId, DocumentNumber, DocumentExtension, targetFileName))

y1 = len(pendingDownloads)
print("Total number of documents is " + str(y1))

# Second pass: download each missing file with the second executable.
for y2, (DocumentId, DocumentNumber, DocumentExtension, targetFileName) in enumerate(pendingDownloads, 1):
    print(DocumentId)
    print(DocumentNumber)
    print(DocumentExtension)
    print("")
    call_command = [AconexTestAPI2_location, DocumentId, XmlSave_location, targetFileName]
    subprocess.call(call_command)
    print(str(y2) + " of " + str(y1) + " has been downloaded")
    # random pause of 3-10 seconds between downloads
    x = random.randint(3, 10)
    time.sleep(x)
@davidjohnhewlett
Copy link
Author

Muy buen trabajo..... me podrías indicar si esto permite bajar todos los correos de un proyecto en aconex? y si quiero bajar todos los documentos cargados... cual código me puede ayudar...
desde ya muchas gracias.

Hey, I'm afraid this was done a long time ago, so I don't really remember the details (Aconex already had much better ways of doing this, but they were disabled on my project, so I was working around those limitations). The .exe files were compiled from C# code, which is in the other gists on the page. If I remember correctly, that was the easiest way to run a C# script from Python, C# being what Aconex had provided documentation for. A bit more detail is here: https://davidjohnhewlett.co.uk/?p=458.

Sorry not to be more helpful, but I'm a structural engineer rather than a developer - so I basically make code until it works for my purpose, rather than it being any good!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment