Skip to content

Instantly share code, notes, and snippets.

@kunickiaj
Created November 10, 2015 00:15
Show Gist options
  • Save kunickiaj/365ab567b07291df8738 to your computer and use it in GitHub Desktop.
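Patch to rebuild the spark-streaming-kafka jar to work with HDP 2.3.2 (sets the Kafka dependency to version 0.8.2.2.3.2.0-2950 and updates KafkaTestUtils for that Kafka API).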
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index 8a43eb7..59e9b62 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -28,6 +28,7 @@
<artifactId>spark-streaming-kafka_2.10</artifactId>
<properties>
<sbt.project.name>streaming-kafka</sbt.project.name>
+ <kafka.version>0.8.2.2.3.2.0-2950</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Kafka</name>
@@ -50,7 +51,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.2.1</version>
+ <version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
@@ -100,4 +101,4 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 6dc4e95..34a59f2 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -109,7 +109,7 @@ private class KafkaTestUtils extends Logging {
// Kafka broker startup
Utils.startServiceOnPort(brokerPort, port => {
brokerPort = port
- brokerConf = new KafkaConfig(brokerConfiguration)
+ brokerConf = KafkaConfig.fromProps(brokerConfiguration)
server = new KafkaServer(brokerConf)
server.startup()
(server, port)
@@ -251,7 +251,7 @@ private class KafkaTestUtils extends Logging {
ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
- leaderAndInSyncReplicas.isr.size >= 1
+ leaderAndInSyncReplicas.isr.nonEmpty
case _ =>
false
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment