Skip to content

Instantly share code, notes, and snippets.

@sureshsaggar
Last active December 18, 2015 15:39
Show Gist options
Save sureshsaggar/5805394 to your computer and use it in GitHub Desktop.
Analytics - Incremental tail for MySQL
'''
USAGE: ubuntu@mysql-ab1:~$ python mysqltail.py
'''
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent)
import types
import sys
from datetime import date
import redis
import time
from time import gmtime, strftime
import subprocess
import os
import re
class MySQLTail(object):
    """Tail the MySQL binlog and append row changes to daily per-event files.

    Row events (delete / update / write) for the configured (db, table) pair
    are buffered in memory and flushed every ``BUFFER_SIZE`` rows to
    ``<localfs basedir>/<event file>_<YYYY-MM-DD>``. After each flush, local
    files dated before today are pushed to HDFS with ``hadoop fs -put``, and
    the binlog timestamp of the flushing event is stored in Redis. On start,
    events older than that stored timestamp are skipped.
    """

    # Number of buffered rows that triggers a flush to disk.
    BUFFER_SIZE = 3000
    DESCRIPTION = '''
MySQLTail: Process that keeps track of all the updates happening on (database, table)
pair as defined in the settings. Further aggregates the daily logs in HDP HDFS.
'''

    # Key of the row dict whose column values get recorded, per row-event class
    # name. For updates only the post-update image (``after_values``) is kept.
    _ROW_VALUE_KEYS = {
        'DeleteRowsEvent': 'values',
        'UpdateRowsEvent': 'after_values',
        'WriteRowsEvent': 'values',
    }

    def __init__(self, params):
        """Store the nested settings dict (keys: 'mysql', 'redis', 'output')."""
        print('# [INFO]: DESCRIPTION: %s' % MySQLTail.DESCRIPTION)
        self.params = params

    def getMySQLDB(self):
        """Return the schema name whose row events are recorded."""
        return self.params['mysql']['db']

    def getMySQLTable(self):
        """Return the table name whose row events are recorded."""
        return self.params['mysql']['table']

    def getMySQLSettings(self):
        """Return the connection settings passed to BinLogStreamReader."""
        return self.params['mysql']['mysql_settings']

    def getMySQLSlaveServerID(self):
        """Return the server_id this reader registers with."""
        return self.params['mysql']['server_id']

    def getKeyNameLastTS(self):
        """Return the Redis key holding the last flushed binlog timestamp."""
        return self.params['redis']['setname'] + '_last_timestamp'

    def getRedisHandle(self):
        """Open a Redis client from the 'redis' settings."""
        return redis.Redis(self.params['redis']['host'], self.params['redis']['port'])

    def unixstampToDate(self, unixts):
        """Format a Unix timestamp (number or numeric string) as local 'YYYY-MM-DD HH:MM:SS'."""
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(unixts)))

    def saveBufferToDisk(self, lines_count, records):
        """Append buffered rows to today's per-event files, then sync to HDFS.

        :param lines_count: running row total, used only for logging.
        :param records: dict mapping event class name -> list of row values;
            each value is written as ``str(value)`` on its own line.
        """
        # Evaluate the date once so a single flush never straddles two files.
        # NOTE(review): rows are filed under the flush date, not the binlog
        # event date, so rows buffered just before midnight land in the next
        # day's file.
        suffix = '_%s' % str(date.today())
        for event, logs in records.items():
            if not logs:
                # Nothing buffered for this event type: don't create an empty file.
                continue
            fname = self.getOutputFileName(event) + suffix
            with open(fname, 'a') as fh:
                for log in logs:
                    fh.write(str(log) + '\n')
            print('# [INFO]:[saveBufferToDisk]:[%s] Wrote (%s) lines in (%s) file for event(%s)' % (lines_count, len(logs), fname, event))
        self.syncWithHDFS()

    def getLocalFSBaseDir(self):
        """Return the local directory the daily event files are written to."""
        return self.params['output']['localfs']['basedir']

    def getHDFSBaseDir(self):
        """Return the HDFS directory the daily files are pushed into."""
        return self.params['output']['hdfs']['basedir']

    def syncWithHDFS(self):
        """Push local files whose name carries a date before today into HDFS.

        Each file is shipped with a fire-and-forget ``hadoop fs -put``; the
        exit status is not checked.
        """
        localFSBaseDir = self.getLocalFSBaseDir()
        hdfsBaseDir = self.getHDFSBaseDir()
        print('# [INFO]: Syncing localFSBaseDir(%s) with HDFS(%s)' % (localFSBaseDir, hdfsBaseDir))
        cur_date = str(date.today())
        datePattern = re.compile(r'\d{4}-\d{2}-\d{2}')

        def isBeforeToday(filename):
            # ISO-8601 dates order correctly under plain string comparison.
            m = datePattern.search(filename)
            return m is not None and m.group() < cur_date

        files = [f for f in sorted(os.listdir(localFSBaseDir)) if isBeforeToday(f)]
        if not files:
            print('# [INFO]: Nothing to sync before current date(%s)' % cur_date)
            return
        print('# [INFO]: Files to sync before current date(%s) are %s' % (cur_date, files))
        for lf in files:
            cmdargs = ['hadoop', 'fs', '-put', os.path.join(localFSBaseDir, lf), hdfsBaseDir]
            print(' '.join(cmdargs))
            # NOTE(review): nothing removes or marks a file once shipped, so
            # past-day files are re-put on every flush (presumably rejected by
            # HDFS as already existing — confirm).
            subprocess.Popen(cmdargs)

    def getOutputFileName(self, event, type='localfs'):
        """Return ``<basedir>/<event file name>`` for ``event`` under output ``type``."""
        output = self.params['output'][type]
        return os.path.join(output['basedir'], output['events'][event])

    def initRecordsBuffer(self):
        """Return an empty per-event row buffer."""
        return {'DeleteRowsEvent': [], 'UpdateRowsEvent': [], 'WriteRowsEvent': []}

    def getBinLogStream(self):
        """Open a blocking binlog reader limited to row delete/write/update events."""
        return BinLogStreamReader(
            connection_settings=self.getMySQLSettings(),
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            server_id=self.getMySQLSlaveServerID(), blocking=True
        )

    def _isTrackedTable(self, binlogevent):
        """Return True when the event belongs to the configured (db, table) pair."""
        return (binlogevent.schema == self.getMySQLDB()
                and binlogevent.table == self.getMySQLTable())

    def _rowValues(self, binlogevent, row):
        """Return the column values to record for ``row``, or None for other events."""
        key = MySQLTail._ROW_VALUE_KEYS.get(binlogevent.__class__.__name__)
        if key is None:
            return None
        return row[key]

    def run(self):
        """Consume the binlog forever, buffering and flushing row values.

        The Redis timestamp is only advanced at flush time, so on restart
        everything since the last flush is presumably re-read (at-least-once;
        assumes the binlog still contains those events — confirm).
        """
        binlog_stream = self.getBinLogStream()
        lines_count = 0
        records = self.initRecordsBuffer()
        rhandle = self.getRedisHandle()
        last_ts = rhandle.get(self.getKeyNameLastTS())
        if last_ts is None:
            last_ts = 0
        print('# [INFO]: Last Time Stamp(%s)(%s) Stream(%s)' % (last_ts, self.unixstampToDate(last_ts), binlog_stream))
        # Convert once instead of on every event.
        last_ts = float(last_ts)
        try:
            for binlogevent in binlog_stream:
                if float(binlogevent.timestamp) < last_ts:
                    continue
                if not self._isTrackedTable(binlogevent):
                    continue
                event = binlogevent.__class__.__name__
                for row in binlogevent.rows:
                    try:
                        vals = self._rowValues(binlogevent, row)
                        if not vals:
                            continue
                        lines_count += 1
                        records[event].append(vals)
                        if lines_count % MySQLTail.BUFFER_SIZE == 0:
                            self.saveBufferToDisk(lines_count, records)
                            records = self.initRecordsBuffer()
                            rhandle.set(self.getKeyNameLastTS(), binlogevent.timestamp)
                    except Exception:
                        # Best effort per row: log and keep tailing. Ctrl-C and
                        # SystemExit are no longer swallowed here.
                        print('# [ERROR]: Unexpected error - %s' % (sys.exc_info(),))
        finally:
            binlog_stream.close()
if __name__ == '__main__':
MySQLTail({
'mysql': {
'server_id': 10101, # unique slave identifier
'db': 'addressbook', 'table': 'addressbook',
'mysql_settings': {
# 'host': '10.0.0.143', 'port': 3306,
'host': 'localhost', 'port': 3306,
'user': 'analytics', 'passwd': 'u4ana2606'
# 'user': 'root', 'passwd': 'Db@Adm1nn'
}
},
'redis': {'host': '10.0.0.230', 'port': 6379, 'setname': 'mysqltailproduction'},
'output': {
'localfs': {
'basedir': '/mnt/analytics/data/mysql/incremental/',
'events': {
'DeleteRowsEvent': 'deletions', 'WriteRowsEvent': 'writes',
'UpdateRowsEvent': 'updates'
}
},
'hdfs': {'basedir': 'hdfs://10.0.0.82:8020/data/analytics/mysql/incremental/'}
}
}).run()'''
ubuntu@mysql-ab1:~$ python mysqltail.py
'''
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent)
# Connection settings handed to BinLogStreamReader in main().
MYSQL_SETTINGS = dict(
    host="127.0.0.1",
    port=3306,
    user="root",
    passwd="",
)
def main():
    """Print the column values of every row change read from the binlog.

    Connects with ``MYSQL_SETTINGS``; with ``blocking=True`` the reader waits
    for new events. Updates print ``after_values``; deletes and writes print
    ``values``.
    """
    stream = BinLogStreamReader(
        connection_settings=MYSQL_SETTINGS,
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        blocking=True
    )
    try:
        for binlogevent in stream:
            if isinstance(binlogevent, UpdateRowsEvent):
                key = "after_values"
            elif isinstance(binlogevent, (DeleteRowsEvent, WriteRowsEvent)):
                key = "values"
            else:
                # only_events should exclude this; skip rather than print stale
                # or unbound values.
                continue
            for row in binlogevent.rows:
                print('vals: %s' % row[key])
    finally:
        # Release the replication connection even when interrupted.
        stream.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment