Skip to content

Instantly share code, notes, and snippets.

@rochoa rochoa/consumer.py

Created Mar 20, 2020
Embed
What would you like to do?
import click
import csv
import json
import requests
from io import StringIO
from datetime import datetime
from kafka import KafkaConsumer
import atexit
def flush(endpoint, datasource_name, token, rows):
    """Serialize *rows* as CSV and append them to a Tinybird Data Source.

    Parameters:
        endpoint: base API URL, e.g. ``https://api.tinybird.co``.
        datasource_name: name of the target Data Source.
        token: API token, sent as a ``Bearer`` Authorization header.
        rows: list of row sequences to serialize and append.

    Tries to serialize the whole batch in one call; if that raises,
    falls back to row-by-row serialization so a single bad row does
    not discard the entire batch.
    """
    csv_chunk = StringIO()
    writer = csv.writer(csv_chunk, delimiter=',', quotechar='"',
                        quoting=csv.QUOTE_MINIMAL)
    try:
        writer.writerows(rows)
    except Exception as exc:
        # Was a bare `except:` that also swallowed the exception detail;
        # catch Exception only and report what went wrong.
        print(f"Failed to process rows as block ({exc}), row-by-row processing fallback")
        # Start a fresh buffer: the failed writerows may have left
        # partial output in the previous one.
        csv_chunk = StringIO()
        writer = csv.writer(csv_chunk, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        for r in rows:
            try:
                writer.writerow(r)
            except Exception as row_exc:
                print(f"Failed to process row = {r!r} ({row_exc})")
    append_url = f'{endpoint}/v0/datasources?mode=append&name={datasource_name}'
    response = requests.post(append_url,
                             data=csv_chunk.getvalue(),
                             headers={'Authorization': f'Bearer {token}'}
                             )
    print(f"[{datetime.now()}] status={response.status_code} datasource={datasource_name}, rows={len(rows)}")
@click.command()
@click.argument('topic')
@click.argument('datasource_name')
@click.option('--token', envvar='TOKEN')
@click.option('--endpoint', default='https://api.tinybird.co')
def consume(topic, datasource_name, token, endpoint):
    """Consume JSON messages from a Kafka topic and append them, in
    batches of 20000, to a Tinybird Data Source via flush()."""
    consumer = KafkaConsumer(
        topic,
        value_deserializer=json.loads,
        group_id=f"tb_{datasource_name}",
    )
    pending = []
    # The lambda closes over the `pending` variable (not a snapshot of the
    # list), so whatever is buffered at interpreter exit gets flushed.
    atexit.register(lambda: flush(endpoint, datasource_name, token, pending))
    for message in consumer:
        pending.append(message.value)
        if len(pending) >= 20000:
            flush(endpoint, datasource_name, token, pending)
            pending = []
if __name__ == '__main__':
    # Click parses command-line arguments and invokes the command.
    consume()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.