Skip to content

Instantly share code, notes, and snippets.

@pRoy24
Created February 16, 2018 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pRoy24/e91f8e34b3f9f8cb563379afde63b8ef to your computer and use it in GitHub Desktop.
Save pRoy24/e91f8e34b3f9f8cb563379afde63b8ef to your computer and use it in GitHub Desktop.
Creating a data processor configuration for Spark
//Conf File for Kafka
public static void main(String[] args) throws Exception {
//read config file
Properties prop = PropertyFileReader.readPropertyFile();
String zookeeper = prop.getProperty("com.iot.app.kafka.zookeeper");
String brokerList = prop.getProperty("com.iot.app.kafka.brokerlist");
String topic = prop.getProperty("com.iot.app.kafka.topic");
logger.info("Using Zookeeper=" + zookeeper + " ,Broker-list=" + brokerList + " and topic " + topic);
// set producer properties
Properties properties = new Properties();
properties.put("zookeeper.connect", zookeeper);
properties.put("metadata.broker.list", brokerList);
properties.put("request.required.acks", "1");
properties.put("serializer.class", "com.iot.app.kafka.util.IoTDataEncoder");
//generate event
Producer<String, IoTData> producer = new Producer<String, IoTData>(new ProducerConfig(properties));
IoTDataProducer iotProducer = new IoTDataProducer();
iotProducer.generateIoTEvent(producer,topic);
}
// Code sourced from https://github.com/pRoy24/tangled-economy
//read Spark and Cassandra properties and create SparkConf
Properties prop = PropertyFileReader.readPropertyFile();
SparkConf conf = new SparkConf()
.setAppName(prop.getProperty("com.iot.app.spark.app.name"))
.setMaster(prop.getProperty("com.iot.app.spark.master"))
.set("spark.cassandra.connection.host", prop.getProperty("com.iot.app.cassandra.host"))
.set("spark.cassandra.connection.port", prop.getProperty("com.iot.app.cassandra.port"))
.set("spark.cassandra.connection.keep_alive_ms", prop.getProperty("com.iot.app.cassandra.keep_alive"));
//batch interval of 5 seconds for incoming stream
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
//add check point directory
jssc.checkpoint(prop.getProperty("com.iot.app.spark.checkpoint.dir"));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment