Skip to content

Instantly share code, notes, and snippets.

@shiumachi
Created November 28, 2018 06:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shiumachi/28e3167b50e643e95e79d315d4b07eb6 to your computer and use it in GitHub Desktop.
Save shiumachi/28e3167b50e643e95e79d315d4b07eb6 to your computer and use it in GitHub Desktop.
from kafka import KafkaConsumer
from kafka.client import KafkaClient
import kudu
from kudu.client import Partitioning
import argparse
def init_argumentparser():
    """Build the command-line parser for the Kafka-to-Kudu bridge.

    Every option is a string. ``--kudu_master_address`` and
    ``--kafka_broker_address`` are mandatory; the rest fall back to the
    defaults listed below.

    Returns:
        argparse.ArgumentParser: the configured (not yet parsed) parser.
    """
    # (flag, default, required) for each supported option.
    option_specs = (
        ('--kudu_master_address', '', True),
        ('--kudu_master_port', '7051', False),
        ('--kudu_table', 'test_table', False),
        ('--kafka_broker_address', '', True),
        ('--kafka_broker_port', '9092', False),
        ('--kafka_topic', 'test_topic', False),
        ('--config', '', False),
    )
    cli = argparse.ArgumentParser()
    for flag, fallback, mandatory in option_specs:
        cli.add_argument(flag, default=fallback, type=str, required=mandatory)
    return cli
def insert_msg(msg, table, session):
    """Write one consumed Kafka message into the Kudu table as a (key, value) row.

    The row key is ``msg.timestamp`` and the row value is ``msg.value``
    decoded as UTF-8. The session is flushed right after every message.

    Messages whose ``value`` is None (e.g. Kafka tombstones) are skipped:
    ``None`` cannot be decoded, and the table built by ``create_table``
    declares ``value`` as non-nullable.

    NOTE(review): ``msg.timestamp`` is used as the primary key, so two
    messages carrying the same timestamp will collide on insert -- confirm
    this is acceptable for the expected traffic.

    Args:
        msg: consumed record exposing ``timestamp`` and ``value`` (bytes or None).
        table: open Kudu table providing ``new_insert``.
        session: Kudu session used to apply and flush the insert.

    Returns:
        bool: True if a row was written, False if the message was skipped.
    """
    ts = msg.timestamp
    if msg.value is None:
        # Nothing to store; previously this crashed with AttributeError.
        print("skip key={}: message has no value".format(ts))
        return False
    value = msg.value.decode('utf-8')
    op = table.new_insert({'key': ts, 'value': value})
    session.apply(op)
    session.flush()
    print("key={}, value={}".format(ts, value))
    return True
def create_table(kudu_client, kudu_table):
    """Create the Kudu table ``kudu_table`` with a two-column schema.

    Columns:
        key   -- int64, non-nullable, primary key.
        value -- string, non-nullable, LZ4-compressed.

    Rows are hash-partitioned on ``key`` into 3 buckets.

    Args:
        kudu_client: connected Kudu client (from ``kudu.connect``).
        kudu_table: name of the table to create.
    """
    schema_builder = kudu.schema_builder()
    # The key column is configured through the fluent ColumnSpec API.
    key_column = schema_builder.add_column('key')
    key_column.type(kudu.int64).nullable(False).primary_key()
    schema_builder.add_column('value', type_=kudu.string, nullable=False,
                              compression='lz4')
    table_schema = schema_builder.build()

    hash_partitioning = Partitioning().add_hash_partitions(
        column_names=['key'], num_buckets=3)
    kudu_client.create_table(kudu_table, table_schema, hash_partitioning)
def create_kafka_topic(kafka_client, kafka_topic):
    """Make the brokers aware of ``kafka_topic`` so that it gets created.

    In kafka-python, ``KafkaClient.add_topic`` only adds the topic to the
    client's set of tracked topics and requests a metadata refresh. It
    returns a future and sends nothing by itself, so the client must be
    polled until that future resolves for the metadata request to go out.

    NOTE(review): a metadata request only *creates* a missing topic when the
    broker runs with ``auto.create.topics.enable=true``. Otherwise an admin
    client (``kafka.admin.KafkaAdminClient.create_topics``) is needed.

    Args:
        kafka_client: a ``kafka.client.KafkaClient`` (the async client).
        kafka_topic: name of the topic.

    Returns:
        The metadata future returned by ``add_topic``. Callers may inspect
        it (e.g. ``succeeded()``); this function does not raise on failure.
    """
    future = kafka_client.add_topic(kafka_topic)
    # Drive the client's network loop until the metadata request completes.
    kafka_client.poll(future=future)
    return future
def main():
    """Stream messages from a Kafka topic into a Kudu table.

    Parses the command line and connects to the Kudu master. If the table or
    the Kafka topic is missing, it is created. The function then consumes
    the topic indefinitely, writing each message through ``insert_msg``.

    The ``--config`` option is parsed but not used here.
    """
    args = init_argumentparser().parse_args()

    # --- Kudu: connect, ensure the table exists, open a write session.
    kudu_client = kudu.connect(host=args.kudu_master_address,
                               port=args.kudu_master_port)
    if args.kudu_table not in kudu_client.list_tables():
        create_table(kudu_client, args.kudu_table)
    table = kudu_client.table(args.kudu_table)
    session = kudu_client.new_session()

    # --- Kafka: ensure the topic exists.
    kafka_bootstrap_servers = ':'.join(
        [args.kafka_broker_address, str(args.kafka_broker_port)])
    # This client is only needed for the topic check/creation. Close it
    # afterwards so its broker connections are not held for the lifetime of
    # the consume loop below.
    kafka_client = KafkaClient(bootstrap_servers=kafka_bootstrap_servers)
    try:
        # NOTE(review): cluster.topics() only reflects metadata the client
        # has already fetched -- confirm it is populated at this point for
        # the kafka-python version in use.
        if args.kafka_topic not in kafka_client.cluster.topics():
            create_kafka_topic(kafka_client, args.kafka_topic)
    finally:
        kafka_client.close()

    # --- Consume forever, inserting each message into Kudu.
    consumer = KafkaConsumer(args.kafka_topic,
                             bootstrap_servers=kafka_bootstrap_servers)
    try:
        for msg in consumer:
            insert_msg(msg, table, session)
    finally:
        # Release the consumer's connections on Ctrl-C or an insert error.
        consumer.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment