Skip to content

Instantly share code, notes, and snippets.

@fbagci
Last active February 19, 2023 00:11
Show Gist options
  • Save fbagci/2122a72e74c7397e4573988b4ea4b776 to your computer and use it in GitHub Desktop.
Save fbagci/2122a72e74c7397e4573988b4ea4b776 to your computer and use it in GitHub Desktop.
Kafka image produce consume
! pip install kafka-python
import cv2
from kafka import KafkaConsumer
import numpy as np
import time
def benchmark(fn):
    """Decorate ``fn`` so that every call prints how long it took.

    After each call the wrapper prints
    ``"<name> execution: <ms> ms, <fps> fps"``, where ``ms`` is the elapsed
    wall-clock time truncated to whole milliseconds and ``fps`` is
    ``1000 / ms`` with one decimal (reported as ``1000`` when the call took
    less than one millisecond).

    Args:
        fn: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``fn`` and returns its
        result unchanged.
    """
    from functools import wraps

    @wraps(fn)  # keep fn's __name__/__doc__ visible on the wrapper
    def _timing(*a, **kw):
        st = time.perf_counter()
        r = fn(*a, **kw)
        # Read the clock only once so the printed ms and fps always agree.
        ms = int((time.perf_counter() - st) * 1000)
        fps = f"{1000 / ms:.1f}" if ms != 0 else 1000
        print(f"{fn.__name__} execution: {ms} ms, {fps} fps")
        return r

    return _timing
@benchmark
def consume_img_kafka(consumer):
    """Advance ``consumer`` with ``next()`` and return the message it yields."""
    return next(consumer)
@benchmark
def decode_byte_to_frame(byte):
    """Decode an encoded image held in ``byte`` into a frame using OpenCV."""
    # View the raw payload as an array of unsigned bytes for cv2.imdecode.
    raw = np.frombuffer(byte, np.uint8)
    # The flag -1 is forwarded to cv2.imdecode exactly as before.
    return cv2.imdecode(raw, -1)
def main():
    """Consume encoded images from Kafka and save each one as a JPEG file.

    Messages are read from the ``cam1`` topic, decoded with
    ``decode_byte_to_frame`` and written to ``./demo<N>.jpg``, where ``N``
    counts the frames written so far. The loop runs until the process is
    interrupted; the consumer is closed on the way out.
    """
    # Set kafka params (placeholders: replace with the real broker address)
    kafka_server, kafka_port = 'ip', 'port'
    topic = 'cam1'

    # Initialize conn params
    kafka_conn_str = f'{kafka_server}:{kafka_port}'

    # Initialize consumer
    consumer = KafkaConsumer(topic, bootstrap_servers=kafka_conn_str)

    counter = 0
    try:
        while True:
            msg = consume_img_kafka(consumer)
            frame = decode_byte_to_frame(msg.value)
            if frame is None:
                # cv2.imdecode yields None for data it cannot decode, and
                # cv2.imwrite would raise on it; skip instead of crashing.
                print(f"Skipping undecodable message on topic {msg.topic}")
                continue
            cv2.imwrite(f"./demo{counter}.jpg", frame)
            counter += 1
    finally:
        # Release the broker connection even when interrupted (e.g. Ctrl+C).
        consumer.close()


if __name__ == "__main__":
    main()
# Development stack: ZooKeeper + a single Kafka broker, a Python/OpenCV
# container for the producer/consumer scripts, and Portainer for management.
version: "3"
services:
  zookeeper:
    image: zookeeper:3.6.2
    container_name: zookeeper
    tty: true
    ports:
      - "2181:2181"
    networks:
      - app-tier
  kafka-dev-01:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-dev-01
    tty: true
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Placeholder: replace with the address clients use to reach the broker.
      KAFKA_ADVERTISED_HOST_NAME: <internal_ip_address>
    networks:
      - app-tier
  producer-consumer:
    image: jjanzic/docker-python3-opencv
    volumes:
      - ./image_producer:/image_producer
    networks:
      - app-tier
    # Keep the container from stopping immediately
    stdin_open: true
    tty: true
  portainer:
    image: portainer/portainer-ce
    # container_name: container-manager-dev-01
    restart: always
    ports:
      # Quoted for consistency with the other services' port mappings.
      - "18000:8000"
      - "19000:9000"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    environment:
      # NOTE(review): confirm Portainer actually reads this variable; its
      # documented mechanism is the --admin-password / --admin-password-file flag.
      admin-password: admin
    networks:
      - app-tier
networks:
  app-tier:
    driver: bridge
! pip install kafka-python
import cv2
from kafka import KafkaProducer
import time
def benchmark(fn):
    """Decorate ``fn`` so that every call prints how long it took.

    After each call the wrapper prints
    ``"<name> execution: <ms> ms, <fps> fps"``, where ``ms`` is the elapsed
    wall-clock time truncated to whole milliseconds and ``fps`` is
    ``1000 / ms`` with one decimal (reported as ``1000`` when the call took
    less than one millisecond).

    Args:
        fn: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``fn`` and returns its
        result unchanged.
    """
    from functools import wraps

    @wraps(fn)  # keep fn's __name__/__doc__ visible on the wrapper
    def _timing(*a, **kw):
        st = time.perf_counter()
        r = fn(*a, **kw)
        # Read the clock only once so the printed ms and fps always agree.
        ms = int((time.perf_counter() - st) * 1000)
        fps = f"{1000 / ms:.1f}" if ms != 0 else 1000
        print(f"{fn.__name__} execution: {ms} ms, {fps} fps")
        return r

    return _timing
@benchmark
def capture_frame(capture):
    """Read once from ``capture`` and return whatever ``read()`` gives back."""
    grabbed = capture.read()
    return grabbed
@benchmark
def encode_frame_to_byte(frame):
    """JPEG-encode ``frame`` with OpenCV and return the result of ``tobytes()``."""
    encoded = cv2.imencode('.jpg', frame)
    # Element 1 of the imencode result is the encoded buffer.
    jpeg_buffer = encoded[1]
    return jpeg_buffer.tobytes()
@benchmark
def produce_img_kafka(producer, frame, topic):
    """Encode ``frame`` and hand the resulting bytes to ``producer.send`` for ``topic``."""
    producer.send(topic, encode_frame_to_byte(frame))
def main():
    """Stream frames from an HTTP camera into the ``cam1`` Kafka topic.

    Each successfully read frame is encoded and sent via
    ``produce_img_kafka``; the producer is flushed after every 30 sent
    frames. The loop runs until the process is interrupted, at which point
    the capture is released and the producer is flushed and closed.
    """
    # Set camera params (placeholders: replace with the real camera address)
    cam_ip, cam_port = 'ip', 'port'
    # Set kafka params (placeholder port: replace with the real broker port)
    kafka_server, kafka_port = 'kafka', 'port'
    topic = 'cam1'

    # Initialize conn params
    cam_url = f"http://{cam_ip}:{cam_port}"
    kafka_conn_str = f'{kafka_server}:{kafka_port}'

    # Initialize producer
    producer = KafkaProducer(bootstrap_servers=kafka_conn_str)

    # Open the camera stream
    capture = cv2.VideoCapture(cam_url)

    counter = 0
    try:
        while True:
            status, frame = capture_frame(capture)
            if not status:
                # Nothing was read this round; try again.
                continue
            produce_img_kafka(producer, frame, topic)
            counter += 1
            # Flush only after a real send, once per 30 sent frames.
            if counter % 30 == 0:
                producer.flush()
    finally:
        # Free the camera and push out any buffered messages on exit.
        capture.release()
        producer.flush()
        producer.close()


if __name__ == "__main__":
    main()
! pip install kafka-python
import cv2
from kafka import KafkaProducer
def main():
    """Send ``./demo.jpg`` to the ``cam1`` Kafka topic as JPEG bytes.

    Raises:
        FileNotFoundError: If ``./demo.jpg`` cannot be read as an image.
    """
    # Read frame; cv2.imread signals failure by returning None, not raising.
    img = cv2.imread("./demo.jpg")
    if img is None:
        raise FileNotFoundError("./demo.jpg could not be read as an image")

    # Convert frame to byte
    imgbyte = cv2.imencode('.jpg', img)[1].tobytes()

    # Initialize producer
    producer = KafkaProducer(bootstrap_servers='kafka-dev-01:9092')
    try:
        # Send image to kafka
        producer.send('cam1', imgbyte)
        producer.flush()
    finally:
        # Always release the broker connection, even if the send fails.
        producer.close()
    print("DONE!")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment