@kartben
Last active October 7, 2023 16:16
Example of how to use Spark Streaming for MQTT data consolidation
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This example will consume temperature data (or any other numerical values, really)
from an MQTT broker, and consolidate/graph this data on a 15-second sliding window.
This work is based on the original mqtt_wordcount.py sample from the Apache Spark codebase.

Running the example:
    `$ bin/spark-submit --jars \
      external/mqtt-assembly/target/spark-streaming-mqtt-assembly_*.jar \
      mqtt_spark_streaming.py`
"""
def is_number(s):
    # True if the payload can be parsed as a float, False otherwise
    try:
        float(s)
        return True
    except ValueError:
        return False

import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="TemperatureHistory")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
# broker URI
brokerUrl = "tcp://192.168.2.26:1883" # "tcp://iot.eclipse.org:1883"
# topic or topic pattern where temperature data is being sent
topic = "+/+/sensors/temperature"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
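# Keep numeric payloads, bucket them to the nearest 0.5, and count each
# bucket over a 15-second window that slides every second.
counts = mqttStream \
    .filter(lambda message: is_number(message)) \
    .map(lambda message: (round(float(message) * 2, 0) / 2, 1)) \
    .reduceByKeyAndWindow(operator.add, operator.sub, 15, 1) \
    .transform(lambda rdd: rdd.sortByKey())
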
def printHistogram(time, rdd):
    c = rdd.collect()
    print("-------------------------------------------")
    print("Time: %s" % time)
    print("-------------------------------------------")
    for record in c:
        # "draw" our lil' ASCII-based histogram
        print(str(record[0]) + ': ' + '#'*record[1])
    print("")

counts.foreachRDD(printHistogram)
ssc.start()
ssc.awaitTermination()
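
The windowing logic of the script above can be tried without Spark or an MQTT broker. What follows is only a minimal pure-Python sketch of the same idea, not the Spark implementation: `bucket` mirrors the `map` step (rounding to the nearest 0.5), and the hypothetical `SlidingHistogram` class imitates the add/subtract pattern passed to `reduceByKeyAndWindow`. The class name, the `add_batch` method, and the choice to drop buckets whose count falls to zero are assumptions made for this illustration.

```python
from collections import Counter, deque


def is_number(s):
    """Return True if s can be parsed as a float."""
    try:
        float(s)
        return True
    except (TypeError, ValueError):
        return False


def bucket(message):
    """Round a numeric payload to the nearest 0.5, like the map step."""
    return round(float(message) * 2, 0) / 2


class SlidingHistogram:
    """Hypothetical helper: per-bucket counts over the last `window` batches."""

    def __init__(self, window=15):
        self.window = window
        self.batches = deque()   # one Counter per batch still in the window
        self.counts = Counter()  # running totals for the whole window

    def add_batch(self, messages):
        batch = Counter(bucket(m) for m in messages if is_number(m))
        self.batches.append(batch)
        self.counts.update(batch)        # plays the role of operator.add
        if len(self.batches) > self.window:
            expired = self.batches.popleft()
            self.counts.subtract(expired)  # plays the role of operator.sub
            # Assumption of this sketch: forget buckets that are now empty.
            for key in [k for k, v in self.counts.items() if v == 0]:
                del self.counts[key]
        return sorted(self.counts.items())


if __name__ == "__main__":
    hist = SlidingHistogram(window=2)
    for payloads in (["21.3", "abc", "21.4"], ["22"], []):
        for value, count in hist.add_batch(payloads):
            print(str(value) + ': ' + '#' * count)
        print("")
```

Keeping a running total and subtracting the batch that leaves the window avoids recounting every batch on each slide, which is the reason the script passes both `operator.add` and `operator.sub`.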
@usama999

Hello! I am facing an issue while running this. It says "ModuleNotFoundError: No module named 'pyspark.streaming.mqtt'". What could be the solution?

@massimocallisto

Hi,
I had the same problem, but running the code with the Apache Bahir package worked for me. However, I had to make some changes to the code.
My Apache Spark version is 2.4.5.
First I started pyspark with this command:
pyspark --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.4.0

When the console is ready, I can run the code example after changing
from pyspark.streaming.mqtt import MQTTUtils to from mqtt import MQTTUtils

I also had to change the MQTT connection to:
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic, username=None, password=None)

@BennisonDevadoss

@massimocallisto When I run the command pyspark --packages org.apache.bahir:spark-streaming-mqtt_2.11:3.3.1, I get the following error:
module not found: org.apache.bahir#spark-streaming-mqtt_2.11;3.3.1

Please let me know if you have a solution.

@kartben
Author

kartben commented Oct 7, 2023

This code is really old, so unfortunately I doubt it still works.

@BennisonDevadoss

@kartben, it is not working. Is there any other way to do Spark streaming with the MQTT protocol?

@kartben
Author

kartben commented Oct 7, 2023

@BennisonDevadoss Have you tried the latest version of the official samples that this code was originally based on?
https://github.com/apache/bahir/tree/master/streaming-mqtt/examples
