Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rms1000watt/1db70653a968d2667d71ce81c48c2528 to your computer and use it in GitHub Desktop.
Save rms1000watt/1db70653a968d2667d71ce81c48c2528 to your computer and use it in GitHub Desktop.
Python Tornado Web Class for Async Zip/XML/SQL Manipulation and Browser Automation
import os
import re
import json
import ryan
import time
import redis
import shutil
import zipfile
import datetime
import threading
import tornado.web
import tornado.ioloop
from splinter import Browser
""" Setup the server and handlers and everything here... """
class GenerateReportHandler(BaseHandler):
    """POST handler that builds a report from a template and uploads it.

    The request body is JSON with the keys ``whereColumn``, ``whereValues``
    (a comma-separated string) and ``fileName``.  The template is edited
    synchronously via ``editTemplate``; the upload runs in a background
    thread (``uploadReport``) which flips the Redis key ``report_<guid>``
    away from ``'false'`` when it is done.  The handler polls that key on
    the IOLoop and finishes the request once the flag changes or the poll
    budget is exhausted.
    """

    # Delay before the first poll and between subsequent polls (seconds).
    FIRST_POLL_DELAY = 0.01
    POLL_INTERVAL = 0.5
    # Maximum number of polls before giving up (~30 s with the delays above).
    MAX_POLLS = 60

    @tornado.web.asynchronous
    def post(self):
        """Validate the payload, start the upload and begin polling."""
        try:
            # The parsed user is not used; the call is kept because a
            # failure here is reported as BAD_PAYLOAD.
            # NOTE(review): presumably doubles as an auth check -- confirm.
            json.loads(self.get_current_user())
            payload = json.loads(self.request.body)
            whereColumn = payload['whereColumn']
            # Split inside the try so a non-string value is reported as a
            # bad payload instead of escaping as an unhandled error.
            whereValues = payload['whereValues'].split(',')
            fileName = payload['fileName']
        except Exception:
            l.log('POST GENERATEREPORT: FAILURE: BAD_PAYLOAD')
            # NOTE(review): this path reads ``FailureMessages`` while the
            # timeout path reads ``FAILURE_MESSAGES``; neither definition is
            # visible here -- confirm which name actually exists.
            self.write(json.dumps(FailureMessages['BAD_PAYLOAD']))
            self.finish()
            return

        self.guid = ryan.randomString()

        l.log("Editing Template")
        editTemplate(self.guid, fileName, whereColumn, whereValues)

        # 'false' means "upload still running"; uploadReport overwrites it.
        self.r.set('report_%s' % (self.guid), 'false')

        l.log("Uploading Report")
        t = threading.Thread(target=uploadReport, args=(self.guid, self.r, ))
        t.start()

        self.count = 0
        tornado.ioloop.IOLoop.instance().call_later(
            self.FIRST_POLL_DELAY, self.completeReportHandler)

    def completeReportHandler(self):
        """Poll the Redis flag and finish the request when appropriate.

        Writes the success payload as soon as the flag is no longer
        ``'false'``; writes the INTERNAL_ERROR message once ``MAX_POLLS``
        polls have seen the upload still pending.  The Redis key is deleted
        after the response is finished in both cases.
        """
        key = 'report_%s' % (self.guid)
        v = self.r.get(key)
        self.count += 1

        # Accept both text and bytes so the comparison also holds for Redis
        # clients that return bytes.
        stillRunning = v in ('false', b'false')

        if stillRunning:
            if self.count >= self.MAX_POLLS:
                # Only time out while the upload is actually pending, so a
                # report that completes on the last poll is not reported as
                # a failure.
                self.write(json.dumps(FAILURE_MESSAGES['INTERNAL_ERROR']))
                self.finish()
                self.r.delete(key)
                return
            tornado.ioloop.IOLoop.instance().call_later(
                self.POLL_INTERVAL, self.completeReportHandler)
            return

        l.log("Returning Payload")
        payload = {'success': True, 'data': {'reportName': self.guid}, 'error': ''}
        self.write(json.dumps(payload))
        self.finish()
        self.r.delete(key)
def editTemplate(guid, fileName, whereColumn, whereValues):
    """Create ``./reports/<guid>.trdp`` from a template with a WHERE filter.

    The template ``./upload/<fileName>`` is a zip archive.  It is extracted
    to ``./extract/<guid>``, every ``SelectCommand="..."`` attribute in
    ``definition.xml`` is replaced by the first command found plus
    `` WHERE <whereColumn> in (<whereValues joined by ','>)``, and the files
    ``[Content_Types].xml`` and ``definition.xml`` are zipped into the
    report file.  The extraction directory is always removed afterwards.

    Args:
        guid: Identifier used for the extraction directory and report name.
        fileName: Name of the template archive inside ``./upload``.
        whereColumn: Column expression placed after ``WHERE``.
        whereValues: Iterable of strings joined with ``,`` inside ``in (...)``.

    Returns:
        None.  If no non-empty ``SelectCommand`` is found, ``Error!`` is
        printed and no report file is written.

    NOTE(review): ``fileName``, ``whereColumn`` and ``whereValues`` are
    interpolated as-is.  If they come from an untrusted request this allows
    path traversal via ``fileName`` and SQL injection via the WHERE clause;
    callers should validate them.  The clause is also appended even when the
    original command already contains a WHERE -- confirm that is intended.
    """
    from xml.sax.saxutils import escape

    EXTRACT_BASE = './extract'
    EXTRACT_DIR = '%s/%s' % (EXTRACT_BASE, guid)
    REPORTS_DIR = './reports'
    ZIP_MEMBERS = ['[Content_Types].xml', 'definition.xml']
    DEFINITION_PATH = '%s/definition.xml' % (EXTRACT_DIR)
    TEMPLATE_PATH = './upload/%s' % (fileName)
    ZIP_PATH = '%s/%s.trdp' % (REPORTS_DIR, guid)
    # The file is handled as bytes end to end so that text from the request
    # is never implicitly mixed with undecoded file content.
    SELECT_RE = re.compile(br'SelectCommand="(.*?)"')

    for directory in (EXTRACT_BASE, EXTRACT_DIR, REPORTS_DIR):
        try:
            os.mkdir(directory)
        except OSError:
            # Already existing is fine; anything else is a real error.
            if not os.path.isdir(directory):
                raise

    try:
        with zipfile.ZipFile(TEMPLATE_PATH) as template:
            template.extractall(EXTRACT_DIR)

        with open(DEFINITION_PATH, 'rb') as f:
            xmlBytes = f.read()

        m = SELECT_RE.search(xmlBytes)
        if not m or not m.group(1):
            print('Error!')
            return

        # Escape the appended text for use inside a double-quoted XML
        # attribute.  The existing command is already in attribute form and
        # is therefore left untouched.
        clause = escape(
            ' WHERE %s in (%s)' % (whereColumn, ','.join(whereValues)),
            {'"': '&quot;'})
        if not isinstance(clause, bytes):
            # Assumes definition.xml is UTF-8 encoded -- TODO confirm.
            clause = clause.encode('utf-8')

        sql = m.group(1) + clause
        print(sql)

        xmlAttribute = b'SelectCommand="' + sql + b'"'
        # A callable replacement is inserted literally; a plain replacement
        # string would have its backslashes interpreted by re.sub.
        newXML = SELECT_RE.sub(lambda _match: xmlAttribute, xmlBytes)

        with open(DEFINITION_PATH, 'wb') as f:
            f.write(newXML)

        with zipfile.ZipFile(ZIP_PATH, 'w', zipfile.ZIP_DEFLATED) as archive:
            for member in ZIP_MEMBERS:
                archive.write('%s/%s' % (EXTRACT_DIR, member), member)
    finally:
        # Clean up on every path, including the early return and errors;
        # ignore_errors keeps cleanup from masking the original exception.
        shutil.rmtree(EXTRACT_DIR, ignore_errors=True)
def uploadReport(guid, r):
    """Upload ``<CWD>/reports/<guid>.trdp`` through the site's web UI.

    Drives a browser: logs in, opens the upload dialog, fills in the report
    name, picks the second entry of the category list, attaches the file
    and clicks save.  Whatever happens, the Redis key ``report_<guid>`` is
    set to ``'true'`` at the end and the browser is closed.

    Args:
        guid: Report identifier; used as the report name and file name.
        r: Redis client used to signal completion.

    NOTE(review): the key is set to ``'true'`` on the FAIL1/FAIL2 early
    exits and on exceptions as well, so the flag means "finished", not
    "succeeded".
    """
    # DRIVER = 'chrome'
    # DRIVER = 'firefox'
    DRIVER = 'phantomjs'
    CATEGORY_DROPDOWN = '//*[@id="reportUploadForm"]/div[2]/div[2]/span'
    CATEGORY_OPTION = '//*[@id="reportCategory_listbox"]/li[2]'

    # Bound before the try so the finally block can always reference them,
    # even when creating the browser itself fails.
    t0 = time.time()
    b = None
    try:
        b = Browser(DRIVER)
        l.log('Browser Open')

        url = 'http://www.urlhere.com/path/here'
        b.visit(url)
        b.fill('Username', 'USERNAME')
        b.fill('Password', 'PASSWORD')
        b.find_by_value('Log in').click()

        if not b.is_text_present('Reports'):
            print("FAIL1")
            return

        b.find_by_id('uploadReportButton').click()

        if not b.is_text_present('Upload Report'):
            print("FAIL2")
            return

        b.fill('reportName', guid)

        if DRIVER == 'firefox':
            b.find_by_xpath(CATEGORY_DROPDOWN).mouse_over()
            b.find_by_xpath(CATEGORY_DROPDOWN).click()
            # Fixed wait before clicking the list entry.
            time.sleep(.5)
            b.find_by_xpath(CATEGORY_OPTION).click()

        if DRIVER == 'chrome' or DRIVER == 'phantomjs':
            b.find_by_xpath(CATEGORY_DROPDOWN).mouse_over()
            b.find_by_xpath(CATEGORY_DROPDOWN).click()
            # Best effort: retry the click up to 10 times and carry on even
            # if every attempt fails.
            for _attempt in range(10):
                try:
                    b.find_by_xpath(CATEGORY_OPTION).click()
                    break
                except Exception:
                    continue

        b.attach_file('reportFile', '%s/reports/%s.trdp' % (CWD, guid))
        # b.driver.save_screenshot('your_screenshot.png')
        b.find_by_id('reportUploadSaveButton').click()

    except Exception as e:
        eln = ryan.getErrorLineNumber()
        # Fall back to the exception itself where ``message`` is missing.
        print("ERROR: LN: %s: MSG: %s" % (eln, getattr(e, 'message', e)))
        print(e)
    finally:
        try:
            r.set('report_%s' % (guid), 'true')
        finally:
            # Close the browser even if signalling through Redis failed.
            if b is not None:
                b.quit()
            l.log("Total Time: %s" % (time.time() - t0))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment