# table_def.py
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

########################################################################
class Posts(Base):
    """ORM mapping for the source table of posts to be sentiment-scored."""
    __tablename__ = "OH_Posts"
    __table_args__ = {'schema': 'bi_tool_eval_snd'}
    xOwner = Column(Integer, primary_key=True)
    PostedTo = Column(Integer, primary_key=True)
    Published = Column(DateTime, primary_key=True)
    Message = Column(String)

    #----------------------------------------------------------------------
    def __init__(self, xOwner, PostedTo, Published, Message):
        """Populate the three-part compound key plus the message text."""
        self.xOwner = xOwner
        self.PostedTo = PostedTo
        self.Published = Published
        self.Message = Message
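
# A minimal usage sketch for this mapping (illustrative only - the connection
# string is a placeholder, matching the one used by the main utility):
#
#   from sqlalchemy import create_engine
#   from sqlalchemy.orm import sessionmaker
#
#   engine = create_engine('vertica+pyodbc://<connection details here>')
#   session = sessionmaker(bind=engine)()
#   for post in session.query(Posts).limit(5):
#       print(post.xOwner, post.Published, post.Message)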
# IDOL on Demand, Vertica Bulk Sentiment update
# This utility uses IoD calls to apply sentiment analysis to data held in Vertica.
# The results are then passed back up to Vertica in an associated table.
# Latency for a single IoD call is about 1.5 seconds, therefore, to increase throughput the utility does the following:
# 1) It submits asynchronous requests to IoD asking for the sentiment processing. A secondary call is then
#    submitted to return the results.
# 2) It uses multiprocessing - creating a worker pool of sub-processes which perform the actual web service calls to IoD.
# 3) Finally - standard "inserts" into Vertica are slow, therefore this utility makes use of a (simplified) Python module which
#    takes the output data, writes it to a temporary file and then uses VSQL COPY to post into Vertica.
# Using these techniques, approximate throughput is about 5 sentiment posts per second.

# The SQLAlchemy Object Relational Mapper is used to turn tables into Python objects.
# Although this makes manipulating tables very intuitive and Pythonic, it doesn't
# lend itself to generic table utilities and may be overkill for this purpose.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from multiprocessing import Pool
import logging
import time
# Requests provides a helper class for submitting HTTP calls
import requests
import json
# Additional modules: table definitions for SQLAlchemy and Vertica upload utilities
from table_def import Posts
# Simplified version used which works on Windows
#from pyvertica.batch import VerticaBatch
from VerticaBatchSimplified import VerticaBatch

# Key used to allocate services on IoD.
# IoD operates using an API key to allocate services.
apikey = "xxx-xxx-xxx-xxx"

# Base URLs for IoD.
# Every service from IoD has the same URL format, with a synchronous and an asynchronous version.
url = "http://api.idolondemand.com/1/api/sync/{}/v1"
Asyncurl = "http://api.idolondemand.com/1/api/async/{}/v1"
# If the async version is used, it returns a job ID as a handle. This job ID can
# be used to fetch results using the URL below.
JobResulturl = "http://api.idolondemand.com/1/job/result/{}"
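# For example, url.format('analyzesentiment') gives
# http://api.idolondemand.com/1/api/sync/analyzesentiment/v1 and
# JobResulturl.format(some_job_id) gives the result-fetch URL for that job.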
# Simple log writer - outputs a message in standard format with a timer.
# On each call it "resets" the timer so that elapsed time between calls can be recorded.
def WriteTimerLog(LogMessage, tStartClock=0, tEndClock=0):
    if tStartClock == 0:
        tStartClock = time.clock()
    tEndClock = time.clock()
    logging.critical(LogMessage + ': %s', "{0:.2f}".format(tEndClock - tStartClock))
    # Reset the timer by making the END time the new START time
    tStartClock = tEndClock
    return tStartClock
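
# Typical usage chains the returned clock value back in as the next start time:
#
#   tStart = WriteTimerLog('Starting Timer')
#   ...do some work...
#   tStart = WriteTimerLog('Work complete, elapsed time', tStart)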
# General routine for calling IDOL on Demand services
def postrequests(callingurl, function, data=None, files=None):
    # Avoid mutable default arguments
    data = data if data is not None else {}
    files = files if files is not None else {}
    # Put the API key into the request data
    data["apikey"] = apikey
    # Insert the particular IoD service into the URL
    callurl = callingurl.format(function)
    # Call IoD and return the results
    success = True
    try:
        r = requests.post(callurl, data=data, files=files, timeout=5)
        returned_status_code = r.status_code
        returned_json = r.json()
        if r.status_code != 200:
            success = False
    # Trap timeouts - fairly common
    except requests.exceptions.Timeout:
        success = False
        returned_status_code = -1
        returned_json = 'Timeout'
    return success, returned_status_code, returned_json
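
# A hedged example of calling the synchronous sentiment API with the same
# helper (illustrative text; assumes the sync response carries the same
# "aggregate" block that the async job result wraps inside "actions"):
#
#   ok, status, body = postrequests(url, 'analyzesentiment',
#                                   {'text': 'I love this product'})
#   if ok:
#       print(body['aggregate']['sentiment'], body['aggregate']['score'])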
# Call the asynchronous API for sentiment and return the job ID
def asynchCallIODSentimentAPI(Instance):
    # These functions are called as sub-processes and therefore
    # don't have access to the assets of the main routine,
    # including the logger, so this has to be re-allocated.
    # Pick your log file location of choice.
    logging.basicConfig(filename=r"C:\Coding\Python2.log",
                        level=logging.CRITICAL, format='%(asctime)s %(message)s')
    # Start a sentiment request
    success, status_code, AsyncResponse = postrequests(
        Asyncurl, 'analyzesentiment', {'text': Instance['Message']})
    if success:
        # This job ID will be used to fetch the results later
        Instance['jobID'] = AsyncResponse['jobID']
    else:
        Instance['jobID'] = '-'
        logging.critical('ERROR - Asynch request failed, Status code %s, Response %s',
                         status_code, AsyncResponse)
    return Instance

# Call the asynchronous API with the job ID and return the sentiment results
def asynchReturnIODSentimentAPI(Instance):
    # As above, pick your log file location of choice
    logging.basicConfig(filename=r"C:\Coding\Python2.log",
                        level=logging.CRITICAL, format='%(asctime)s %(message)s')
    # Get the sentiment results from IoD
    success, status_code, results = postrequests(JobResulturl, Instance['jobID'])
    if success:
        # Extract the sentiment results from the response by walking the JSON.
        # A job result can contain N actual results, so the JSON has to be
        # "walked" to get to the sentiment results.
        Actions = results["actions"]
        SentimentResult = Actions[0]["result"]
        SentimentResult_string = json.dumps(SentimentResult)
        aggregatelist = SentimentResult["aggregate"]
        # Lots of detail is returned, however only 3 elements are currently stored:
        # the overall sentiment score, the overall sentiment and the full results
        # (with all topics etc.) in case more detail is needed later.
        Instance['AggregateScore'] = aggregatelist["score"]
        Instance['AggregateSentiment'] = aggregatelist["sentiment"]
        Instance['SentimentResults'] = SentimentResult_string
        Instance.update({'Sentiment Retrieved': True})
    else:
        Instance['AggregateScore'] = -1
        Instance['AggregateSentiment'] = '-'
        Instance['SentimentResults'] = '-'
        logging.critical('ERROR - Asynch response failed, Status code %s, Response %s',
                         status_code, results)
        Instance.update({'Sentiment Retrieved': False})
    return Instance

# Multiprocessing is incredibly straightforward in Python.
# The "Pool" call creates N worker processes ready for work, then "map" starts
# each process off with the function to call and the parameters to use.
# "map" then waits for all processes to complete and compiles the results into a single list.
# This is the pair of async calls: one to start the requests and one to fetch the results.
def SubmitParallelAsyncSentimentRequests(APIArray, WorkerProcesses):
    with Pool(processes=WorkerProcesses) as urlPool:
        APIArray = urlPool.map(asynchCallIODSentimentAPI, APIArray)
    return APIArray

def SubmitParallelAsyncSentimentResponses(APIArray, WorkerProcesses):
    with Pool(processes=WorkerProcesses) as urlPool:
        APIArray = urlPool.map(asynchReturnIODSentimentAPI, APIArray)
    return APIArray
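
# A minimal standalone sketch of the Pool/map pattern used above
# (the worker function and inputs are illustrative, not part of this utility):
#
#   from multiprocessing import Pool
#
#   def square(n):
#       return n * n
#
#   if __name__ == '__main__':
#       with Pool(processes=4) as pool:
#           print(pool.map(square, [1, 2, 3, 4]))   # -> [1, 4, 9, 16]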
# Used for opening the batch connection for posting to Vertica.
# Note: the simplified version is used here; for Linux installs the original
# VerticaBatch module from PyVertica is recommended as it is more flexible.
def OpenVerticaBatch():
    batch = VerticaBatch(
        odbc_kwargs={'dsn': '<dsn here>'},
        table_name='<table here>',
        truncate_table=False,
        reconnect=False,
        column_list=['<column list here>'],
        copy_options={
            'DELIMITER': ',',
            'REJECTEDFILE': False
        }
    )
    return batch
# Takes the array of async results and posts them to Vertica
def PostJobResults(batch, ResultArray=[]):
    Row = []
    # Build each row and hand it to the batch writer
    for Instance in ResultArray:
        # The output table has the same 3-part compound key as the source table
        Row.append(Instance['xOwner'])
        Row.append(Instance['PostedTo'])
        Row.append(Instance['Published'])
        Row.append(Instance['Message'])
        # Row.append('-')
        Row.append(Instance['AggregateScore'])
        Row.append(Instance['AggregateSentiment'])
        Row.append(Instance['SentimentResults'])
        batch.insert_list(Row)
        Row = []
    batch.post_rows()
    batch.commit()
    return []

def CountRetrievals(Array, CheckEntry):
    # Loop through the array counting successful retrievals
    SuccessfulCount = 0
    for Instance in Array:
        if Instance[CheckEntry]:
            SuccessfulCount += 1
    return SuccessfulCount
def Main():
    # Control values - these will be made into parameters
    # Limit on rows, for testing
    RowLimit = 200
    # Parallel data structures
    # Number of asynchronous calls to make per loop
    RowstoProcess = 100
    # Number of workers
    WorkerProcesses = 8
    # Array which will hold the set of results
    APIArray = []
    # Individual instance
    APIInstance = {}
    LoopCheck = 0
    # Timer values
    tStartClock = 0
    tEndClock = 0

    # Initialise logging - pick your logging file
    logging.basicConfig(filename=r"C:\Coding\Python_3.log", level=logging.CRITICAL, format='%(asctime)s %(message)s')
    logging.critical('Initialise, %s row limit, %s inner loop, %s Worker pool', RowLimit, RowstoProcess, WorkerProcesses)
    tStartClock = WriteTimerLog('Starting Timer', tStartClock, tEndClock)
    # Used to log total elapsed time
    tStartAll = time.clock()

    # Use a DSN for the connection
    # XXXX to be removed XXXX
    engine = create_engine('vertica+pyodbc://<connection details here>')
    # Create the connection for posting to Vertica.
    # Note there are 2 Vertica connections: one to read in the original text for
    # sentiment analysis and one for batch-uploading the sentiment results.
    batch = OpenVerticaBatch()
    # Clear down the output table
    engine.execute('truncate table bi_tool_eval_snd.OH_Posts_Sentiment')
    # Create a session
    Session = sessionmaker(bind=engine)
    session = Session()
    tStartClock = WriteTimerLog('Opening Input Table, elapsed time', tStartClock, tEndClock)
    # Fetch all rows to begin the process
    PostRows = session.query(Posts).limit(RowLimit)
    tStartClock = WriteTimerLog('Rows fetched, elapsed time', tStartClock, tEndClock)
    tProcessAfterStart = time.clock()
    logging.critical('Entering Main Loop')
    RowCount = 0
    RowSentimentRetrieved = 0
    BlankMessageCount = 0

    # The loop makes use of multiprocessing:
    # "RowstoProcess" rows are read and the data to be processed is written to APIArray.
    # Once complete, 2 sets of parallel jobs are run. The first submits the
    # asynchronous "analyzesentiment" requests and stores the returned job IDs
    # in the array; the second takes each job ID and fetches the results.
    # A final routine then posts the results to the database, the loop restarts
    # and the routine fetches the next "RowstoProcess" rows.
    for PostInstance in PostRows:
        RowCount += 1
        if len(PostInstance.Message) == 0:
            BlankMessageCount += 1
        if len(PostInstance.Message) > 0:
            # Load up APIArray.
            # This is the major data structure; it holds:
            #   - the incoming primary key values and the message to be processed;
            # once the jobs are submitted it also holds:
            #   - the job ID used in the asynchronous IoD calls;
            # and once the results are retrieved:
            #   - the results of the sentiment analysis,
            #   - the aggregate score,
            #   - the aggregate sentiment.
            APIInstance = {'xOwner': PostInstance.xOwner,
                           'PostedTo': PostInstance.PostedTo,
                           'Published': PostInstance.Published,
                           'Message': PostInstance.Message,
                           'jobID': '-',
                           'AggregateScore': 0,
                           'AggregateSentiment': '-',
                           'SentimentResults': '-'}
            # Build up the array of requests to process
            APIArray.append(APIInstance)
            LoopCheck += 1
        # Once the limit is reached - submit the asynchronous processes
        if LoopCheck == RowstoProcess:
            tStartClock = WriteTimerLog('Array Load Complete, elapsed time', tStartClock, tEndClock)
            # Start worker processes
            APIArray = SubmitParallelAsyncSentimentRequests(APIArray, WorkerProcesses)
            tStartClock = WriteTimerLog('Parallel Processes - Submit Requests - Complete, elapsed time', tStartClock, tEndClock)
            APIArray = SubmitParallelAsyncSentimentResponses(APIArray, WorkerProcesses)
            tStartClock = WriteTimerLog('Parallel Processes - Fetch Responses - Complete, elapsed time', tStartClock, tEndClock)
            # Count successful responses
            RowSentimentRetrieved = RowSentimentRetrieved + CountRetrievals(APIArray, 'Sentiment Retrieved')
            # Post entries to the database
            APIArray = PostJobResults(batch, APIArray)
            tStartClock = WriteTimerLog('Rows posted complete, elapsed time', tStartClock, tEndClock)
            logging.critical('Rows read - %s, Rows rejected - %s, Rows with Sentiment Retrieved - %s ', RowCount, BlankMessageCount, RowSentimentRetrieved)
            LoopCheck = 0

    # On exit from the loop do a final post for any remaining rows
    tStartClock = WriteTimerLog('Exiting Loop, elapsed time', tStartClock, tEndClock)
    # Start worker processes
    APIArray = SubmitParallelAsyncSentimentRequests(APIArray, WorkerProcesses)
    APIArray = SubmitParallelAsyncSentimentResponses(APIArray, WorkerProcesses)
    # Count successful responses
    RowSentimentRetrieved = RowSentimentRetrieved + CountRetrievals(APIArray, 'Sentiment Retrieved')
    APIArray = PostJobResults(batch, APIArray)
    tStartClock = WriteTimerLog('Final rows posted, elapsed time', tStartClock, tEndClock)
    session.close()
    tClose = time.clock()
    logging.critical('Complete - %s process, %s total, %s Rows per Second',
                     "{0:.2f}".format(tClose - tProcessAfterStart),
                     "{0:.2f}".format(tClose - tStartAll),
                     "{0:.2f}".format(RowLimit / (tClose - tStartAll)))
    logging.critical('Rows read - %s, Rows rejected - %s, Rows with Sentiment Retrieved - %s ', RowCount, BlankMessageCount, RowSentimentRetrieved)

if __name__ == '__main__':
    Main()
# VerticaBatchSimplified.py
####################
# This code builds on the publicly available VerticaBatch code but simplifies it,
# taking out the multithreading so that it works on Windows.
# However, you are advised to use the public VerticaBatch module from PyVertica
# if you are running on Linux.
####################
import codecs
import copy
import logging
import os

from pyvertica.connection import get_connection

logger = logging.getLogger(__name__)
# This is the primary class.
# It sets up a Vertica connection. Essentially this class takes input, writes it
# to a text file and then uses the VSQL COPY command to post the data into Vertica.
# This is by far the fastest mechanism for getting data into Vertica.
class VerticaBatch(object):
    """
    Object for writing multiple records to Vertica in a batch.

    Usage example::

        from pyvertica.batch import VerticaBatch

        batch = VerticaBatch(
            odbc_kwargs={'dsn': 'VerticaDWH'},
            table_name='schema.my_table',
            truncate_table=True,
            column_list=['column_1', 'column_2'],
            copy_options={
                'DELIMITER': ',',
            },
            multi_batch=False
        )

        row_list = [
            ['row_1_val_1', 'row_1_val_2'],
            ['row_2_val_1', 'row_2_val_2'],
            ...
        ]

        for column_data_list in row_list:
            batch.insert_list(column_data_list)

        error_bool, error_file_obj = batch.get_errors()
        if error_bool:
            print(error_file_obj.read())

        batch.commit()

    .. note:: It is also possible to call :py:meth:`~.VerticaBatch.commit`
        multiple times (for example after every 50000 records). Please note
        that after the first insert and after calling
        :py:meth:`~.VerticaBatch.commit`, the output of
        :py:meth:`~.VerticaBatch.get_errors` will reflect the new series of
        inserts and thus not contain the "old" inserts.

    .. note:: Creating a new batch object will not create a lock on the target
        table. This will happen only after the first insert.

    .. note:: If a batch is created with ``multi_batch=True``,
        :py:meth:`~.VerticaBatch.close_batch` must be explicitly called when
        the batch resources should be closed. If ``multi_batch`` is set to
        ``False``, :py:meth:`~.VerticaBatch.close_batch` need not be called.
        In this case, while the batch is reusable, the system resources will
        be reallocated upon each :py:meth:`~.VerticaBatch.commit`, which
        may not be desirable.

    :param table_name:
        A ``str`` representing the table name (including the schema) to write
        to. Example: ``'staging.my_table'``.
    :param odbc_kwargs:
        A ``dict`` containing the ODBC connection keyword arguments. E.g.::

            {
                'dsn': 'TestDSN',
            }

        .. seealso:: https://code.google.com/p/pyodbc/wiki/Module
    :param truncate_table:
        A ``bool`` indicating if the table needs truncating before the first
        insert. Default: ``False``. *Optional*.
    :param reconnect:
        A ``bool`` passed to the connection object to decide if pyvertica
        should directly reconnect to a random node to bypass a load balancer.
    :param analyze_constraints:
        A ``bool`` indicating if an ``ANALYZE_CONSTRAINTS`` statement should
        be executed when getting errors. Default: ``True``. *Optional*.
    :param column_list:
        A ``list`` containing the columns that will be written. *Optional*.
    :param copy_options:
        A ``dict`` containing the keys to override. For a list of existing keys
        and their defaults, see :py:attr:`~.VerticaBatch.copy_options_dict`.
        *Optional*.
    :param connection:
        A ``pyodbc.Connection`` to use instead of opening a new connection. If
        this parameter is supplied, ``odbc_kwargs`` may not be supplied.
        Default: ``None``. *Optional*.
    :param multi_batch:
        A ``bool`` indicating if the batch should keep its resources open
        after a call to commit. If you plan to call
        :py:meth:`~.VerticaBatch.commit` only once, set this to ``False``.
        Otherwise, setting ``multi_batch=True`` will prevent the batch from
        closing all of its resources.
        Default: ``False``. *Optional*.
    """
    copy_options_dict = {
        'DELIMITER': ';',
        'ENCLOSED BY': '"',
        'SKIP': 0,
        'NULL': '',
        'RECORD TERMINATOR': '\x01',
        'NO COMMIT': True,
        'REJECTEDFILE': __debug__,
        'REJECTMAX': 0,
    }
    """
    Default copy options for the SQL COPY query.

    .. note:: By default ``REJECTEDFILE`` is set to ``__debug__``, which is
        ``True`` unless you've set the ``PYTHONOPTIMIZE`` environment variable.
    """
    def __init__(
            self,
            table_name,
            odbc_kwargs={},
            truncate_table=False,
            reconnect=True,
            analyze_constraints=True,
            column_list=[],
            copy_options={},
            connection=None,
            multi_batch=False):
        if connection and odbc_kwargs:
            raise ValueError("May only specify one of "
                             "[connection, odbc_kwargs]")
        self._odbc_kwargs = odbc_kwargs
        self._table_name = table_name
        self._column_list = column_list
        self._analyze_constraints = analyze_constraints
        # Copy the class-level defaults before applying overrides, so one
        # instance's copy options do not leak into other instances
        self.copy_options_dict = dict(self.copy_options_dict, **copy_options)
        self._batch_initialized = False
        self._multi_batch = multi_batch
        self._total_count = 0
        self._batch_count = 0
        if not connection:
            # make sure we are not logging any passwords :)
            odbc_kwargs_copy = copy.deepcopy(odbc_kwargs)
            if 'password' in odbc_kwargs_copy:
                odbc_kwargs_copy['password'] = '*****'
            logger.debug(
                'Initializing VerticaBatch with odbc_kwargs={0}, '
                'table_name={1}, '
                'column_list={2}'.format(
                    odbc_kwargs_copy, table_name, column_list))
            self._connection = get_connection(
                reconnect=reconnect, **self._odbc_kwargs)
        else:
            self._connection = connection
        # Truncate the table, if needed
        if truncate_table:
            self._truncate_table()
        # Open the output file object.
        # Hard coded for now, needs to be made random.
        self._output_obj_path = r'C:\VerticaTemp\Rowset.txt'
        self._output_obj = codecs.open(self._output_obj_path, 'w', 'utf-8')
        # Build the VSQL COPY command from the class attributes
        self.sql_query_str = self._get_sql_lcopy_str()
    def commit(self):
        """
        Commit the current transaction.
        """
        self._connection.commit()

    def post_rows(self):
        # Flush buffers prior to posting the data
        self._output_obj.flush()
        os.fsync(self._output_obj.fileno())
        self._connection.execute(self.sql_query_str)
        # Clear down the temporary file ready for the next batch
        self._output_obj.seek(0)
        self._output_obj.truncate()

    def insert_list(self, value_list):
        """
        Insert a ``list`` of values (instead of a ``str`` representing a line).

        Example::

            batch.insert_list(['value_1', 'value_2'])

        :param value_list:
            A ``list``. Each item should represent a column value.
        """
        return self.insert_line(self._single_list_to_string(value_list))
    def _truncate_table(self):
        """
        Truncate table.
        """
        logger.info('Truncating table {0}'.format(self._table_name))
        self._connection.execute('TRUNCATE TABLE {0}'.format(self._table_name))

    def _get_num_rejected_rows(self):
        """
        Return the number of rejected rows.

        :return:
            An ``int``.
        """
        if not self.get_batch_count():
            return 0
        rejected_rows = self._connection.execute('SELECT GET_NUM_REJECTED_ROWS()')
        rejected_rows = rejected_rows.fetchone()
        return rejected_rows[0]
    def get_batch_count(self):
        """
        Return the number (``int``) of inserted items since the last commit.

        .. warning:: When using :py:meth:`~.VerticaBatch.insert_raw` this
            value represents the number of raw ``str`` objects inserted, not
            the number of lines!

        :return:
            An ``int``.
        """
        return self._batch_count

    def get_total_count(self):
        """
        Return the total number (``int``) of inserted items.

        .. warning:: When using :py:meth:`~.VerticaBatch.insert_raw` this
            value represents the number of raw ``str`` objects inserted, not
            the number of lines!

        :return:
            An ``int``.
        """
        return self._total_count
    def _get_sql_lcopy_str(self):
        """
        Get a ``str`` representing the COPY query.

        :return:
            A ``str`` representing the query.
        """
        # table name
        output_str = 'COPY {0}'.format(self._table_name)
        # columns, if available
        if self._column_list:
            output_str += ' ({0})'.format(', '.join(self._column_list))
        # batch file path (the simplified version writes a plain file, not a fifo)
        output_str += " FROM LOCAL '{0}'".format(self._output_obj_path)
        # rejected file
        if self.copy_options_dict['REJECTEDFILE']:
            # The simplified class has no rejected-file object, so rejected
            # rows are written to a file alongside the batch file
            output_str += " REJECTED DATA '{0}'".format(
                self._output_obj_path + '.rejected')
        # other arguments which map one-to-one
        for key in [
                'REJECTMAX',
                'DELIMITER',
                'ENCLOSED BY',
                'SKIP',
                'NULL',
                'RECORD TERMINATOR']:
            value = self.copy_options_dict[key]
            if isinstance(value, int):
                output_str += ' {0} {1}'.format(key, value)
            elif isinstance(value, str):
                output_str += " {0} '{1}'".format(key, value)
        # NO COMMIT statement, which needs to be at the end
        if self.copy_options_dict['NO COMMIT']:
            output_str += ' NO COMMIT'
        return output_str
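
    # For illustration, with table_name='schema.my_table',
    # column_list=['c1', 'c2'] and copy_options={'DELIMITER': ',',
    # 'REJECTEDFILE': False}, this builds a query along the lines of:
    #
    #   COPY schema.my_table (c1, c2) FROM LOCAL 'C:\VerticaTemp\Rowset.txt'
    #   REJECTMAX 0 DELIMITER ',' ENCLOSED BY '"' SKIP 0 NULL ''
    #   RECORD TERMINATOR '<0x01>' NO COMMIT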
    def _single_list_to_string(self,
                               value_list,
                               suffix=None):
        """
        Convert a single ``iterable`` to a string that represents one item
        in the batch.

        :param value_list:
            An ``iterable``. Each item represents one column value.
        :param suffix:
            A ``string``. If specified, this character will be appended
            to the resulting string.
        """
        enclosed_by = self.copy_options_dict['ENCLOSED BY']
        escaped_enclosed_by = '\\%s' % enclosed_by
        suffix = suffix if suffix else ''
        delimiter = self.copy_options_dict['DELIMITER']
        str_value_list = (
            '%s%s%s' % (
                enclosed_by,
                # TPS - Python 3 is unicode by default, so the "unicode"
                # function is gone - use "str" instead
                # unicode(value).replace(enclosed_by, escaped_enclosed_by),
                str(value).replace(enclosed_by, escaped_enclosed_by),
                enclosed_by
            )
            if value is not None else '' for value in value_list)
        return delimiter.join(str_value_list) + suffix
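
    # For example, with the default DELIMITER ';' and ENCLOSED BY '"':
    #
    #   self._single_list_to_string(['a', None, 'b'])   # -> '"a";;"b"'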
    def insert_lists(self, value_lists, row_count=1):
        """
        Insert an ``iterable`` of ``iterable`` values (instead of a single
        string). The iterables can be lists, generators, etc.

        Example::

            batch.insert_lists([['key1', 'value1'], ['key2', 'value2']])

        :param value_lists:
            An ``iterable``. Each iterable is another ``iterable`` containing
            the values to insert.
        :param row_count:
            An ``int``. The number of rows being inserted. Since the
            ``value_lists`` parameter may be a generator, the number of
            rows is not easily determinable. Therefore, the number of
            rows being inserted must be specified.
        """
        suffix = self.copy_options_dict['RECORD TERMINATOR']
        strings = (self._single_list_to_string(value_list,
                                               suffix=suffix)
                   for value_list in value_lists)
        self._output_obj.write(
            "".join(strings)
        )
        self._total_count += row_count
        self._batch_count += row_count
    def insert_line(self, line_str):
        """
        Insert a ``str`` containing all the values.

        This is useful when inserting lines directly from a CSV file, for
        example.

        .. note:: When you have a log handler with ``DEBUG`` level, every query
            will be logged. For performance reasons, this log statement is only
            executed when ``__debug__`` equals ``True`` (which is the default
            case). For better performance, you should invoke the Python
            interpreter with the ``-O`` argument or set the environment
            variable ``PYTHONOPTIMIZE`` to something.

        Example::

            batch.insert_line('"value_1";"value_2"')

        :param line_str:
            A ``str`` representing the line to insert. Make sure the ``str``
            is formatted according to
            :py:attr:`~.VerticaBatch.copy_options_dict`.
            Example: ``'"value1";"value2";"value3"'``.
        """
        if __debug__:
            logger.debug(u'Inserting line: {0}'.format(line_str))
        self._output_obj.write(
            line_str + self.copy_options_dict['RECORD TERMINATOR'])
        self._total_count += 1
        self._batch_count += 1

    def rollback(self):
        """
        Rollback the current transaction.
        """
        self._connection.rollback()
        logger.info('Transaction rolled back')

    def get_cursor(self):
        """
        Return a cursor to the database.

        This is useful when you want to add extra data within the same
        transaction of the batch import.

        :return:
            Instance of :py:class:`!pyodbc.Cursor`.
        """
        return self._connection.cursor()