Last active
February 23, 2022 17:51
-
-
Save weltonrodrigo/b47d44c3027cabb2a44115464e314dc6 to your computer and use it in GitHub Desktop.
Demonstração de como conectar no azure eventhub pela api kafka no python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import pprint | |
import sys | |
from json import dumps | |
from typing import List | |
from kafka import KafkaProducer | |
from kafka.errors import KafkaError | |
from kafka.future import Future | |
# Estes são criados e obtidos no portal azure. | |
EVENTHUB_NAMESPACE='my_evenhub_namespace' | |
EVENTHUB_INSTANCE='my_instance' # this is the topic | |
SHARED_ACCESS_POLICY_PRIMARY_CONNECTION_STRING='Endpoint=sb://my_evenhub_namespace.servicebus.windows.net/;SharedAccessKeyName=wsocrspia;SharedAccessKey=HG5pES+3LtLi0t4cdEB5Az2zI7VSTHvxXXO+UBKxuD0=;EntityPath=my_instance' | |
SHARED_ACCESS_POLICY_NAME='my_policy_name' | |
logger = logging.getLogger('kafka') | |
# logger.addHandler(logging.StreamHandler(sys.stdout)) | |
logging.basicConfig( | |
format='%(asctime)s %(levelname)-8s %(message)s', | |
level=logging.INFO, | |
datefmt='%Y-%m-%d %H:%M:%S') | |
logger.info("Conectando...") | |
producer = KafkaProducer(bootstrap_servers=[f'{EVENTHUB_NAMESPACE}.servicebus.windows.net:9093'], # PORT IS ALWAYS 9093 | |
client_id=SHARED_ACCESS_POLICY_NAME, # api_version=(1, 0, 0), | |
security_protocol="SASL_SSL", | |
sasl_mechanism="PLAIN", | |
sasl_plain_username='$ConnectionString', # literalmente o username é $ConnectionString, não é uma variável. | |
sasl_plain_password=SHARED_ACCESS_POLICY_PRIMARY_CONNECTION_STRING, | |
value_serializer=lambda x: | |
dumps(x).encode('utf-8')) | |
logger.info("Conectado.") | |
def send_um(topico: str, message: str) -> Future: | |
return producer.send(topico, message) | |
logger.info("Enviando assíncrono...") | |
futures: List[Future] = [] | |
for i in range(10): | |
now = datetime.now().isoformat() | |
msg = f'{now} The book is on the table #{i:03}' | |
future = send_um(EVENTHUB_INSTANCE, msg) | |
futures.append(future) | |
for i, future in enumerate(futures): | |
try: | |
metadata = future.get(timeout=2) | |
logger.info(f'Confirmado envio do {i:03}.') | |
except KafkaError: | |
logger.exception() | |
logger.info("Encerrando o producer...") | |
producer.close() | |
logger.info("Encerrado.") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment