Skip to content

Instantly share code, notes, and snippets.

View rezaisrad's full-sized avatar

Reza Rad rezaisrad

View GitHub Profile
from kafka import KafkaProducer # pip install kafka-python
import csv
import json
import time
from datetime import datetime
# Kafka connection setup.
# The bootstrap server below is a placeholder for the broker's public DNS
# name and port; every outgoing message value is serialized to UTF-8
# encoded JSON before being handed to the broker.
def _to_json_bytes(value):
    """Serialize *value* to JSON and encode the text as UTF-8 bytes."""
    return json.dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers='ec2-xx-xx-xxx-xx.compute-1.amazonaws.com:9092',
    value_serializer=_to_json_bytes,
)

# Name of the Kafka topic the messages are meant for.
topic = 'topic1'
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
# Build the Spark session for this application, or reuse one that already
# exists (getOrCreate). The application name is "StructuredStreamKafka".
spark = (
    SparkSession.builder
    .appName("StructuredStreamKafka")
    .getOrCreate()
)
# Subscribe to the topic named "topic1". Replace the xxx in .option() with your Kafka broker's public DNS address.
df_stream = spark \
## $ ssh -i "location of pem key" server_username@public_dns_address ##
$ ssh -i "~/.ssh/keyname.pem" ubuntu@ec2-35-143-351-222.compute-1.amazonaws.com
## $ ssh -i "location of pem key" server_username@public_dns_address ##
$ ssh -i "~/.ssh/keyname.pem" ubuntu@ec2-35-143-351-222.compute-1.amazonaws.com