Last active: December 18, 2015 15:39
-
-
Save sureshsaggar/5805394 to your computer and use it in GitHub Desktop.
Analytics - Incremental tail for MySQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
USAGE: ubuntu@mysql-ab1:~$ python mysqltail.py | |
''' | |
from pymysqlreplication import BinLogStreamReader | |
from pymysqlreplication.row_event import (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent) | |
import types | |
import sys | |
from datetime import date | |
import redis | |
import time | |
from time import gmtime, strftime | |
import subprocess | |
import os | |
import re | |
class MySQLTail(object):
    '''
    Tails the MySQL binlog for row events (insert/update/delete), buffers the
    row values in memory, flushes them to per-event daily files on local disk,
    and uploads completed day files to HDFS. Progress is checkpointed in Redis
    so a restart resumes from the last processed binlog timestamp.
    '''

    # Number of row events to accumulate in memory before flushing to disk.
    BUFFER_SIZE = 3000
    DESCRIPTION = '''
    MySQLTail: Process that keeps track of all the updates happening on (database, table)
    pair as defined in the settings. Further aggregates the daily logs in HDP HDFS.
    '''

    def __init__(self, params):
        """params: nested settings dict with 'mysql', 'redis' and 'output' sections."""
        print('# [INFO]: DESCRIPTION: %s' % MySQLTail.DESCRIPTION)
        self.params = params

    def getMySQLDB(self):
        """Return the configured MySQL database name."""
        return self.params['mysql']['db']

    def getMySQLTable(self):
        """Return the configured MySQL table name."""
        return self.params['mysql']['table']

    def getMySQLSettings(self):
        """Return the MySQL connection settings dict (host/port/user/passwd)."""
        return self.params['mysql']['mysql_settings']

    def getMySQLSlaveServerID(self):
        """Return the unique server_id this process presents as a replication slave."""
        return self.params['mysql']['server_id']

    def getKeyNameLastTS(self):
        """Return the Redis key under which the last processed binlog timestamp is stored."""
        return self.params['redis']['setname'] + '_last_timestamp'

    def getRedisHandle(self):
        """Open and return a Redis client for the configured host/port."""
        return redis.Redis(self.params['redis']['host'], self.params['redis']['port'])

    def unixstampToDate(self, unixts):
        """Format a unix timestamp as 'YYYY-MM-DD HH:MM:SS' in local time."""
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(unixts)))

    def saveBufferToDisk(self, lines_count, records):
        """Append buffered rows to per-event daily files, then sync older files to HDFS.

        records: dict mapping event class name -> list of row-value dicts.
        lines_count: running total of rows seen (used only for logging).
        """
        for event in records:
            fname = self.getOutputFileName(event) + '_%s' % str(date.today())
            # Bug fix: `with` guarantees the handle is closed even if a write fails
            # (the original open/close pair leaked the handle on error).
            with open(fname, 'a') as fh:
                for log in records[event]:
                    fh.write(str(log) + '\n')
            print('# [INFO]:[saveBufferToDisk]:[%s] Wrote (%s) lines in (%s) file for event(%s)' % (lines_count, len(records[event]), fname, event))
        self.syncWithHDFS()

    def getLocalFSBaseDir(self):
        """Return the local filesystem directory where daily files are written."""
        return self.params['output']['localfs']['basedir']

    def getHDFSBaseDir(self):
        """Return the HDFS destination directory for completed day files."""
        return self.params['output']['hdfs']['basedir']

    def syncWithHDFS(self):
        """Upload every local day-file dated before today into HDFS via `hadoop fs -put`."""
        localFSBaseDir = self.getLocalFSBaseDir()
        hdfsBaseDir = self.getHDFSBaseDir()
        print('# [INFO]: Syncing localFSBaseDir(%s) with HDFS(%s)' % (localFSBaseDir, hdfsBaseDir))
        cur_date = str(date.today())

        def filterDayBeforeFiles(filename):
            # A file belongs to a finished day when its embedded YYYY-MM-DD date
            # sorts strictly before today (ISO dates compare correctly as strings).
            # Bug fix: raw string for the regex (avoids invalid-escape warnings).
            m = re.search(r'\d{4}-\d{2}-\d{2}', filename)
            return bool(m and m.group() < cur_date)

        # Bug fix: materialize with list() — under Python 3, filter() returns an
        # iterator and the original len(files) call would raise TypeError.
        files = list(filter(filterDayBeforeFiles, os.listdir(localFSBaseDir)))
        if not files:
            print('# [INFO]: Nothing to sync before current date(%s)' % cur_date)
        else:
            print('# [INFO]: Files to sync before current date(%s) are %s' % (cur_date, files))
            for lf in files:
                # List-form argv: no shell is involved, so odd filenames are safe.
                cmdargs = ["hadoop", "fs", "-put", localFSBaseDir + lf, hdfsBaseDir]
                print(' '.join(cmdargs))
                subprocess.Popen(cmdargs)

    def getOutputFileName(self, event, type='localfs'):
        """Return the base output path for an event class name under the given sink.

        `type` kept (despite shadowing the builtin) for backward compatibility.
        """
        return self.getLocalFSBaseDir() + self.params['output'][type]['events'][event]

    def initRecordsBuffer(self):
        """Return a fresh, empty per-event buffer."""
        return {'DeleteRowsEvent': [], 'UpdateRowsEvent': [], 'WriteRowsEvent': []}

    def getBinLogStream(self):
        """Open a blocking binlog stream restricted to row events."""
        return BinLogStreamReader(
            connection_settings=self.getMySQLSettings(),
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            server_id=self.getMySQLSlaveServerID(), blocking=True
        )

    def run(self):
        """Main loop: stream binlog events, buffer row values, flush every BUFFER_SIZE rows."""
        binlog_stream = self.getBinLogStream()
        lines_count = 0
        records = self.initRecordsBuffer()
        rhandle = self.getRedisHandle()
        last_ts = rhandle.get(self.getKeyNameLastTS())
        if last_ts is None:
            last_ts = 0
        print('# [INFO]: Last Time Stamp(%s)(%s) Stream(%s)' % (last_ts, self.unixstampToDate(last_ts), binlog_stream))
        for binlogevent in binlog_stream:
            # Skip events already processed by a previous run of this process.
            if float(binlogevent.timestamp) < float(last_ts):
                continue
            # TODO: Scope to required DB & TABLE (see getMySQLDB/getMySQLTable).
            for row in binlogevent.rows:
                try:
                    vals = None
                    if isinstance(binlogevent, DeleteRowsEvent):
                        vals = row["values"]
                    elif isinstance(binlogevent, UpdateRowsEvent):
                        vals = row["after_values"]  # post-update image of the row
                    elif isinstance(binlogevent, WriteRowsEvent):
                        vals = row["values"]
                    if vals:
                        event = binlogevent.__class__.__name__
                        lines_count += 1
                        records[event].append(vals)
                        if lines_count % MySQLTail.BUFFER_SIZE == 0:
                            self.saveBufferToDisk(lines_count, records)
                            records = self.initRecordsBuffer()
                            # Checkpoint progress so a restart resumes from here.
                            rhandle.set(self.getKeyNameLastTS(), binlogevent.timestamp)
                except Exception:
                    # Bug fix: was a bare `except:` that also swallowed
                    # KeyboardInterrupt/SystemExit, making the process unkillable.
                    print('# [ERROR]: Unexpected error - %s' % (sys.exc_info(),))
        # Bug fix: original said `stream.close()` — `stream` is undefined here
        # (NameError); the local variable is `binlog_stream`.
        binlog_stream.close()
if __name__ == '__main__':
    # Production settings for the tail process; values are passed straight
    # through to MySQLTail (see its getters for what each section means).
    config = {
        'mysql': {
            'server_id': 10101,  # unique slave identifier
            'db': 'addressbook', 'table': 'addressbook',
            'mysql_settings': {
                # 'host': '10.0.0.143', 'port': 3306,
                'host': 'localhost', 'port': 3306,
                'user': 'analytics', 'passwd': 'u4ana2606'
                # 'user': 'root', 'passwd': 'Db@Adm1nn'
            }
        },
        'redis': {'host': '10.0.0.230', 'port': 6379, 'setname': 'mysqltailproduction'},
        'output': {
            'localfs': {
                'basedir': '/mnt/analytics/data/mysql/incremental/',
                'events': {
                    'DeleteRowsEvent': 'deletions', 'WriteRowsEvent': 'writes',
                    'UpdateRowsEvent': 'updates'
                }
            },
            'hdfs': {'basedir': 'hdfs://10.0.0.82:8020/data/analytics/mysql/incremental/'}
        }
    }
    MySQLTail(config).run()
'''
USAGE: ubuntu@mysql-ab1:~$ python mysqltail.py
'''
from pymysqlreplication import BinLogStreamReader | |
from pymysqlreplication.row_event import (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent) | |
# Connection settings for the MySQL server whose binlog is streamed below.
# NOTE(review): root with an empty password — presumably a local dev box;
# confirm before running this anywhere shared.
MYSQL_SETTINGS = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": ""
}
def main():
    """Tail the MySQL binlog and print the row values of every insert/update/delete."""
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        blocking=True  # wait for new events instead of returning at end of log
    )
    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                # Bug fix: initialize vals — the original raised
                # UnboundLocalError if no isinstance branch matched.
                vals = None
                if isinstance(binlogevent, DeleteRowsEvent):
                    vals = row["values"]
                elif isinstance(binlogevent, UpdateRowsEvent):
                    vals = row["after_values"]  # post-update image of the row
                elif isinstance(binlogevent, WriteRowsEvent):
                    vals = row["values"]
                print('vals: %s' % vals)
    finally:
        # Bug fix: the original close() was unreachable on any exception;
        # finally guarantees the stream is released.
        stream.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment