@apoclyps
Created August 10, 2018 16:35

kafka-python SSLSocket Error

Issue

There appears to be an issue in kafka-python that manifests when building with the `python:3.7-alpine` image. The issue is not present with `python:3.6.0-alpine`.

Reproduction

The example code runs in a container and connects to a linked Kafka container based on the `paddycarey/kafka` image. The steps below assume a working Docker setup with Docker Compose installed.

The following steps will reproduce the issue:

  1. Start the required Kafka dependency by running:
     $ docker-compose up -d kafka
  2. Run the example consumer:
     $ docker-compose up consumer

With `kafka-python==1.4.3` pinned in `requirements.txt` (the file below pins `1.4.2`), the consumer fails to start. You should see output similar to the following:

```
consumer_1  | Traceback (most recent call last):
consumer_1  |   File "consumer.py", line 40, in <module>
consumer_1  |     consume()
consumer_1  |   File "consumer.py", line 21, in consume
consumer_1  |     ssl_context=ssl_context)
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/consumer/group.py", line 340, in __init__
consumer_1  |     self._client = KafkaClient(metrics=self._metrics, **self.config)
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 214, in __init__
consumer_1  |     self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 245, in _bootstrap
consumer_1  |     if not bootstrap.connect_blocking():
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/conn.py", line 301, in connect_blocking
consumer_1  |     self.connect()
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/conn.py", line 354, in connect
consumer_1  |     ret = self._sock.connect_ex(self._sock_addr)
consumer_1  |   File "/usr/local/lib/python3.7/ssl.py", line 1146, in connect_ex
consumer_1  |     return self._real_connect(addr, True)
consumer_1  |   File "/usr/local/lib/python3.7/ssl.py", line 1118, in _real_connect
consumer_1  |     raise ValueError("attempt to connect already-connected SSLSocket!")
consumer_1  | ValueError: attempt to connect already-connected SSLSocket!
```
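The `ValueError` is raised by the standard library's `ssl.py`, not by kafka-python itself. The traceback shows that `self._sock` is already an `SSLSocket` when `connect_ex` is called, and `connect_blocking` appears to call `connect` (and therefore `connect_ex`) repeatedly until the connection completes. Reading CPython's `ssl.py`, from 3.7 onwards `_real_connect` creates the internal SSL object before connecting and only clears it if the connect raises; when `connect_ex` merely returns an error code (as a pending non-blocking connect does), the next `connect_ex` call trips the "already-connected" check. In 3.6 the check appears to look only at whether the connection had completed. The sketch below tries to reproduce this without Kafka, assuming nothing is listening on the chosen local port; `free_port` and `second_connect_ex_error` are hypothetical helpers for illustration, not kafka-python APIs. On Python 3.7 it should report the same message as the traceback; on 3.6 it should report `None`.

```python
import socket
import ssl
import sys


def free_port():
    """Hypothetical helper: return a local TCP port that (probably) has no listener."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def second_connect_ex_error(port):
    """Call connect_ex twice on a non-blocking SSLSocket, as a client polling a
    pending connection would, and return the ValueError message raised by the
    second call (or None if it did not raise)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False  # must be disabled before setting CERT_NONE
    ctx.verify_mode = ssl.CERT_NONE
    raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    raw.setblocking(False)
    # Wrap *before* connecting, matching what the traceback suggests kafka-python 1.4.3 does.
    sock = ctx.wrap_socket(raw, do_handshake_on_connect=False)
    try:
        # First call returns an errno (e.g. ECONNREFUSED or EINPROGRESS) instead of raising.
        sock.connect_ex(("127.0.0.1", port))
        try:
            # A retry, as a polling loop would do on the next iteration.
            sock.connect_ex(("127.0.0.1", port))
        except ValueError as exc:
            return str(exc)
        return None
    finally:
        sock.close()


if __name__ == "__main__":
    print(sys.version_info[:2], second_connect_ex_error(free_port()))
```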

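Reproducing the above steps with `kafka-python==1.4.2` (the version pinned in `requirements.txt`) fails even earlier, at import time, because `async` became a reserved keyword in Python 3.7 and `kafka/producer/simple.py` still uses `self.async` as an attribute name. The output is similar to the following: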

```
consumer_1  | Traceback (most recent call last):
consumer_1  |   File "consumer.py", line 7, in <module>
consumer_1  |     from kafka import KafkaConsumer
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/__init__.py", line 23, in <module>
consumer_1  |     from kafka.producer import KafkaProducer
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/producer/__init__.py", line 4, in <module>
consumer_1  |     from kafka.producer.simple import SimpleProducer
consumer_1  |   File "/usr/local/lib/python3.7/site-packages/kafka/producer/simple.py", line 54
consumer_1  |     return '<SimpleProducer batch=%s>' % self.async
consumer_1  |                                                   ^
consumer_1  | SyntaxError: invalid syntax
```
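The 1.4.2 failure can be checked in isolation. A minimal sketch, using only the standard library's `keyword` module and `compile()`, that compiles the offending line quoted in the traceback (wrapped in a method so it stands alone); `SNIPPET` and `compiles` are illustrative names, not part of kafka-python:

```python
import keyword
import sys

# The line that fails in kafka/producer/simple.py (kafka-python 1.4.2),
# wrapped in a method so it can be compiled on its own.
SNIPPET = (
    "def __repr__(self):\n"
    "    return '<SimpleProducer batch=%s>' % self.async\n"
)


def compiles(source):
    """Return True if `source` is valid syntax for the running interpreter."""
    try:
        compile(source, "<snippet>", "exec")
    except SyntaxError:
        return False
    return True


if __name__ == "__main__":
    print(sys.version_info[:2], "async is a keyword:", keyword.iskeyword("async"))
    print("snippet compiles:", compiles(SNIPPET))
```

On 3.7 and later `async` is in `keyword.kwlist`, so the snippet is rejected; on 3.6 it was not a reserved word, which is consistent with the `python:3.6.0-alpine` image not showing this error.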
consumer.py

```python
# stdlib imports
import logging
import ssl
import sys

# third-party imports
from kafka import KafkaConsumer


def consume():
    """ Implements a basic Kafka consumer for test and debugging purposes.
    """
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    ssl_context.verify_mode = ssl.CERT_NONE
    ssl_context.check_hostname = False

    consumer = KafkaConsumer("test-topic",
                             group_id="test-group",
                             bootstrap_servers="kafka:9093",
                             security_protocol="SSL",
                             ssl_context=ssl_context)

    while True:
        response = consumer.poll(timeout_ms=30000)
        messages = []
        for message in _extract_messages(response):
            messages.append(message)
        print("Received %d messages" % len(messages))


def _extract_messages(kafka_response):
    for topic_partition in kafka_response.values():
        yield from topic_partition


if __name__ == "__main__":
    _logger = logging.getLogger()
    _logger.addHandler(logging.StreamHandler(stream=sys.stdout))
    _logger.setLevel("INFO")
    consume()
```
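`_extract_messages` flattens the mapping returned by `KafkaConsumer.poll`, which maps each topic partition to a list of records. A minimal offline sketch of that behaviour, assuming hypothetical stand-in namedtuples in place of kafka-python's `TopicPartition` and `ConsumerRecord` so that no broker is needed (the loop variable is renamed to `records` for clarity):

```python
from collections import namedtuple

# Hypothetical stand-ins for kafka-python's TopicPartition and ConsumerRecord,
# so the helper can be exercised without a running broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["offset", "value"])


def _extract_messages(kafka_response):
    # Each value is the list of records fetched for one partition; yield them in order.
    for records in kafka_response.values():
        yield from records


response = {
    TopicPartition("test-topic", 0): [Record(0, b"a"), Record(1, b"b")],
    TopicPartition("test-topic", 1): [Record(0, b"c")],
}
messages = list(_extract_messages(response))
print("Received %d messages" % len(messages))  # prints: Received 3 messages
```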
docker-compose.yml

```yaml
consumer:
  build: .
  command: python consumer.py
  links:
    - kafka

producer:
  build: .
  command: python producer.py
  links:
    - kafka

# runs Kafka + Zookeeper in a single container for development purposes
kafka:
  image: paddycarey/kafka:1.0.1
  ports:
    - "9092:9092"
    - "9093:9093"
  volumes:
    - "./.data/kafka/:/var/lib/kafka"
  environment:
    - KAFKA_ADVERTISED_HOST=kafka
```
Dockerfile

```dockerfile
FROM python:3.7-alpine
MAINTAINER Declan Traynor <coderdex@gmail.com>

WORKDIR /usr/src/app
RUN apk add --no-cache build-base

# install application dependencies inside the container
COPY requirements.txt /usr/src/app/
RUN pip install --no-cache-dir -r requirements.txt

# copy the application source into the container
COPY . /usr/src/app
```
requirements.txt

```
appdirs==1.4.0
kafka-python==1.4.2
lz4tools==1.3.1.2
packaging==16.8
pyparsing==2.1.10
six==1.10.0
xxhash==1.0.0
```