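// Bulk-load a CSV of "row,col" bit pairs into a Pilosa field using Spark.
// Each Spark partition is aligned with exactly one Pilosa shard; the bits in
// a partition are rolled into roaring bitmaps and posted to Pilosa's roaring
// import endpoint in a single request. Pilosa is assumed to be running at
// localhost:10101 with index "i0" and field "f1" already created.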
import org.apache.log4j.{Level, Logger}
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession
import com.pilosa.client.Internal
import com.pilosa.roaring.Bitmap
import com.google.protobuf.ByteString
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.ByteArrayEntity
import org.apache.http.impl.client.DefaultHttpClient
// Composite key: a (row, column) bit coordinate in the Pilosa matrix.
case class RowCol(row: Int, col: Int)

object RowCol {
  // Sort by row, then column, so each partition arrives at processShard in
  // order; used implicitly by repartitionAndSortWithinPartitions below.
  implicit def orderingByRowCol[A <: RowCol]: Ordering[A] =
    Ordering.by(fk => (fk.row, fk.col))
}
// Partition by Pilosa shard: every column in [n*shardWidth, (n+1)*shardWidth)
// belongs to shard n, so the partition index is simply col / shardWidth.
class ShuffleToShard(maxShards: Int, shardWidth: Int) extends Partitioner {
  override def numPartitions: Int = maxShards
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RowCol]
    k.col / shardWidth
  }
}
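// For example, with the default shardWidth of 1 << 20 (1,048,576), column
// 3,145,728 maps to partition 3145728 / 1048576 = 3, so every bit destined
// for Pilosa shard 3 lands in Spark partition 3.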
object SimpleApp {
  // Pilosa's default shard width: each shard covers 2^20 columns.
  val shardWidth: Int = 1 << 20

  // Drain one sorted partition into per-view bitmaps, then ship the whole
  // shard to Pilosa in a single roaring import request.
  def processShard(iterator: Iterator[(RowCol, List[String])], indexName: String, fieldName: String): Unit = {
    var lastCol: Int = 0
    val views = scala.collection.mutable.Map[String, Bitmap]()
    while (iterator.hasNext) {
      val (bit, viewNames) = iterator.next()
      val viewName = viewNames.head
      if (!views.contains(viewName)) {
        views(viewName) = new Bitmap()
      }
      // A bit's position within its shard is row * shardWidth + (col mod shardWidth).
      views(viewName).add(bit.row * shardWidth + (bit.col % shardWidth))
      lastCol = bit.col
    }
    // Every key in this partition maps to the same shard, so any column seen
    // (here, the last one) identifies it.
    val shard = lastCol / shardWidth
    // Skip empty partitions; otherwise we would post an empty import to shard 0.
    if (views.nonEmpty) sendToPilosa(views, shard, indexName, fieldName)
  }
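  // Worked example of the position formula: row 5, col 3,000,000 with
  // shardWidth 1,048,576 belongs to shard 3000000 / 1048576 = 2 and sits at
  // position 5 * 1048576 + (3000000 % 1048576) = 5242880 + 902848 = 6145728
  // within that shard's bitmap.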
  // A single ImportRoaringRequest can carry many views for one field, so all
  // of this shard's views go out in one POST.
  def sendToPilosa(views: scala.collection.mutable.Map[String, Bitmap], shard: Int, indexName: String, fieldName: String): Unit = {
    val reqBuilder = Internal.ImportRoaringRequest.newBuilder()
    views.foreach {
      case (viewName, bitmap) =>
        val data = ByteString.copyFrom(bitmap.serialize.array)
        val view = Internal.ImportRoaringRequestView.newBuilder()
          .setName(viewName)
          .setData(data)
          .build()
        reqBuilder.addViews(view)
    }
    val payload = reqBuilder.build().toByteArray()
    val entity = new ByteArrayEntity(payload)
    // The roaring import endpoint is addressed per shard; note the shard
    // number in the path, which the original /import URL left out.
    val path = "http://localhost:10101/index/%s/field/%s/import-roaring/%d".format(indexName, fieldName, shard)
    val post = new HttpPost(path)
    post.setHeader("Content-Type", "application/x-protobuf")
    post.setHeader("Accept", "application/x-protobuf")
    post.setHeader("PQL-Version", "1.0")
    post.setEntity(entity)
    val response = (new DefaultHttpClient).execute(post)
    // Print the response headers.
    println("--- HEADERS ---")
    response.getAllHeaders.foreach(println)
  }
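  // A minimal sketch of verifying the import succeeded, using the HttpClient
  // response object above (Pilosa replies with HTTP 200 on success):
  //
  //   val status = response.getStatusLine.getStatusCode
  //   if (status != 200) {
  //     sys.error(s"import of shard $shard failed with HTTP $status")
  //   }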
  def createKeyValueTuple(data: Array[String]): (RowCol, List[String]) = {
    (createKey(data), listData(data))
  }

  // Input lines are "row,col" pairs.
  def createKey(data: Array[String]): RowCol = {
    RowCol(data(0).toInt, data(1).toInt)
  }

  // Every bit targets the field's default "standard" view.
  def listData(data: Array[String]): List[String] = {
    List("standard")
  }
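  // For example, an input line "5,3000000" becomes the tuple
  // (RowCol(5, 3000000), List("standard")).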
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val spark = SparkSession.builder.appName("Sample Application").getOrCreate()
    val rdd = spark.sparkContext.textFile("file:////mnt/disks/data1/132/132-d.bits").map(line => line.split(","))
    // Find the largest column to size the shard (and partition) count.
    val maxKey = rdd.max()(new Ordering[Array[String]]() {
      override def compare(x: Array[String], y: Array[String]): Int =
        Ordering[Int].compare(x(1).toInt, y(1).toInt)
    })
    val numShards = maxKey(1).toInt / shardWidth + 1
    println(s"numShards: $numShards")
    // Shuffle every bit to the partition owning its shard, sorted by
    // (row, col) via the implicit RowCol ordering.
    val keyValueTuple = rdd
      .map(arr => createKeyValueTuple(arr))
      .repartitionAndSortWithinPartitions(new ShuffleToShard(numShards, shardWidth))
    keyValueTuple.foreachPartition(partition => processShard(partition, "i0", "f1"))
    spark.stop()
  }
}
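// A sketch of how this job might be submitted, assuming the code and the
// pilosa-client/HttpClient dependencies are packaged into simple-app.jar
// (the jar name is hypothetical; the Pilosa endpoint is hard-coded above):
//
//   spark-submit --class SimpleApp --master "local[*]" simple-app.jar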