// Gist by @tgruben, created August 26, 2019 16:26
import org.apache.log4j.{Level, Logger}
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession
import com.pilosa.client.Internal
import com.pilosa.roaring.Bitmap
import com.google.protobuf.ByteString
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.ByteArrayEntity
import org.apache.http.impl.client.DefaultHttpClient
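// Assumed build dependencies (a sketch; artifact names and versions are from memory,
// adjust to your setup):
//   libraryDependencies ++= Seq(
//     "org.apache.spark"          %% "spark-sql"     % "2.4.3" % "provided",
//     "com.pilosa"                 % "pilosa-client" % "1.3.0",
//     "org.apache.httpcomponents"  % "httpclient"    % "4.5.9"
//   )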
// A (row, column) bit coordinate. The implicit ordering lets Spark sort bits
// by (row, col) within each shard partition before they are imported.
case class RowCol(row: Int, col: Int)

object RowCol {
  implicit def orderingByRowCol[A <: RowCol]: Ordering[A] =
    Ordering.by(fk => (fk.row, fk.col))
}
// Routes each bit to the Spark partition for its Pilosa shard (col / shardWidth),
// so one partition holds exactly one shard's worth of bits.
class ShuffleToShard(maxShards: Int, shardWidth: Int) extends Partitioner {
  override def numPartitions: Int = maxShards
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RowCol]
    k.col / shardWidth
  }
}
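// For example (illustrative values): with shardWidth = 1 << 20 = 1,048,576,
// RowCol(7, 2097152) lands in partition 2097152 / 1048576 = 2, i.e. Pilosa shard 2.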
object SimpleApp {
  // Pilosa's default shard width: 2^20 = 1,048,576 columns per shard.
  val shardWidth: Int = 1 << 20

  // Builds one roaring bitmap per view from a single shard's worth of bits,
  // then posts them all to Pilosa in one import request.
  def processShard(iterator: Iterator[(RowCol, List[String])], indexName: String, fieldName: String): Unit = {
    println("sendPilosa")
    var calcShard: Int = 0
    val views = scala.collection.mutable.Map[String, Bitmap]()
    while (iterator.hasNext) {
      val (bit, viewNames) = iterator.next()
      val viewName = viewNames.head
      if (!views.contains(viewName)) {
        views(viewName) = new Bitmap()
      }
      // Pilosa bit position = row * shardWidth + (col % shardWidth); widen to Long
      // so large row IDs do not overflow Int.
      views(viewName).add(bit.row.toLong * shardWidth + (bit.col % shardWidth))
      calcShard = bit.col
    }
    // Every bit in this partition belongs to the same shard, so the last col works.
    calcShard = calcShard / shardWidth
    if (views.nonEmpty) {
      sendToPilosa(views, calcShard, indexName, fieldName)
    }
  }
  // Can a roaring import request handle multiple views?
  // Yes: a single ImportRoaringRequest can carry many views for one field.
  def sendToPilosa(views: scala.collection.mutable.Map[String, Bitmap], shard: Int, indexName: String, fieldName: String): Unit = {
    val reqBuilder = Internal.ImportRoaringRequest.newBuilder()
    views.foreach {
      case (viewName, bitmap) =>
        val data = ByteString.copyFrom(bitmap.serialize.array)
        val view = Internal.ImportRoaringRequestView.newBuilder()
          .setName(viewName)
          .setData(data)
          .build()
        reqBuilder.addViews(view)
    }
    println("BUILD")
    val payload = reqBuilder.build().toByteArray()
    println("CONVERT")
    val data = new ByteArrayEntity(payload)
    // The roaring import endpoint addresses a specific shard in the path.
    val path = "http://localhost:10101/index/%s/field/%s/import-roaring/%d".format(indexName, fieldName, shard)
    println(s"path: $path")
    val post = new HttpPost(path)
    // The import endpoint expects a protobuf body.
    post.setHeader("Content-Type", "application/x-protobuf")
    post.setHeader("Accept", "application/x-protobuf")
    post.setHeader("PQL-Version", "1.0")
    post.setEntity(data)
    // Send the POST request. DefaultHttpClient is deprecated in HttpClient 4.3+,
    // but is fine for a one-off import like this.
    println("upload")
    val response = (new DefaultHttpClient).execute(post)
    // Print the response headers.
    println("--- HEADERS ---")
    response.getAllHeaders.foreach(println)
  }
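  // Equivalent request shape (a sketch; the body must be the serialized protobuf):
  //   curl -X POST http://localhost:10101/index/i0/field/f1/import-roaring/0 \
  //        -H "Content-Type: application/x-protobuf" --data-binary @payload.pb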
  // Parses one CSV record ("row,col") into a (key, views) pair.
  def createKeyValueTuple(data: Array[String]): (RowCol, List[String]) =
    (createKey(data), listData(data))

  def createKey(data: Array[String]): RowCol =
    RowCol(data(0).toInt, data(1).toInt)

  // Every bit goes to Pilosa's default view, named "standard".
  def listData(data: Array[String]): List[String] =
    List("standard")
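  // For example, the input line "7,2097152" parses to (RowCol(7, 2097152), List("standard")).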
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    println("GO NOW")
    val spark = SparkSession.builder.appName("Sample Application").getOrCreate()

    // Each input line is a "row,col" pair.
    val rdd = spark.sparkContext
      .textFile("file:///mnt/disks/data1/132/132-d.bits")
      .map(line => line.split(","))

    // Find the largest column value to size the shard count.
    val maxKey = rdd.max()(new Ordering[Array[String]]() {
      override def compare(x: Array[String], y: Array[String]): Int =
        Ordering[Int].compare(x(1).toInt, y(1).toInt)
    })
    val numShards = maxKey(1).toInt / shardWidth + 1
    println(s"Numshards: $numShards")

    // One partition per shard, sorted by (row, col) within each partition.
    val keyValueTuple = rdd
      .map(arr => createKeyValueTuple(arr))
      .repartitionAndSortWithinPartitions(new ShuffleToShard(numShards, shardWidth))
    keyValueTuple.foreachPartition(partition => processShard(partition, "i0", "f1"))
    spark.stop()
  }
}
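// To run (a sketch; assumes a Pilosa server at localhost:10101 with index "i0" and
// field "f1" already created; adjust the jar path to your build):
//   spark-submit --class SimpleApp --master "local[*]" \
//       target/scala-2.11/simple-app_2.11-1.0.jar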