Doron Vainrub (dvainrub)
# From your terminal run:
> docker exec -it -u root $(docker ps | grep docker_kafka | cut -d' ' -f1) /bin/bash
# $(docker ps | grep docker_kafka | cut -d' ' -f1) returns the container ID of the running Kafka container so you can access it
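The `grep`/`cut` pipeline above simply extracts the first space-delimited field (the container ID) from the matching `docker ps` row. You can see what it does with a simulated `docker ps` line (the ID, image, and name below are made up for illustration):

```shell
# Hypothetical 'docker ps' row, piped through the same filter used above
echo 'abc123def456 wurstmeister/kafka docker_kafka_1' | grep docker_kafka | cut -d' ' -f1
# prints: abc123def456
```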
# Create a topic
bash> $KAFKA_HOME/bin/kafka-topics.sh --create --partitions 4 --bootstrap-server kafka:9092 --topic test
# Create a consumer
bash> $KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic test
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    value_serializer=lambda m: dumps(m).encode('utf-8'),
    bootstrap_servers=['172.17.0.1:32783', '172.17.0.1:32782', '172.17.0.1:32781'])

producer.send("test", value={"hello": "producer"})
producer.flush()  # send() is asynchronous; flush to make sure the message actually goes out
from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-1',
    value_deserializer=lambda m: loads(m.decode('utf-8')),
    bootstrap_servers=['172.17.0.1:32783', '172.17.0.1:32782', '172.17.0.1:32781'])

for message in consumer:
    print(message.value)
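The producer's `value_serializer` and the consumer's `value_deserializer` above are inverses: JSON-encode then UTF-8-encode on the way in, UTF-8-decode then JSON-parse on the way out. A minimal standalone round-trip sketch of those two lambdas, needing no broker:

```python
from json import dumps, loads

# The same lambdas used in the producer/consumer configs above
serialize = lambda m: dumps(m).encode('utf-8')
deserialize = lambda m: loads(m.decode('utf-8'))

msg = {"hello": "producer"}
wire = serialize(msg)  # the raw bytes Kafka would store for this message
assert deserialize(wire) == msg
print(wire)  # b'{"hello": "producer"}'
```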
> git clone https://github.com/wurstmeister/kafka-docker.git
> cd kafka-docker
# Update KAFKA_ADVERTISED_HOST_NAME inside 'docker-compose.yml';
# for example, set it to 172.17.0.1 (the default Docker bridge IP)
> vim docker-compose.yml
> docker-compose up -d
# Optional - scale the cluster by adding more brokers (ZooKeeper stays a single instance)
> docker-compose scale kafka=3
dvainrub / docker-compose.yml (last active June 6, 2019)
Expose the Docker network and Kafka brokers
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1  # set to your Docker bridge/host IP, as noted above
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
import findspark
findspark.init()
import pyspark
...
cmd> pip install findspark
cmd> jupyter notebook
cmd> pyspark
>>> nums = sc.parallelize([1,2,3,4])
>>> nums.map(lambda x: x*x).collect()
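The RDD `map`/`collect` pair above applies the lambda to every element and brings the results back to the driver as a list, so the call should yield `[1, 4, 9, 16]`. The same transformation in plain Python, as a sanity check of the lambda itself rather than of Spark:

```python
# Equivalent of sc.parallelize([1,2,3,4]).map(lambda x: x*x).collect(),
# using the built-in map instead of an RDD
nums = [1, 2, 3, 4]
squares = list(map(lambda x: x * x, nums))
print(squares)  # [1, 4, 9, 16]
```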
cmd> spark-shell
scala> spark.range(1).withColumn("status", lit("All seems fine. Congratulations!")).show(false)
from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

def main():
    spark, sc = init_spark()
    nums = sc.parallelize([1, 2, 3, 4])
    print(nums.map(lambda x: x * x).collect())

if __name__ == '__main__':
    main()