Last active
February 19, 2023 00:11
-
-
Save fbagci/2122a72e74c7397e4573988b4ea4b776 to your computer and use it in GitHub Desktop.
Kafka image produce consume
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
! pip install kafka-python | |
import cv2 | |
from kafka import KafkaConsumer | |
import numpy as np | |
import time | |
def benchmark(fn): | |
def _timing(*a, **kw): | |
st = time.perf_counter() | |
r = fn(*a, **kw) | |
ms = int((time.perf_counter() - st)*1000) | |
fps = "%.1f" % (1000/int((time.perf_counter() - st)*1000)) if ms != 0 else 1000 | |
print(f"{fn.__name__} execution: {ms} ms, {fps} fps") | |
return r | |
return _timing | |
@benchmark | |
def consume_img_kafka(consumer): | |
msg = next(consumer) | |
return msg | |
@benchmark | |
def decode_byte_to_frame(byte): | |
frame = cv2.imdecode(np.frombuffer(byte, np.uint8), -1) | |
return frame | |
def main(): | |
# Set kafka params | |
kafka_server, kafka_port = 'ip', 'port' | |
topic = 'cam1' | |
# Initialize conn params | |
kafka_conn_str = f'{kafka_server}:{kafka_port}' | |
# Initialize consumer | |
consumer = KafkaConsumer(topic, bootstrap_servers=kafka_conn_str) | |
counter = 0 | |
while True: | |
msg = consume_img_kafka(consumer) | |
# print(msg) | |
# print(msg.topic) | |
# print(msg.timestamp) | |
# print(msg.value) | |
frame = decode_byte_to_frame(msg.value) | |
cv2.imwrite(f"./demo{counter}.jpg", frame) | |
counter += 1 | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: "3" | |
services: | |
zookeeper: | |
image: zookeeper:3.6.2 | |
container_name: zookeeper | |
tty: true | |
ports: | |
- "2181:2181" | |
networks: | |
- app-tier | |
kafka-dev-01: | |
image: wurstmeister/kafka:2.13-2.7.0 | |
container_name: kafka-dev-01 | |
tty: true | |
depends_on: | |
- zookeeper | |
ports: | |
- "9092:9092" | |
environment: | |
KAFKA_BROKER_ID: 1 | |
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" | |
KAFKA_ADVERTISED_HOST_NAME: <internal_ip_address> | |
networks: | |
- app-tier | |
producer-consumer: | |
image: jjanzic/docker-python3-opencv | |
volumes: | |
- ./image_producer:/image_producer | |
networks: | |
- app-tier | |
# Prevent container to stop immediately | |
stdin_open: true | |
tty: true | |
portainer: | |
image: portainer/portainer-ce | |
# container_name: container-manager-dev-01 | |
restart: always | |
ports: | |
- 18000:8000 | |
- 19000:9000 | |
volumes: | |
- /var/run/docker.sock:/var/run/docker.sock | |
environment: | |
admin-password: admin | |
networks: | |
- app-tier | |
networks: | |
app-tier: | |
driver: bridge |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
! pip install kafka-python | |
import cv2 | |
from kafka import KafkaProducer | |
import time | |
def benchmark(fn): | |
def _timing(*a, **kw): | |
st = time.perf_counter() | |
r = fn(*a, **kw) | |
ms = int((time.perf_counter() - st)*1000) | |
fps = "%.1f" % (1000/int((time.perf_counter() - st)*1000)) if ms != 0 else 1000 | |
print(f"{fn.__name__} execution: {ms} ms, {fps} fps") | |
return r | |
return _timing | |
@benchmark | |
def capture_frame(capture): | |
return capture.read() | |
@benchmark | |
def encode_frame_to_byte(frame): | |
# Convert frame to byte | |
imgbyte = cv2.imencode('.jpg', frame)[1].tobytes() | |
return imgbyte | |
@benchmark | |
def produce_img_kafka(producer, frame, topic): | |
imgbyte = encode_frame_to_byte(frame) | |
# Send image to kafka | |
producer.send(topic, imgbyte) | |
def main(): | |
# Set camera params | |
cam_ip, cam_port = 'ip', 'port' | |
# Set kafka params | |
kafka_server, kafka_port = 'kafka', 'port' | |
topic = 'cam1' | |
# Initialize conn params | |
cam_url = f"http://{cam_ip}:{cam_port}" | |
kafka_conn_str = f'{kafka_server}:{kafka_port}' | |
# Initialize procuder | |
producer = KafkaProducer(bootstrap_servers=kafka_conn_str) | |
# Read frame | |
capture = cv2.VideoCapture(cam_url) | |
counter = 0 | |
while True: | |
status, frame = capture_frame(capture) | |
if status: | |
produce_img_kafka(producer, frame, topic) | |
counter += 1 | |
if counter % 30 == 0: | |
producer.flush() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
! pip install kafka-python | |
import cv2 | |
from kafka import KafkaProducer | |
def main(): | |
# Initialize procuder | |
producer = KafkaProducer(bootstrap_servers='kafka-dev-01:9092') | |
# Read frame | |
img = cv2.imread("./demo.jpg") | |
# Convert frame to byte | |
imgbyte = cv2.imencode('.jpg', img)[1].tobytes() | |
# Send image to kafka | |
producer.send('cam1', imgbyte) | |
producer.flush() | |
print("DONE!") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment