Skip to content

Instantly share code, notes, and snippets.

@ilyasahsan123
Created January 28, 2019 14:53
Show Gist options
  • Save ilyasahsan123/604b6c9d98f193e826464cb682357f3f to your computer and use it in GitHub Desktop.
Save ilyasahsan123/604b6c9d98f193e826464cb682357f3f to your computer and use it in GitHub Desktop.
import json
import logging

from google.cloud import bigquery
from kafka import KafkaConsumer
logger = logging.getLogger(__name__)


def extract_user_row(raw_value):
    """Turn one Kafka message value into a BigQuery row tuple.

    The value is expected to be a JSON document whose ``payload.after``
    object holds the current state of a user record. (The envelope looks
    like a Debezium-style change event -- confirm against the producer.)

    Args:
        raw_value: The raw message value (``bytes`` or ``str``), or
            ``None`` for a message that carries no value (tombstone).

    Returns:
        A ``(id, firstname, lastname)`` tuple, or ``None`` when the
        message carries no row state to store: the value is ``None`` or
        ``payload.after`` is null (as happens for delete events).

    Raises:
        ValueError: If the value is not valid JSON.
        KeyError: If ``payload``, ``after`` or one of the user fields is
            missing from the document.
    """
    if raw_value is None:
        # json.loads(None) would raise TypeError and stop the consumer.
        return None

    data = json.loads(raw_value)
    user = data['payload']['after']
    if user is None:
        # No "after" state means there is nothing to insert.
        return None

    return (user['id'], user['firstname'], user['lastname'])


def main():
    """Stream user records from a Kafka topic into a BigQuery table."""
    # BigQuery configuration
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset('DATASET_NAME')
    table_ref = dataset_ref.table('TABLE_NAME')
    # insert_rows needs the fetched table (with its schema) to map the
    # positional tuple values onto columns.
    table = bigquery_client.get_table(table_ref)

    # Set Kafka topic name
    consumer = KafkaConsumer('KAFKA_TOPIC')

    # Transform each message and send it to BigQuery
    for message in consumer:
        row = extract_user_row(message.value)
        if row is None:
            continue

        # insert_rows reports per-row failures through its return value
        # rather than by raising, so an ignored result hides lost rows.
        errors = bigquery_client.insert_rows(table, [row])
        if errors:
            logger.error("BigQuery rejected row %r: %s", row, errors)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment