Skip to content

Instantly share code, notes, and snippets.

@RajatGoyal
Created April 2, 2020 20:27
Show Gist options
  • Save RajatGoyal/a7e270cea7f3723ae037857085e0c36d to your computer and use it in GitHub Desktop.
Save RajatGoyal/a7e270cea7f3723ae037857085e0c36d to your computer and use it in GitHub Desktop.
debezium cdc memsql
#!/usr/bin/python
import struct
import sys
import json
if sys.version_info >= (3, 0):
    # Python 3: the standard text streams expose their byte streams via .buffer.
    binary_stdin = sys.stdin.buffer
    binary_stderr = sys.stderr.buffer
    binary_stdout = sys.stdout.buffer
else:
    # Python 2: the standard streams already read and write byte strings.
    binary_stdin = sys.stdin
    binary_stderr = sys.stderr
    binary_stdout = sys.stdout
def input_stream(stream=None):
    """
    Consume STDIN and yield each record that is received from MemSQL.

    Each record is framed as an 8-byte unsigned length header followed by
    exactly that many payload bytes. Iteration stops cleanly when the input
    ends on a record boundary.

    :param stream: binary file-like object to read from; defaults to
        ``binary_stdin``. Optional so the framing logic can be exercised
        against an in-memory buffer.
    :raises ValueError: if the input ends inside a length header or inside
        a record payload (truncated input).
    """
    if stream is None:
        stream = binary_stdin
    while True:
        header = stream.read(8)
        if not header:
            # End of input exactly at a record boundary.
            return
        if len(header) != 8:
            # Explicit raise instead of ``assert``, which ``python -O`` strips.
            raise ValueError("truncated length header: %r" % (header,))
        # "<Q" fixes the size at 8 bytes, little-endian. The native "L" this
        # replaces only matches on LP64 hosts; where it is 4 bytes wide,
        # unpacking an 8-byte header raises struct.error.
        (record_len,) = struct.unpack("<Q", header)
        record = stream.read(record_len)
        if len(record) != record_len:
            raise ValueError(
                "truncated record: expected %d bytes, got %d"
                % (record_len, len(record))
            )
        yield record
def log(message):
    """
    Write a diagnostic line to stderr.

    MemSQL shows stderr output when the transform fails, so these lines act
    as breadcrumbs. ``message`` must be a bytes object; a bytes newline is
    appended before writing.
    """
    line = message + b"\n"
    binary_stderr.write(line)
def emit(message):
    """
    Hand one output record back to MemSQL via STDOUT.

    The record is consumed by LOAD DATA on the MemSQL side, so it must
    already be serialized (JSON, Avro, or CSV). ``message`` must be a bytes
    object; a bytes newline is appended to terminate the record.
    """
    terminated = message + b"\n"
    binary_stdout.write(terminated)
# Mark the start of the transform on stderr (written via log()).
log(b"Begin transform")
def send_insert_query(table_name, columns, values_arr):
    """
    Emit one multi-row INSERT statement as the JSON record {"q": <sql>}.

    :param table_name: target table name, interpolated verbatim into the SQL.
    :param columns: pre-rendered column list, e.g. "`a`, `b`".
    :param values_arr: list of pre-rendered row tuples, e.g. ["( '1', 'x' )"];
        they are joined with commas after VALUES.

    Nothing is emitted when ``values_arr`` is empty, because
    "INSERT ... VALUES ;" is not valid SQL.
    """
    if not values_arr:
        return
    sql = 'INSERT INTO %s ( %s ) VALUES %s;' % (table_name, columns, ','.join(values_arr))
    # emit() concatenates its argument with a bytes newline, so it needs bytes.
    # Passing the dict itself raised TypeError.
    emit(json.dumps({'q': sql}).encode('utf-8'))
def _sql_identifier(name):
    """Render a column name as a backtick-quoted SQL identifier.

    '/' is mapped to '_' (as before). Embedded backticks are doubled so they
    cannot close the quoted identifier early.
    """
    return "`" + str(name).replace('/', '_').replace('`', '``') + "`"


def _sql_literal(value):
    """Render a JSON-decoded value as a SQL literal.

    None becomes NULL rather than the string 'None'. Anything else becomes a
    single-quoted string with backslashes and single quotes escaped, so values
    containing quotes cannot break out of the literal. Data values are not
    otherwise altered (the former '/' -> '_' rewrite corrupted them).
    """
    if value is None:
        return 'NULL'
    text = "%s" % (value,)
    # NOTE(review): assumes MemSQL's default MySQL-style escaping, where
    # backslash is an escape character inside string literals. Confirm the
    # server's sql_mode does not set NO_BACKSLASH_ESCAPES.
    return "'" + text.replace('\\', '\\\\').replace("'", "''") + "'"


def _decode_events(data):
    """Decode one MemSQL record (bytes) into a list of change-event dicts.

    NOTE(review): the record layout is not visible here. A JSON array of
    events and a single JSON event object are both accepted, and an empty
    record or JSON null yields no events.
    """
    if not data.strip():
        return []
    decoded = json.loads(data.decode('utf-8'))
    if decoded is None:
        return []
    if isinstance(decoded, dict):
        return [decoded]
    return decoded


# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Rows waiting to go out as one multi-row INSERT. They all share
    # table_name and columns. The buffer is flushed before a DELETE, so
    # statements keep event order, and whenever the table or column list
    # changes, since one INSERT can only target one of each.
    table_name = None
    columns = ''
    values_arr = []
    for item in _decode_events(data):
        if not item:
            # Null/empty event (e.g. a tombstone): nothing to apply.
            continue
        table = item['source']['table']
        if item['op'] == 'd':
            if values_arr:
                send_insert_query(table_name, columns, values_arr)
                values_arr = []
            # NOTE(review): assumes every table is keyed by an ``id`` column.
            sql = ('delete from ' + table + ' where ' + table + '.id = '
                   + _sql_literal(item['before']['id']) + ';')
            # emit() appends a bytes newline, so it needs bytes, not a dict.
            emit(json.dumps({'q': sql}).encode('utf-8'))
            continue
        # Every other op is applied as an INSERT of the post-change row.
        # NOTE(review): an update ('u') sent as a plain INSERT may fail with a
        # duplicate-key error if the table has a primary/unique key. Confirm
        # whether an upsert was intended.
        row = item.get('after')
        if not row:
            continue
        row_columns = ', '.join(_sql_identifier(key) for key in row)
        if values_arr and (table != table_name or row_columns != columns):
            send_insert_query(table_name, columns, values_arr)
            values_arr = []
        table_name, columns = table, row_columns
        values_arr.append('( %s )' % ', '.join(_sql_literal(row[key]) for key in row))
    if values_arr:
        send_insert_query(table_name, columns, values_arr)
log(b"End transform")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment