Skip to content

Instantly share code, notes, and snippets.

@marmbrus
Created September 8, 2015 18:23
Show Gist options
  • Save marmbrus/50da3d78eaa1acbd7bee to your computer and use it in GitHub Desktop.
Save marmbrus/50da3d78eaa1acbd7bee to your computer and use it in GitHub Desktop.
package com.databricks.spark.jira
import scala.io.Source
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.{TableScan, BaseRelation, RelationProvider}
import java.nio.charset.Charset
import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.commons.codec.binary.Base64
import org.apache.http.client.methods._
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
/**
 * Spark SQL relation provider for JIRA.
 *
 * Recognized options:
 *  - `url`      (required) endpoint the JQL search request is POSTed to
 *  - `user`     (required) username for HTTP basic authentication
 *  - `password` (required) password for HTTP basic authentication
 *  - `query`    (optional) JQL query; defaults to the empty string
 *
 * A missing required option surfaces as the `NoSuchElementException` thrown by `Map.apply`.
 */
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val endpoint = parameters("url")
    val login = parameters("user")
    val secret = parameters("password")
    val jql = parameters.getOrElse("query", "")
    JiraRelation(endpoint, login, secret, jql)(sqlContext)
  }
}
/**
 * A table over the issues returned by a JIRA JQL search.
 *
 * Every page of results is fetched eagerly while the relation is being constructed
 * (`baseData` is a strict `val`). Paging advances `startAt` by the number of issues in each
 * page and stops at the first page that contains no issues. Each page's raw JSON response
 * becomes a one-element RDD. The non-empty pages are unioned, parsed as JSON and exploded, so
 * every output row holds a single `issue` column.
 *
 * @param url endpoint the JQL search request is POSTed to
 * @param user username for HTTP basic authentication
 * @param password password for HTTP basic authentication
 * @param query JQL query to run (may be empty)
 * @param sqlContext context used to build the intermediate RDDs and DataFrames
 */
case class JiraRelation(url: String, user: String, password: String, query: String)(
    @transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  import sqlContext.implicits._

  val client = new JiraRestClient(url, user, password)

  /** All matching issues, one `issue` per row. */
  val baseData: DataFrame = {
    var startAt = 0L
    val pages = new scala.collection.mutable.ArrayBuffer[RDD[String]]

    // Fetches the page beginning at `startAt`, advances `startAt` past it, and returns the
    // page together with the number of issues it contained.
    def fetchPage(): (RDD[String], Long) = {
      val pageRDD = sqlContext.sparkContext.parallelize(client.getIssues(startAt, query) :: Nil)
      val pageIssues = sqlContext.read.json(pageRDD).select(explode($"issues"))
      // count() is a Spark action (it launches a job), so run it once and reuse the result.
      val pageCount = pageIssues.count()
      startAt += pageCount
      (pageRDD, pageCount)
    }

    val (firstPage, firstCount) = fetchPage()
    var page = firstPage
    var count = firstCount
    while (count > 0) {
      println(startAt) // NOLINT
      pages += page
      val (nextPage, nextCount) = fetchPage()
      page = nextPage
      count = nextCount
    }

    // When the query matches nothing, `pages` is empty and reduceLeft would throw. Fall back
    // to the first page instead: it has already been shown to have an `issues` column, so the
    // result is simply an empty table.
    val combined = if (pages.isEmpty) firstPage else pages.reduceLeft(_ union _)
    val allDFs = sqlContext.read.json(combined)
    allDFs.printSchema()
    allDFs.select(explode($"issues").as("issue"))
  }

  override def schema: org.apache.spark.sql.types.StructType = baseData.schema

  override def buildScan(): RDD[Row] = baseData.rdd
}
/**
 * Minimal issue record: an issue key plus its summary text.
 * NOTE(review): not referenced by the code visible in this file.
 */
case class JiraIssue(
    key: String,
    summary: String)
/**
 * A homebrewed JIRA REST client using HTTP basic authentication.
 *
 * Each request creates its own `DefaultHttpClient` and shuts its connection manager down
 * afterwards.
 *
 * @param url endpoint that JQL search requests are POSTed to
 * @param user Username to authenticate with
 * @param password Password to authenticate with
 */
class JiraRestClient(url: String, user: String, password: String) {

  /** A request body that serializes itself to JSON with this client's Jackson mapper. */
  trait JiraQuery {
    def toJson: String = mapper.writeValueAsString(this)
  }

  /**
   * JSON body of a JQL search request. Each field is serialized under its own name
   * (`jql`, `startAt`, `maxResults`, `validateQuery`, `fields`).
   */
  case class JqlQuery(
      jql: String,
      startAt: Int = 0,
      maxResults: Int = 10,
      validateQuery: Boolean = false,
      fields: List[String] = List.empty) extends JiraQuery

  private val mapper = new ObjectMapper with ScalaObjectMapper {
    registerModule(DefaultScalaModule)
    configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  }

  /**
   * POSTs a JQL search to `url` and returns one page of results as the raw response body.
   *
   * @param startAt index of the first result requested (narrowed to `Int` for the request body)
   * @param query JQL query to run
   * @return the response body, or "" if the response carried no entity. Non-2xx responses are
   *         not treated as errors; their body is returned as-is.
   */
  def getIssues(startAt: Long, query: String): String = {
    val post = new HttpPost(url)
    val body = JqlQuery(query, startAt = startAt.toInt).toJson
    // StringEntity(String) on its own encodes as ISO-8859-1, which would mangle
    // non-Latin-1 characters in the JQL; JSON is sent as UTF-8.
    post.setEntity(new StringEntity(body, "UTF-8"))
    getRestContent(post)
  }

  private val base64Auth =
    Base64.encodeBase64String(s"$user:$password".getBytes(Charset.forName("UTF-8")))

  /** Issues an authenticated GET for `url` and returns the response body. */
  private def getContent(url: String): String = {
    val request = new HttpGet(url)
    getRestContent(request)
  }

  /**
   * Adds the basic-auth and JSON content-type headers to `request`, executes it, and returns
   * the response body decoded as UTF-8, with line terminators removed ("" if no entity).
   */
  private def getRestContent(request: HttpUriRequest): String = {
    request.setHeader("Authorization", s"Basic $base64Auth")
    request.setHeader("Content-Type", "application/json")
    val httpClient = new DefaultHttpClient()
    try {
      val httpResponse = httpClient.execute(request)
      Option(httpResponse.getEntity()).map { entity =>
        val inputStream = entity.getContent()
        try {
          // Decode explicitly as UTF-8 rather than relying on the platform default charset.
          Source.fromInputStream(inputStream, "UTF-8").getLines.mkString
        } finally {
          inputStream.close()
        }
      }.getOrElse("")
    } finally {
      // Release connections on every path: null entity, read failure, or execute() throwing.
      httpClient.getConnectionManager().shutdown()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment