Skip to content

Instantly share code, notes, and snippets.

@utkjad
Last active October 4, 2019 22:20
Embed
What would you like to do?
import json
import re
import base64
import logging
import unicodecsv as csv
import sys
import argparse
import ssl
import os
import random
import pytest
from TCLIService import ttypes
from TCLIService.ttypes import TOperationState
from thrift.transport.THttpClient import THttpClient
from pyhive import hive
from pyhive.exc import OperationalError
from cryptography.fernet import Fernet
from datetime import datetime, timedelta
import time
def get_schema_header(cursor):
""" Get result result schema atrributes of the Hive Query
Arguments:
cursor (:obj:`<class HiveConnection.cursor>`) : Cursor object which contains data
Returns:
columns (:obj:`List[Str]`) : List of Attributes
"""
# Get the Schema
req = ttypes.TGetResultSetMetadataReq(cursor._operationHandle)
response = cursor._connection.client.GetResultSetMetadata(req)
columns = response.schema.columns
columns = [column.columnName for column in columns]
return columns
def write_to_file(cursor, path, partition_by=[], exclude=[], type='json'):
""" Get the cursor from existing connection and write the data with partitions
Arguments:
cursor {Pyhive Cursor} -- Pyhve Cursor Class
path {string} -- Parent path where data is to be stored
Keyword Arguments:
partition_by {List of Str} -- Partition by these strings (default: {[]})
type {str} -- File format to store data. Only JSON and CSV Supported (default: {'json'})
"""
result = cursor.fetchall()
columns = get_schema_header(cursor)
columns_to_write = []
for i in columns:
if i in exclude:
continue
else:
columns_to_write.append(i)
# columns = columns_to_write
if partition_by:
for entity in partition_by:
if entity not in columns:
raise Exception(
"Trying to partition by {0}, but not present in {1}".format(entity, columns))
# create parent folder
# parent = pathlib.Path(path)
# parent.mkdir(parents=True, exist_ok=True)
if not os.path.exists(path):
os.makedirs(path)
if (type == 'json'):
def serialize_date_to_string(o):
""" Serialize datetime.datetime object to string
Args:
o (:obj:`<class datetime.datetime>`): Object to be converted to string
Returns:
:obj:`str` : String format of datetime.datetime
"""
if isinstance(o, datetime):
return o.__str__()
elif isinstance(o, bytes):
return o.decode()
else:
return o
for row in result:
json_data = dict(zip(columns, row))
if exclude:
for key in exclude:
del json_data[key]
location = ""
for entity in partition_by:
location = location + "/" + entity + \
"=" + str(json_data[entity])
# temp_pathlib = pathlib.Path(str(parent) + location)
# temp_pathlib.mkdir(parents=True, exist_ok=True)
if not os.path.exists(path + location):
os.makedirs(path + location)
import string
import random
letters = string.ascii_lowercase
file_name = ''.join(random.choice(letters)
for i in range(5)) + ".json"
# file_path = temp_pathlib / file_name
# with file_path.open('w') as fp:
# json.dump(json_data, fp, default=serialize_date_to_string)
file_path = path + location + "/" + file_name
with open(file_path, 'w') as fp:
json.dump(json_data, fp, default=serialize_date_to_string)
elif(type == 'csv'):
def serialize_date_to_string(o):
""" Serialize datetime.datetime object to string
Args:
o (:obj:`<class datetime.datetime>`): Object to be converted to string
Returns:
:obj:`str` : String format of datetime.datetime
"""
if isinstance(o, datetime):
return o.__str__()
elif isinstance(o, bytes):
return o.decode()
else:
return o
idx = 0
columns_to_place = {}
for i in columns:
columns_to_place[i] = idx
idx += 1
for row in result:
location = ""
for entity in partition_by:
location = location + "/" + entity + "=" + \
str(row[columns_to_place[entity]])
# temp_pathlib = pathlib.Path(str(parent) + location)
# temp_pathlib.mkdir(parents=True, exist_ok=True)
if not os.path.exists(path + location):
os.makedirs(path + location)
import string
import random
letters = string.ascii_lowercase
file_name = ''.join(random.choice(letters)
for i in range(5)) + ".csv"
file_path = path + location + "/" + file_name
temp_row = []
tmp_exclude_idx = [columns_to_place[x] for x in exclude]
tmp_exclude_elements = [row[j] for j in tmp_exclude_idx]
jdx = 0
for i in row:
if jdx in tmp_exclude_idx:
jdx += 1
continue
else:
jdx += 1
temp_row.append(i)
with open(file_path, 'wb') as out:
csv_out = csv.writer(out, quoting=csv.QUOTE_NONNUMERIC)
csv_out.writerow(columns_to_write)
write_row = list(map(serialize_date_to_string, temp_row))
csv_out.writerow(write_row)
# with file_path.open('wb') as out:
# csv_out = csv.writer(out, quoting=csv.QUOTE_NONNUMERIC)
# csv_out.writerow(columns)
# write_row = list(map(serialize_date_to_string, row))
# csv_out.writerow(write_row)
else:
raise Exception(
"Only JSON and CSV supported. Type {0} is not!".format(type))
@zscalerspark
Copy link

You need to remove reference of pytest
Is there a way i diff earlier vs this snippet

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment