Last active
August 29, 2015 14:05
-
-
Save bond95/7d2f5c4a25fa97b61807 to your computer and use it in GitHub Desktop.
Corly import script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import sys | |
import requests | |
import os.path | |
import json | |
import time | |
from corlyapi import CorlyAPI | |
import tempfile | |
MB = 1048576 | |
def send_request(req):
    """Issue a GET request and return the decoded JSON reply.

    Args:
        req: complete request URL, including any query string.

    Returns:
        The parsed JSON response when the reply is not flagged as invalid.

    Raises:
        SystemExit: with status 1, after printing the first entry of
            ``Errors``, when the reply is flagged as invalid.
    """
    response = json.loads(requests.get(req).text)
    # JSON ``false`` decodes to the Python bool False, so comparing against
    # the string 'false' alone would never detect a failed call. Accept both
    # spellings to stay compatible with a server that sends a string.
    if response['IsValid'] in (False, 'false'):
        print(response['Errors'][0])
        # Non-zero status so the calling shell can see the failure.
        sys.exit(1)
    return response
def show_errors(errors):
    """Print every error message, then terminate the script.

    Args:
        errors: iterable of error messages to print, one per line.

    Raises:
        SystemExit: always, with status 1 so that the calling shell can
            tell the run failed.
    """
    for err in errors:
        print(err)
    # sys.exit rather than the interactive helper exit(), which is only
    # injected by the site module and is not guaranteed to exist.
    sys.exit(1)
# --- Command line -----------------------------------------------------------
# Five positional arguments are required; fail with a usage line instead of
# an IndexError traceback when some are missing.
if len(sys.argv) < 6:
    sys.stderr.write(
        "usage: %s USER PASSWORD PLUGIN PROJECT FILE\n" % sys.argv[0])
    sys.exit(2)

user_name, password, plugin, project, file_name = sys.argv[1:6]

user = CorlyAPI()

# --- Token cache ------------------------------------------------------------
# "current.token" holds three lines: user name, token creation time, token.
# The lines are stripped before use: readline()/readlines() keep the trailing
# newline, which would make the user-name comparison fail every time.
if os.path.isfile("current.token"):
    with open("current.token", "r") as t:
        cached = [line.rstrip("\r\n") for line in t.readlines()]
    if len(cached) >= 3 and cached[0] == user_name and cached[2]:
        try:
            # NOTE(review): assumes CreationTime is Unix epoch seconds —
            # confirm against the API. Any non-numeric value simply makes the
            # cache entry unusable and a fresh login is performed.
            created = float(cached[1])
        except ValueError:
            created = None
        if created is not None:
            age = time.time() - created
            # Reuse the token for 24 hours. A negative age (timestamp in the
            # future, e.g. a value in another unit) is treated as unusable so
            # that a stale token is never reused indefinitely.
            if 0 <= age < 3600 * 24:
                user.token = cached[2]
                user.isValid = True

# --- Login ------------------------------------------------------------------
if not user.isValid:
    if not user.auth(user_name, password):
        # show_errors prints the messages and terminates the script.
        show_errors(user.errors)
    with open("current.token", "w") as t:
        t.write("%s\n%s\n%s" % (
            user_name,
            user.last_response['Result']['CreationTime'],
            user.token))
    print("Logged in")

# --- Upload -----------------------------------------------------------------
if os.path.getsize(file_name) > MB:
    # Large file: open an upload session and send the file in MB-sized parts.
    session_id = user.create_upload_session(plugin, project)
    if not session_id:
        show_errors(user.errors)

    temp_folder = tempfile.gettempdir()
    temp_filename = os.path.basename(file_name)
    print("Start upload: %s" % temp_filename)

    counter = 0
    # Binary mode on both sides so the parts are byte-exact copies of the
    # source on every platform.
    with open(file_name, "rb") as f:
        while True:
            data = f.read(MB)
            if not data:
                break
            n_file_name = os.path.join(
                temp_folder, "%s.%d" % (temp_filename, counter))
            with open(n_file_name, "wb") as out_f:
                out_f.write(data)
            try:
                if user.upload_with_session(n_file_name, session_id, counter):
                    print("Part %d: OK" % counter)
                else:
                    # A failed part is reported but the remaining parts are
                    # still sent.
                    print("Part %d: %s" % (counter, user.errors[0]))
            finally:
                # Never leave part files behind, even if the upload raised.
                os.remove(n_file_name)
            counter += 1

    if user.end_upload_session(session_id):
        print("Upload finished Successful")
    else:
        show_errors(user.errors)
else:
    # Small file: a single direct upload, no session needed.
    print("Start upload: %s" % file_name)
    if user.upload(plugin, project, file_name):
        print("Upload finished Successful")
    else:
        show_errors(user.errors)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import json | |
import requests | |
class CorlyAPI(object):
    """Small client for the Corly HTTP API (login and file import).

    Every call targets ``URL + "<method>.<function>"`` and the JSON reply is
    read through its ``IsValid`` flag plus ``Result`` or ``Errors``.

    NOTE(review): the password and token travel as query parameters over
    plain HTTP; switch to HTTPS if the server offers it.
    """

    URL = "http://corly-verifit.rhcloud.com/method/"

    # Class-level defaults are kept so existing attribute access keeps
    # working; __init__ gives every instance its own copies.
    token = ""
    isValid = False
    errors = []
    last_response = {}

    def __init__(self):
        self.token = ""          # token key returned by a successful login
        self.isValid = False     # outcome of the most recent checked call
        self.errors = []         # error list of the most recent failed call
        self.last_response = {}  # most recent decoded reply seen by check_valid

    def auth(self, username, password):
        """Log in and store the returned token key in ``self.token``.

        Returns:
            True when the server accepted the credentials, False otherwise
            (the reasons are then available in ``self.errors``).
        """
        resp = self.send_request(
            "auth", "login", {"username": username, "password": password})
        if resp['IsValid']:
            self.token = resp['Result']['TokenKey']
        return self.check_valid(resp)

    def create_upload_session(self, plugin, project):
        """Open a multi-part upload session.

        Returns:
            The session id reported by the server, or False on failure.
        """
        resp = self.send_request(
            "import", "start",
            {"plugin": plugin, "project": project, "token_key": self.token})
        if self.check_valid(resp):
            return resp['Result']['SessionId']
        return False

    def end_upload_session(self, session_id):
        """Close an upload session; return True when the server accepts it."""
        resp = self.send_request("import", "end", {"sessionid": session_id})
        return self.check_valid(resp)

    def upload(self, plugin, project, filename):
        """Upload ``filename`` in a single request; return True on success."""
        return self.upload_base(
            {"plugin": plugin, "project": project, "token_key": self.token},
            filename)

    def upload_with_session(self, filename, session_id, part):
        """Upload ``filename`` as part number ``part`` of an open session."""
        return self.upload_base(
            {"sessionid": session_id, "part": part}, filename)

    def upload_base(self, params, filename):
        """Send a file with the given query parameters and check the reply."""
        resp = self.send_file(params, filename)
        return self.check_valid(resp)

    def send_request(self, method, function, params):
        """GET ``<URL><method>.<function>`` and return the decoded JSON reply.

        ``params`` is handed to requests so that keys and values are
        URL-encoded; a password containing ``&``, ``=``, ``#`` or a space
        would otherwise corrupt the query string.
        """
        url = "%s%s.%s" % (self.URL, method, function)
        return requests.get(url, params=params).json()

    def send_file(self, params, filename):
        """POST ``filename`` as multipart field ``file`` to ``import.upload``.

        Returns:
            The decoded JSON reply.
        """
        url = "%simport.upload" % self.URL
        # The context manager closes the file even if the request raises.
        with open(filename, 'rb') as handle:
            return requests.post(
                url, params=params, files={'file': handle}).json()

    def check_valid(self, resp):
        """Record ``resp`` and update ``isValid`` / ``errors`` from it.

        Args:
            resp: decoded reply; must contain ``IsValid``.

        Returns:
            True when the reply is flagged valid, False otherwise.
        """
        self.last_response = resp
        valid = bool(resp['IsValid'])
        # Reset the list on success so errors from an earlier call are not
        # mistaken for errors of this one; tolerate a reply without 'Errors'.
        self.errors = [] if valid else (resp.get('Errors') or [])
        self.isValid = valid
        return valid
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment