# table_def.py
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, DateTime, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, backref

Base = declarative_base()
metadata = MetaData()


########################################################################
class Posts(Base):
    """ORM mapping for the source table of posts to be sentiment-scored."""
    __tablename__ = "OH_Posts"
    __table_args__ = {'schema': 'bi_tool_eval_snd'}
    xOwner = Column(Integer, primary_key=True)
    PostedTo = Column(Integer, primary_key=True)
    Published = Column(DateTime, primary_key=True)
    Message = Column(String)

    #----------------------------------------------------------------------
    def __init__(self, xOwner, PostedTo, Published, Message):
        """Populate one row of the table."""
        self.xOwner = xOwner
        self.PostedTo = PostedTo
        self.Published = Published
        self.Message = Message
# IDOL on Demand, Vertica Bulk Sentiment update
# This utility uses IoD calls to apply sentiment analysis to data held in Vertica;
# the results are then written back to Vertica in an associated table.
# Latency for a single IoD call is about 1.5 seconds, so to increase throughput the utility does the following:
# 1) It submits asynchronous requests to IoD asking for the sentiment processing. A secondary call is then
#    submitted to return the results.
# 2) It uses multiprocessing - creating a worker pool of sub-processes which perform the actual web service calls to IoD.
# 3) Finally - standard "inserts" into Vertica are slow, so this utility makes use of a (simplified) Python module which
#    takes the output data, writes it to a temporary file and then uses a VSQL COPY to post it into Vertica.
# Using these techniques, approximate throughput is about 5 sentiment posts per second.
# The Object Relational Mapper SQLAlchemy is used to read the source table.
# It turns tables into Python objects. Although this makes manipulating tables very intuitive and Pythonic,
# it doesn't lend itself to generic table utilities and may be overkill for this purpose.
from sqlalchemy import create_engine, ForeignKey
from sqlalchemy import Column, Date, Integer, String
from sqlalchemy.orm import relationship, backref, sessionmaker
from multiprocessing import Pool
import logging
import time
# The requests library provides helpers for submitting HTTP calls
import requests
import json
# Additional modules. Table definitions for SQLAlchemy and Vertica upload utilities
from table_def import Posts
# Simplified version used which works on Windows
#from pyvertica.batch import VerticaBatch
from VerticaBatchSimplified import VerticaBatch
# IoD operates using an API key to allocate services
apikey="xxx-xxx-xxx-xxx"
# base URL for IoD
# every service from IoD has the same format, and has a synchronous and an asynchronous version
url="http://api.idolondemand.com/1/api/sync/{}/v1"
Asyncurl="http://api.idolondemand.com/1/api/async/{}/v1"
# if the ASYNC version is used, it returns a JOB ID as a handle. This JOB ID can be used to fetch results using the URL below
JobResulturl="http://api.idolondemand.com/1/job/result/{}"
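# Illustrative sketch (not part of the utility itself): how these templates are filled in.
# The service name is substituted into the URL, e.g. for the 'analyzesentiment' API used below:
#   url.format('analyzesentiment')      -> http://api.idolondemand.com/1/api/sync/analyzesentiment/v1
#   Asyncurl.format('analyzesentiment') -> http://api.idolondemand.com/1/api/async/analyzesentiment/v1
# An async call returns a jobID, which is then substituted into JobResulturl to fetch the result:
#   JobResulturl.format('<jobID returned by the async call>')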
# Simple log writer - outputs a message in standard format with a timer
# On each call it "resets" the timer so that elapsed time between calls can be recorded
def WriteTimerLog(LogMessage,tStartClock=0,tEndClock=0):
    if tStartClock==0:
        tStartClock=time.clock()
    tEndClock = time.clock()
    logging.critical(LogMessage + ': %s',"{0:.2f}".format(tEndClock - tStartClock))
    # Reset timer by making END time the START time
    tStartClock=tEndClock
    return tStartClock
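# Illustrative example of the resulting log line (timestamp and elapsed value are made up):
#   2015-03-31 19:33:05,123 Rows fetched, elapsed time: 0.42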
# General routine for calling IDOL on Demand services
def postrequests(callingurl,function,data={},files={}):
    # Add the API key to the POST data
    data["apikey"]=apikey
    # Insert the particular IoD service into the URL
    callurl=callingurl.format(function)
    # Call IoD and return results
    success=True
    try:
        r=requests.post(callurl,data=data,files=files,timeout=5)
        returned_status_code=r.status_code
        returned_json=r.json()
        if r.status_code != 200:
            success=False
    # Trap timeouts - fairly common
    except requests.exceptions.Timeout as e:
        success=False
        returned_status_code=-1
        returned_json='Timeout'
    return success, returned_status_code, returned_json
# Call asynchronous API for sentiment and return jobID
def asynchCallIODSentimentAPI(Instance):
    # These functions are called as sub-processes and therefore
    # don't have access to the assets of the main routine,
    # including the logger. So this has to be re-allocated.
    # pick your log file location of choice
    logging.basicConfig(filename=r"C:\Coding\Python2.log",
                        level=logging.CRITICAL, format='%(asctime)s %(message)s')
    # Start a Sentiment request
    success, status_code, AsyncResponse=postrequests(
        Asyncurl,'analyzesentiment',{'text':Instance['Message']})
    if success:
        # This Job ID will be used to fetch the results later
        Instance['jobID'] = AsyncResponse['jobID']
    else:
        Instance['jobID'] = '-'
        logging.critical('ERROR - Asynch request failed, Status code %s, Response %s',
                         status_code, AsyncResponse)
    return Instance
# Call asynchronous API for sentiment with jobID and return results
def asynchReturnIODSentimentAPI(Instance):
    # as above, pick your logfile location of choice
    logging.basicConfig(filename=r"C:\Coding\Python2.log",
                        level=logging.CRITICAL, format='%(asctime)s %(message)s')
    # Get Sentiment from IoD
    success, status_code, results=postrequests(JobResulturl,Instance['jobID'],{})
    results_string = json.dumps(results)
    if success:
        # Extract sentiment results from response by walking JSON
        # Job results can contain N actual results so the JSON has to
        # be "walked" to get to the sentiment results
        Actions=results["actions"]
        SentimentResult=Actions[0]
        SentimentResult=SentimentResult["result"]
        SentimentResult_string=json.dumps(SentimentResult)
        aggregatelist=SentimentResult["aggregate"]
        # Lots of detail is returned, however only 3 elements are currently stored:
        # the overall sentiment score, the overall sentiment and the full results
        # (with all topics etc.) in case more detail is needed later
        Instance['AggregateScore']=aggregatelist["score"]
        Instance['AggregateSentiment']=aggregatelist["sentiment"]
        Instance['SentimentResults']=SentimentResult_string
        Instance.update({'Sentiment Retrieved':True})
    else:
        Instance['AggregateScore']=-1
        Instance['AggregateSentiment']='-'
        Instance['SentimentResults']='-'
        logging.critical('ERROR - Asynch response failed, Status code %s, Response %s',
                         status_code, results)
        Instance.update({'Sentiment Retrieved':False})
    return Instance
# Multiprocessing is incredibly straightforward in Python.
# The "Pool" command creates N worker processes ready for work, then
# the "map" function starts each process off with the function to call and the parameters to use.
# The "map" function then waits for all processes to complete and compiles the results into a single list.
# This is the pair of async calls, one to start the requests and one to fetch results.
def SubmitParallelAsyncSentimentRequests(APIArray, WorkerProcesses):
    with Pool(processes=WorkerProcesses) as urlPool:
        APIArray = urlPool.map(asynchCallIODSentimentAPI, APIArray)
    return APIArray

def SubmitParallelAsyncSentimentResponses(APIArray, WorkerProcesses):
    with Pool(processes=WorkerProcesses) as urlPool:
        APIArray = urlPool.map(asynchReturnIODSentimentAPI, APIArray)
    return APIArray
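# Illustrative note: each dict in APIArray is handed to one worker, so with
# WorkerProcesses=8 and 100 queued requests, roughly 8 HTTP calls are in flight at a time.
# On Windows the Pool must only be created from code guarded by `if __name__ == '__main__':`
# (as is done via Main() below), because child processes re-import this module on spawn.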
# Used for posting to Vertica
# Note, a simplified version is used here; for Linux installs I recommend using the original
# VerticaBatch module from PyVertica as it is more flexible
def OpenVerticaBatch():
    batch = VerticaBatch(
        odbc_kwargs={'dsn': '<dsn here>'},
        table_name='<table here>',
        truncate_table=False,
        reconnect=False,
        column_list=['<column list here>'],
        copy_options={
            'DELIMITER': ',',
            'REJECTEDFILE': False
        }
    )
    return batch
# Takes the array of Asynch results and posts them
def PostJobResults(batch, ResultArray=[]):
    Row=[]
    RowList=[]
    # Build Array
    for Instance in ResultArray:
        # output table has same 3 part compound key as source table
        Row.append(Instance['xOwner'])
        Row.append(Instance['PostedTo'])
        Row.append(Instance['Published'])
        Row.append(Instance['Message'])
        # Row.append('-')
        Row.append(Instance['AggregateScore'])
        Row.append(Instance['AggregateSentiment'])
        Row.append(Instance['SentimentResults'])
        RowList.append(Row)
        batch.insert_list(Row)
        Row=[]
    batch.post_rows()
    batch.commit()
    return []
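# Illustrative note (assumption, not stated in the original gist): the target table
# bi_tool_eval_snd.OH_Posts_Sentiment is expected to have columns matching the insert
# order used in PostJobResults above, i.e. roughly:
#   xOwner, PostedTo, Published, Message, AggregateScore, AggregateSentiment, SentimentResults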
def CountRetrievals(Array,CheckEntry):
    # Loops through array looking for successful retrievals
    SuccessfulCount=0
    for Instance in Array:
        if Instance[CheckEntry]:
            SuccessfulCount +=1
    return SuccessfulCount
def Main():
    # Control values - these will be made into parameters
    # Limit on rows for testing
    RowLimit=200
    # Parallel data structures
    # Number of asynchronous calls to make per batch
    RowstoProcess=100
    # Number of workers
    WorkerProcesses=8
    # Array which will hold the set of results
    APIArray=[]
    # Individual instance
    APIInstance={}
    LoopCheck=0
    # Timer function
    tStartClock=0
    tEndClock=0
    # Initialise Logging
    # Pick your logging file
    logging.basicConfig(filename=r"C:\Coding\Python_3.log",level=logging.CRITICAL, format='%(asctime)s %(message)s')
    logging.critical('Initialise, %s row limit, %s inner loop, %s Worker pool', RowLimit, RowstoProcess, WorkerProcesses)
    tStartClock=WriteTimerLog('Starting Timer',tStartClock,tEndClock)
    # used to log total elapsed time
    tStartAll = time.clock()
    # use DSN for connection
    # XXXX to be removed XXXX
    engine = create_engine('vertica+pyodbc://<connection details here>')
    # Create the connection for posting to Vertica
    # Note there are 2 Vertica connections, one to read in the original text for sentiment analysis
    # and one for batch uploading the sentiment results
    batch=OpenVerticaBatch()
    # clear down table
    engine.execute('truncate table bi_tool_eval_snd.OH_Posts_Sentiment')
    # create a Session
    Session = sessionmaker(bind=engine)
    session = Session()
    tStartClock=WriteTimerLog('Opening Input Table, elapsed time',tStartClock,tEndClock)
    # Fetch all rows to begin process
    PostRows = session.query(Posts).limit(RowLimit)
    tStartClock=WriteTimerLog('Rows fetched, elapsed time',tStartClock,tEndClock)
    tProcessAfterStart = time.clock()
    logging.critical('Entering Main Loop')
    RowCount=0
    RowSentimentRetrieved=0
    BlankMessageCount=0
    for PostInstance in PostRows:
        # The loop makes use of multi-processing.
        # "RowstoProcess" rows are read and the data to be processed written to APIArray.
        # Once complete, 2 sets of parallel processing jobs are set up.
        # The first calls the asynchronous "analyzesentiment" API and the returned job ID
        # is stored in the array.
        # The second then processes the job ID and fetches the results,
        # then a final routine posts the results to the database.
        # The loop restarts and the routine fetches the next "RowstoProcess" rows.
        RowCount +=1
        if len(PostInstance.Message) == 0:
            BlankMessageCount+=1
        if len(PostInstance.Message) > 0:
            # Load up APIArray
            # This is the major data structure, it holds:
            #   the incoming primary key values and the message to be processed.
            # Once the jobs are submitted the structure holds:
            #   the Job ID used in the asynchronous IoD calls.
            # Once the results are retrieved the structure holds:
            #   the results of the sentiment analysis,
            #   the aggregate score,
            #   the aggregate sentiment.
            APIInstance={'xOwner': PostInstance.xOwner,
                         'PostedTo': PostInstance.PostedTo,
                         'Published': PostInstance.Published,
                         'Message': PostInstance.Message,
                         'jobID': '-',
                         'AggregateScore': 0,
                         'AggregateSentiment': '-',
                         'SentimentResults': '-'}
            # Build up array of requests to process
            APIArray.append(APIInstance)
            LoopCheck += 1
        # Once limit reached - submit asynchronous processes
        if LoopCheck == RowstoProcess:
            tStartClock=WriteTimerLog('Array Load Complete, elapsed time',tStartClock,tEndClock)
            # start worker processes
            APIArray=SubmitParallelAsyncSentimentRequests(APIArray, WorkerProcesses)
            tStartClock=WriteTimerLog('Parallel Processes - Submit Requests - Complete, elapsed time',tStartClock,tEndClock)
            APIArray=SubmitParallelAsyncSentimentResponses(APIArray, WorkerProcesses)
            tStartClock=WriteTimerLog('Parallel Processes - Fetch Responses - Complete, elapsed time',tStartClock,tEndClock)
            # Count successful responses
            RowSentimentRetrieved=RowSentimentRetrieved + CountRetrievals(APIArray,'Sentiment Retrieved')
            # Post entries to Database
            APIArray = PostJobResults(batch,APIArray)
            tStartClock=WriteTimerLog('Rows posted complete, elapsed time',tStartClock,tEndClock)
            logging.critical('Rows read - %s, Rows rejected - %s, Rows with Sentiment Retrieved - %s ', RowCount, BlankMessageCount, RowSentimentRetrieved)
            LoopCheck=0
    # On exit from loop do a final post
    tStartClock=WriteTimerLog('Exiting Loop, elapsed time',tStartClock,tEndClock)
    # start worker processes
    APIArray=SubmitParallelAsyncSentimentRequests(APIArray, WorkerProcesses)
    APIArray=SubmitParallelAsyncSentimentResponses(APIArray, WorkerProcesses)
    # Count successful responses
    RowSentimentRetrieved=RowSentimentRetrieved + CountRetrievals(APIArray,'Sentiment Retrieved')
    APIArray=PostJobResults(batch,APIArray)
    tStartClock=WriteTimerLog('Final rows posted, elapsed time',tStartClock,tEndClock)
    session.close()
    tClose = time.clock()
    logging.critical('Complete - %s process, %s total, %s Rows per Second', "{0:.2f}".format(tClose - tProcessAfterStart), "{0:.2f}".format(tClose - tStartAll), "{0:.2f}".format(RowLimit/(tClose - tStartAll)))
    logging.critical('Rows read - %s, Rows rejected - %s, Rows with Sentiment Retrieved - %s ', RowCount, BlankMessageCount, RowSentimentRetrieved)

if __name__ == '__main__':
    Main()
####################
# This code builds on the publicly available VerticaBatch code but simplifies it somewhat, removing the multithreading
# so that it works on Windows.
# However, you are advised to use the public VerticaBatch module from PyVertica if you are running on Linux.
####################
import codecs
import copy
import logging
import os
import tempfile

from functools import wraps

from pyvertica.connection import get_connection

# threading, taskthread and queue are used by the original pyvertica module;
# they are not needed in this simplified, single-threaded version

logger = logging.getLogger(__name__)
# This is the primary class.
# It sets up a Vertica connection. Essentially this class takes input, writes it to a
# text file and then uses the VSQL COPY command to post the data into Vertica. This is by far the fastest
# mechanism for getting data into Vertica.
class VerticaBatch(object):
    """
    Object for writing multiple records to Vertica in a batch.

    Usage example::

        from VerticaBatchSimplified import VerticaBatch

        batch = VerticaBatch(
            odbc_kwargs={'dsn': 'VerticaDWH'},
            table_name='schema.my_table',
            truncate_table=True,
            column_list=['column_1', 'column_2'],
            copy_options={
                'DELIMITER': ',',
            },
            multi_batch=False
        )

        row_list = [
            ['row_1_val_1', 'row_1_val_2'],
            ['row_2_val_1', 'row_2_val_2'],
            ...
        ]

        for column_data_list in row_list:
            batch.insert_list(column_data_list)

        batch.post_rows()
        batch.commit()

    .. note:: It is also possible to call :py:meth:`~.VerticaBatch.commit`
        multiple times (for example after every 50000 records).

    .. note:: Creating a new batch object will not create a lock on the target
        table. This will happen only after the first insert.

    .. note:: The original pyvertica module also provides ``get_errors()`` and
        ``close_batch()`` (used with ``multi_batch=True``); these are not
        included in this simplified version.

    :param table_name:
        A ``str`` representing the table name (including the schema) to write
        to. Example: ``'staging.my_table'``.

    :param odbc_kwargs:
        A ``dict`` containing the ODBC connection keyword arguments. E.g.::

            {
                'dsn': 'TestDSN',
            }

        .. seealso:: https://code.google.com/p/pyodbc/wiki/Module

    :param truncate_table:
        A ``bool`` indicating if the table needs truncating before the first
        insert. Default: ``False``. *Optional*.

    :param reconnect:
        A ``bool`` passed to the connection object to decide if pyvertica
        should directly reconnect to a random node to bypass a load balancer.

    :param analyze_constraints:
        A ``bool`` indicating if an ``ANALYZE_CONSTRAINTS`` statement should
        be executed when getting errors. Default: ``True``. *Optional*.

    :param column_list:
        A ``list`` containing the columns that will be written. *Optional*.

    :param copy_options:
        A ``dict`` containing the keys to override. For a list of existing keys
        and their defaults, see :py:attr:`~.VerticaBatch.copy_options_dict`.
        *Optional*.

    :param connection:
        A ``pyodbc.Connection`` to use instead of opening a new connection. If
        this parameter is supplied, ``odbc_kwargs`` may not be supplied.
        Default: ``None``. *Optional*.

    :param multi_batch:
        A ``bool`` indicating if the batch should keep its resources open
        after a call to commit. If you plan to call
        :py:meth:`~.VerticaBatch.commit` only once, set this to ``False``.
        Otherwise, setting ``multi_batch=True`` will prevent the batch from
        closing all of its resources.
        Default: ``False``. *Optional*.

    """
    copy_options_dict = {
        'DELIMITER': ';',
        'ENCLOSED BY': '"',
        'SKIP': 0,
        'NULL': '',
        'RECORD TERMINATOR': '\x01',
        'NO COMMIT': True,
        'REJECTEDFILE': __debug__,
        'REJECTMAX': 0,
    }
    """
    Default copy options for the SQL COPY query.

    .. note:: By default ``REJECTEDFILE`` is set to ``__debug__``, which is
        ``True``, unless you've set the ``PYTHONOPTIMIZE`` environment variable.
    """
    def __init__(
            self,
            table_name,
            odbc_kwargs={},
            truncate_table=False,
            reconnect=True,
            analyze_constraints=True,
            column_list=[],
            copy_options={},
            connection=None,
            multi_batch=False):
        if connection and odbc_kwargs:
            raise ValueError("May only specify one of "
                             "[connection, odbc_kwargs]")
        self._odbc_kwargs = odbc_kwargs
        self._table_name = table_name
        self._column_list = column_list
        self._analyze_constraints = analyze_constraints
        self.copy_options_dict.update(copy_options)
        self._batch_initialized = False
        self._multi_batch = multi_batch
        self._total_count = 0
        self._batch_count = 0
        self._in_batch = False

        if not connection:
            # make sure we are not logging any passwords :)
            odbc_kwargs_copy = copy.deepcopy(odbc_kwargs)
            if 'password' in odbc_kwargs_copy:
                odbc_kwargs_copy['password'] = '*****'
            logger.debug(
                'Initializing VerticaBatch with odbc_kwargs={0}, '
                'table_name={1}, '
                'column_list={2}'.format(
                    odbc_kwargs_copy, table_name, column_list))
            self._connection = get_connection(
                reconnect=reconnect, **self._odbc_kwargs)
        else:
            self._connection = connection

        # cursor used by the helper methods below (truncate, rejected-row count)
        self._cursor = self._connection.cursor()

        # file to hold rejected rows, only needed when REJECTEDFILE is enabled
        # (mirrors the behaviour of the original pyvertica module)
        if self.copy_options_dict['REJECTEDFILE']:
            self._rejected_file_obj = tempfile.NamedTemporaryFile(delete=False)

        # truncate table, if needed
        if truncate_table:
            self._truncate_table()

        # Open file object
        # Hard coded for now, needs to be made random
        self._output_obj_path = r'C:\VerticaTemp\Rowset.txt'
        self._output_obj = codecs.open(self._output_obj_path, 'w', 'utf-8')

        # Build VSQL COPY command from class attributes
        self.sql_query_str = self._get_sql_lcopy_str()
    def commit(self):
        """
        Commit the current transaction.
        """
        self._connection.commit()

    def post_rows(self):
        # flush buffers prior to posting data
        self._output_obj.flush()
        os.fsync(self._output_obj)
        self._connection.execute(self.sql_query_str)
        # Clean up the temporary file
        self._output_obj.seek(0)
        self._output_obj.truncate()
    def insert_list(self, value_list):
        """
        Insert a ``list`` of values (instead of a ``str`` representing a line).

        Example::

            batch.insert_list(['value_1', 'value_2'])

        :param value_list:
            A ``list``. Each item should represent a column value.
        """
        return self.insert_line(self._single_list_to_string(value_list))

    def _truncate_table(self):
        """
        Truncate table.
        """
        logger.info('Truncating table {0}'.format(self._table_name))
        self._cursor.execute('TRUNCATE TABLE {0}'.format(self._table_name))
    def _get_num_rejected_rows(self):
        """
        Return the number of rejected rows.

        :return:
            An ``int``.
        """
        if self._in_batch:
            self._end_batch()
        if not self.get_batch_count():
            return 0
        rejected_rows = self._cursor.execute('SELECT GET_NUM_REJECTED_ROWS()')
        rejected_rows = rejected_rows.fetchone()
        return rejected_rows[0]

    def get_batch_count(self):
        """
        Return number (``int``) of inserted items since last commit.

        .. warning:: When using :py:meth:`~.VerticaBatch.insert_raw` this
            value represents the number of raw ``str`` objects inserted, not
            the number of lines!

        :return:
            An ``int``.
        """
        return self._batch_count

    def get_total_count(self):
        """
        Return total number (``int``) of inserted items.

        .. warning:: When using :py:meth:`~.VerticaBatch.insert_raw` this
            value represents the number of raw ``str`` objects inserted, not
            the number of lines!

        :return:
            An ``int``.
        """
        return self._total_count
    def _get_sql_lcopy_str(self):
        """
        Get ``str`` representing the COPY query.

        :return:
            A ``str`` representing the query.
        """
        # table name
        output_str = 'COPY {0}'.format(self._table_name)

        # columns, if available
        if self._column_list:
            output_str += ' ({0})'.format(', '.join(self._column_list))

        # source file path (the original module uses a fifo here)
        output_str += " FROM LOCAL '{0}'".format(self._output_obj_path)

        # rejected file
        if self.copy_options_dict['REJECTEDFILE']:
            output_str += " REJECTED DATA '{0}'".format(
                self._rejected_file_obj.name)

        # other arguments which map one-to-one
        for key in [
                'REJECTMAX',
                'DELIMITER',
                'ENCLOSED BY',
                'SKIP',
                'NULL',
                'RECORD TERMINATOR']:
            value = self.copy_options_dict[key]
            if isinstance(value, int):
                output_str += ' {0} {1}'.format(key, value)
            elif isinstance(value, str):
                output_str += " {0} '{1}'".format(key, value)

        # NO COMMIT statement, which needs to be at the end
        if self.copy_options_dict['NO COMMIT']:
            output_str += ' NO COMMIT'

        return output_str
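    # Illustrative example of the kind of statement _get_sql_lcopy_str builds.
    # Table, columns and delimiter here are hypothetical (taken from the docstring usage
    # example), REJECTEDFILE is assumed disabled, and the default record terminator is
    # the non-printable '\x01':
    #   COPY schema.my_table (column_1, column_2) FROM LOCAL 'C:\VerticaTemp\Rowset.txt'
    #       REJECTMAX 0 DELIMITER ',' ENCLOSED BY '"' SKIP 0 NULL '' RECORD TERMINATOR '\x01' NO COMMIT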
    def _single_list_to_string(self,
                               value_list,
                               suffix=None):
        """
        Convert a single ``iterable`` to a string that represents one item
        in the batch.

        :param value_list:
            An ``iterable``. Each item represents one column value.

        :param suffix:
            A ``string``. If specified, this character will be appended
            to the resulting string.
        """
        enclosed_by = self.copy_options_dict['ENCLOSED BY']
        escaped_enclosed_by = '\\%s' % enclosed_by
        suffix = suffix if suffix else ''
        delimiter = self.copy_options_dict['DELIMITER']
        str_value_list = (
            '%s%s%s' % (
                enclosed_by,
                # TPS - Python 3 is unicode by default so the "unicode" function is gone - use "str" instead
                # unicode(value).replace(enclosed_by, escaped_enclosed_by),
                str(value).replace(enclosed_by, escaped_enclosed_by),
                enclosed_by
            )
            if value is not None else '' for value in value_list)
        return delimiter.join(str_value_list) + suffix
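    # Illustrative example (hypothetical values) of what the default copy options produce:
    #   _single_list_to_string(['value_1', None, 'say "hi"'])
    #   -> '"value_1";;"say \"hi\""'
    # i.e. values are wrapped in ENCLOSED BY, None becomes an empty field, and embedded
    # enclosing characters are backslash-escaped.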
    def insert_lists(self, value_lists, row_count=1):
        """
        Insert an ``iterable`` of ``iterable`` values (instead of a single
        string). The iterables can be lists, generators, etc.

        Example::

            batch.insert_lists([['key1', 'value1'], ['key2', 'value2']])

        :param value_lists:
            An ``iterable``. Each iterable is another ``iterable`` containing
            the values to insert.

        :param row_count:
            An ``int``. The number of rows being inserted. Since the
            ``value_lists`` parameter may be a generator, the number of
            rows is not easily determinable. Therefore, the number of
            rows being inserted must be specified.
        """
        suffix = self.copy_options_dict['RECORD TERMINATOR']
        strings = (self._single_list_to_string(value_list,
                                               suffix=suffix)
                   for value_list in value_lists)
        # the simplified version writes to the temporary output file rather than to a fifo
        self._output_obj.write(
            "".join(strings)
        )
        self._total_count += row_count
        self._batch_count += row_count
    def insert_line(self, line_str):
        """
        Insert a ``str`` containing all the values.

        This is useful when inserting lines directly from a CSV file, for
        example.

        .. note:: When you have a loghandler with ``DEBUG`` level, every query
            will be logged. For performance reasons, this log statement is only
            executed when ``__debug__`` equals ``True`` (which is the default
            case). For better performance, you should invoke the Python
            interpreter with the ``-O`` argument or set the environment
            variable ``PYTHONOPTIMIZE`` to something.

        Example::

            batch.insert_line('"value_1";"value_2"')

        :param line_str:
            A ``str`` representing the line to insert. Make sure the ``str``
            is formatted according to :py:attr:`~.VerticaBatch.copy_options_dict`.
            Example: ``'"value1";"value2";"value3"'``.
        """
        if __debug__:
            logger.debug(u'Inserting line: {0}'.format(line_str))
        self._output_obj.write(
            line_str + self.copy_options_dict['RECORD TERMINATOR'])
        self._total_count += 1
        self._batch_count += 1
    def rollback(self):
        """
        Rollback the current transaction.
        """
        if self._in_batch:
            self._end_batch()
        self._connection.rollback()
        logger.info('Transaction rolled back')

    def get_cursor(self):
        """
        Return a cursor to the database.

        This is useful when you want to add extra data within the same
        transaction of the batch import.

        :return:
            Instance of :py:class:`!pyodbc.Cursor`.
        """
        return self._connection.cursor()