Skip to content

Instantly share code, notes, and snippets.

View pavangadagi's full-sized avatar

Pavan Gadagi pavangadagi

View GitHub Profile
// Identifiers for the supported discount rules.
const DISCOUNT_TYPES = {
  BUY_2_GET_3: "BUY_2_GET_3", // pay for 2, receive 3
  BULK: "BULK",               // volume-based price reduction
};
// Catalog item names mapped to their SKU codes.
const ITEM_LIST = {
  APPLE_TV: "atv",
  SUPER_IPAD: "ipd",
};
// Micro-batch interval (seconds) for the Spark Streaming context.
int STREAM_INTERVAL_SEC = 60;

// Local-mode Spark configuration; "local[*]" uses all available cores.
SparkConf sparkConf = new SparkConf().setAppName("CryptoStockAnalysis").setMaster("local[*]");
JavaStreamingContext streamingContext = new JavaStreamingContext(
        sparkConf,
        Durations.seconds(STREAM_INTERVAL_SEC)
);

// FIX: kafkaParams must be fully populated BEFORE being handed to
// createDirectStream — the original declared and filled it AFTER the
// stream was created, which cannot compile as a statement sequence.
// NOTE(review): `brokers`, `groupId`, and `topics` are assumed to be
// defined earlier in this file — confirm against the surrounding code.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

// Comma-separated topic list -> subscription set.
Set<String> topicSet = new HashSet<>(Arrays.asList(topics.split(",")));

// Direct (receiver-less) Kafka stream; PreferConsistent spreads
// partitions evenly across available executors.
JavaInputDStream<ConsumerRecord<String, JsonNode>> streams = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topicSet, kafkaParams));
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<!-- FIX: the original pasted this pair twice and left both
     spark-streaming_2.11 entries unclosed; deduplicated and
     well-formed below. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.0</version>
</dependency>