Last active
October 4, 2019 22:20
-
-
Save utkjad/145eb1111d1dd258e8b110f68b3b3b63 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import re | |
import base64 | |
import logging | |
import unicodecsv as csv | |
import sys | |
import argparse | |
import ssl | |
import os | |
import random | |
import pytest | |
from TCLIService import ttypes | |
from TCLIService.ttypes import TOperationState | |
from thrift.transport.THttpClient import THttpClient | |
from pyhive import hive | |
from pyhive.exc import OperationalError | |
from cryptography.fernet import Fernet | |
from datetime import datetime, timedelta | |
import time | |
def get_schema_header(cursor):
    """ Return the column names of the result set behind the Hive query
    Arguments:
        cursor (:obj:`<class HiveConnection.cursor>`) : Cursor object which contains data
    Returns:
        columns (:obj:`List[Str]`) : List of Attributes (column names, in result-set order)
    """
    # Ask the Thrift service for the result-set metadata, then pull out the names.
    metadata_request = ttypes.TGetResultSetMetadataReq(cursor._operationHandle)
    metadata_response = cursor._connection.client.GetResultSetMetadata(metadata_request)
    return [col.columnName for col in metadata_response.schema.columns]
def write_to_file(cursor, path, partition_by=None, exclude=None, type='json'):
    """ Get the cursor from existing connection and write the data with partitions
    Fetches every row from the cursor and writes each row to its own file,
    laid out in Hive-style partition directories (``path/col=value/...``).
    Arguments:
        cursor {Pyhive Cursor} -- Pyhve Cursor Class (query already executed)
        path {string} -- Parent path where data is to be stored (created if missing)
    Keyword Arguments:
        partition_by {List of Str} -- Partition by these strings (default: {None}, meaning no partitioning)
        exclude {List of Str} -- Column names to omit from the written data (default: {None})
        type {str} -- File format to store data. Only JSON and CSV Supported (default: {'json'})
            NOTE: the name shadows the builtin `type`; kept for caller compatibility.
    Raises:
        Exception -- If a partition column is not in the result schema, or `type` is unsupported.
    """
    import string  # local import (only needed for filename generation)

    # Avoid the shared-mutable-default-argument pitfall of `partition_by=[]`.
    partition_by = [] if partition_by is None else partition_by
    exclude = [] if exclude is None else exclude

    result = cursor.fetchall()
    columns = get_schema_header(cursor)
    columns_to_write = [c for c in columns if c not in exclude]

    # Validate partition columns up front, before creating any directories.
    for entity in partition_by:
        if entity not in columns:
            raise Exception(
                "Trying to partition by {0}, but not present in {1}".format(entity, columns))

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(path, exist_ok=True)

    def serialize_date_to_string(o):
        """ Serialize datetime.datetime object to string
        Args:
            o (:obj:`<class datetime.datetime>`): Object to be converted to string
        Returns:
            :obj:`str` : String format of datetime.datetime (bytes are decoded;
            anything else is passed through unchanged)
        """
        if isinstance(o, datetime):
            return o.__str__()
        elif isinstance(o, bytes):
            return o.decode()
        return o

    def _row_directory(row_lookup):
        """Create (if needed) and return the partition directory for one row.
        `row_lookup` maps column name -> value for the row."""
        location = ""
        for entity in partition_by:
            location = location + "/" + entity + "=" + str(row_lookup[entity])
        os.makedirs(path + location, exist_ok=True)
        return path + location

    def _random_file_name(extension):
        """Five random lowercase letters + extension.
        NOTE(review): names can collide across rows, silently overwriting a file."""
        letters = string.ascii_lowercase
        return ''.join(random.choice(letters) for _ in range(5)) + extension

    if type == 'json':
        for row in result:
            json_data = dict(zip(columns, row))
            # Resolve the partition path BEFORE dropping excluded keys, so a
            # column can be both partitioned-by and excluded (previously this
            # raised KeyError in the JSON branch but worked in the CSV branch).
            directory = _row_directory(json_data)
            for key in exclude:
                del json_data[key]
            file_path = directory + "/" + _random_file_name(".json")
            with open(file_path, 'w') as fp:
                json.dump(json_data, fp, default=serialize_date_to_string)
    elif type == 'csv':
        # Hoisted out of the row loop: these are loop-invariant.
        column_index = {name: i for i, name in enumerate(columns)}
        exclude_idx = {column_index[x] for x in exclude}
        for row in result:
            directory = _row_directory(dict(zip(columns, row)))
            temp_row = [value for j, value in enumerate(row) if j not in exclude_idx]
            file_path = directory + "/" + _random_file_name(".csv")
            # Each output file holds a header line plus this single row.
            with open(file_path, 'wb') as out:
                csv_out = csv.writer(out, quoting=csv.QUOTE_NONNUMERIC)
                csv_out.writerow(columns_to_write)
                csv_out.writerow(list(map(serialize_date_to_string, temp_row)))
    else:
        raise Exception(
            "Only JSON and CSV supported. Type {0} is not!".format(type))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You need to remove the reference to pytest.
Is there a way I can diff the earlier version against this snippet?