Skip to content

Instantly share code, notes, and snippets.

@meysampg
Created July 20, 2022 09:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save meysampg/0c05c74bfbebc8e87ce405ac1e5fc2a3 to your computer and use it in GitHub Desktop.
Save meysampg/0c05c74bfbebc8e87ce405ac1e5fc2a3 to your computer and use it in GitHub Desktop.
Produce Coingecko Recorrds to Kafka
import requests
import datetime
from kafka import KafkaProducer
from json import dumps
from kafka.errors import KafkaError
def produce():
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
response = requests.get('https://api.coingecko.com/api/v3/exchange_rates').json()['rates']
result = [dict(response[k], **{'timestamp': datetime.datetime.now().isoformat()}) for k in response]
for msg in result:
future = producer.send('druid-sample-data', value=msg)
try:
future.get(timeout=10)
except KafkaError:
print('failed')
pass
if __name__ == '__main__':
produce()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment