Skip to content

Instantly share code, notes, and snippets.

@weltonrodrigo
Last active February 23, 2022 17:51
Show Gist options
  • Save weltonrodrigo/b47d44c3027cabb2a44115464e314dc6 to your computer and use it in GitHub Desktop.
Save weltonrodrigo/b47d44c3027cabb2a44115464e314dc6 to your computer and use it in GitHub Desktop.
Demonstração de como conectar-se ao Azure Event Hubs pela API Kafka em Python.
import logging
import pprint
import sys
from datetime import datetime
from json import dumps
from typing import List

from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka.future import Future
# These values are created in, and copied from, the Azure portal.
EVENTHUB_NAMESPACE = 'my_evenhub_namespace'
EVENTHUB_INSTANCE = 'my_instance'  # the instance name is used as the Kafka topic
# NOTE(review): this connection string embeds a SharedAccessKey. If it is a
# real key it should be rotated and loaded from outside the source file.
SHARED_ACCESS_POLICY_PRIMARY_CONNECTION_STRING = (
    'Endpoint=sb://my_evenhub_namespace.servicebus.windows.net/;'
    'SharedAccessKeyName=wsocrspia;'
    'SharedAccessKey=HG5pES+3LtLi0t4cdEB5Az2zI7VSTHvxXXO+UBKxuD0=;'
    'EntityPath=my_instance'
)
SHARED_ACCESS_POLICY_NAME = 'my_policy_name'

# Configure the root logger first; the 'kafka' logger obtained below then
# emits through it.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)
logger = logging.getLogger('kafka')


def _to_json_bytes(payload):
    """Serialize *payload* as JSON text and encode it as UTF-8 bytes."""
    return dumps(payload).encode('utf-8')


# The port is always 9093.
_bootstrap_server = f'{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093'

logger.info("Conectando...")
producer = KafkaProducer(
    bootstrap_servers=[_bootstrap_server],
    client_id=SHARED_ACCESS_POLICY_NAME,
    # api_version=(1, 0, 0) was left disabled by the original author.
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    # The username is literally the text "$ConnectionString"; it is not a
    # variable to be substituted.
    sasl_plain_username='$ConnectionString',
    sasl_plain_password=SHARED_ACCESS_POLICY_PRIMARY_CONNECTION_STRING,
    value_serializer=_to_json_bytes,
)
logger.info("Conectado.")
def send_um(topico: str, message: str) -> Future:
    """Hand one message to the module-level producer for delivery.

    Args:
        topico: Name of the topic the message is published to.
        message: Payload forwarded unchanged to ``producer.send``.

    Returns:
        The future returned by ``producer.send`` for this message.
    """
    pending = producer.send(topico, message)
    return pending
# Queue ten timestamped messages without waiting for delivery, keeping each
# returned future so the results can be checked afterwards.
logger.info("Enviando assíncrono...")
futures: List[Future] = []
for i in range(10):
    now = datetime.now().isoformat()
    msg = f'{now} The book is on the table #{i:03}'
    futures.append(send_um(EVENTHUB_INSTANCE, msg))

# Wait up to two seconds for each send to be confirmed. A failed send is
# logged with its traceback and the loop moves on to the next future.
for i, future in enumerate(futures):
    try:
        future.get(timeout=2)
    except KafkaError:
        # Logger.exception requires a message argument; calling it with none
        # raises TypeError inside this handler and hides the Kafka error.
        logger.exception('Falha ao confirmar envio do %03d.', i)
    else:
        logger.info('Confirmado envio do %03d.', i)

logger.info("Encerrando o producer...")
producer.close()
logger.info("Encerrado.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment