// hanging pilosa client
//import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import util.Random
import org.apache.spark.sql.functions._
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext,SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.Partitioner
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import com.pilosa.client.{ImportOptions,PilosaClient,RecordIterator,Column}
import com.pilosa.client.orm.{Field,Record}
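
// Composite key for a single bit: the Pilosa row ID and column ID.
// Ordered by (row, col) so rows arrive in sorted order within each partition.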
case class RowCol(row: Int, col: Int)

object RowCol {
  implicit def orderingByRowCol[A <: RowCol]: Ordering[A] = {
    Ordering.by(fk => (fk.row, fk.col))
  }
}
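
// Spark partitioner that routes each (row, col) key to the Pilosa shard that
// owns its column (col / shardWidth), so one Spark partition maps to one shard.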
class ShuffleToShard(maxShards: Int, shardWidth: Int) extends Partitioner {
  override def numPartitions: Int = maxShards
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RowCol]
    k.col / shardWidth
  }
}
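
// Adapts a Scala Iterator of (RowCol, views) tuples to the Pilosa client's
// RecordIterator so a partition can be streamed through importField.
// The println calls are debug output for tracking down the hang.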
class RowColIterator(iterator: Iterator[(RowCol, List[String])]) extends RecordIterator {
  var i: Int = 0

  override def hasNext: Boolean = {
    println(s"HAS NEXT ${iterator.hasNext}")
    iterator.hasNext
  }

  override def next(): Record = {
    println("NEXT" + i)
    i += 1
    val (bit, viewName) = iterator.next()
    Column.create(bit.row.toLong, bit.col.toLong)
  }

  def remove(): Unit = {
    // this is just to avoid compilation problems on JDK 7
  }
}
object SimpleApp {
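  // Runs on each executor: opens a Pilosa client, ensures the target field
  // exists, and streams the whole partition into it as a roaring import.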
  def processShard(iterator: Iterator[(RowCol, List[String])], indexName: String, fieldName: String) = {
    val client = PilosaClient.defaultClient()
    val schema = client.readSchema()
    val index = schema.index(indexName)
    val field = index.field(fieldName)
    client.ensureField(field)
    println("GO")
    val wrappedIterator = new RowColIterator(iterator)
    val importOptions = ImportOptions.builder()
      .setRoaring(true)
      .setBatchSize(300000)
      .build()
    client.importField(field, wrappedIterator, importOptions)
    println("DONE")
  }
/*
println("sendPilosa")
var calcShard :Int = 0
var views = scala.collection.mutable.Map[String, Bitmap]()
while (iterator.hasNext) {
val (bit,viewName) = iterator.next()
// println(item._1+" "+item._2._1+" "+item._2._2)
if ( !views.contains(viewName(0))){
val bm = new Bitmap()
views(viewName(0))= bm
}
views(viewName(0)).add(bit.row * shardWidth + (bit.col % shardWidth))
calcShard= bit.col
}
calcShard = calcShard/shardWidth
sendToPilosa(views,calcShard,indexName,fieldName)
}
// can an import-roaring request handle multiple views?
// Yes, a single import-roaring request can carry many views for one field
def sendToPilosa(views : scala.collection.mutable.Map[String,Bitmap], shard :Int, indexName:String, fieldName:String) ={
val reqBuilder = Internal.ImportRoaringRequest.newBuilder()
views.foreach{
case(viewName,bitmap)=>
val data = ByteString.copyFrom(bitmap.serialize.array);
val view = Internal.ImportRoaringRequestView.newBuilder()
.setName(viewName)
.setData(data)
.build();
reqBuilder.addViews(view);
}
val payload = reqBuilder.build().toByteArray()
val data = new ByteArrayEntity(payload)
//ugh ImportRequest is not public
//
val path = "/index/%s/field/%s/import".format(indexName, fieldName)
// HEADERS
// new BasicHeader("Content-Type","application/x-protobuf"),
// new BasicHeader("Accept", "application/x-protobuf"),
// new BasicHeader("PQL-Version", "1.0")
client.httpRequest("POST", request.Path, data, request.headers)
}
*/
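  // Helpers for parsing "row,col" CSV lines into (RowCol, views) tuples;
  // the view list is currently a fixed placeholder.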
  def createKeyValueTuple(data: Array[String]): (RowCol, List[String]) = {
    (createKey(data), listData(data))
  }

  def createKey(data: Array[String]): RowCol = {
    RowCol(data(0).toInt, data(1).toInt)
  }

  def listData(data: Array[String]): List[String] = {
    List("view")
  }
  def main(args: Array[String]): Unit = {
    //Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val client = PilosaClient.defaultClient()
    val schema = client.readSchema()
    val index = schema.index("i0")
    val field = index.field("f2")
    client.syncSchema(schema)
    val iterator = new XColumnIterator(300000000, 1000000)
    //val iterator = new XColumnIterator(30, 10)
    val importOptions = ImportOptions.builder()
      //.setBatchSize(100000)
      .setRoaring(true)
      .build()
    client.importField(field, iterator, importOptions)
    /*
    println("GO NOW")
    val shardWidth: Int = 1 << 20
    val spark = SparkSession.builder.appName("Sample Application").getOrCreate()
    val rdd = spark.sparkContext.textFile("file:////mnt/disks/data1/132/132-d.bits").map(line => line.split(","))
    val maxKey = rdd.max()(new Ordering[Array[String]]() {
      override def compare(x: Array[String], y: Array[String]): Int =
        Ordering[Int].compare(x(1).toInt, y(1).toInt)
    })
    val numShards = maxKey(1).toInt / shardWidth + 1
    println("Numshards:", numShards)
    val keyValueTuple = rdd
      .map(arr => createKeyValueTuple(arr))
      .repartitionAndSortWithinPartitions(new ShuffleToShard(numShards, shardWidth))
    keyValueTuple.foreachPartition(partition => processShard(partition, "i0", "f1"))
    spark.stop()
    */
  }
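
  // RecordIterator that generates maxColumns random (row, column) records,
  // with both IDs drawn uniformly from [0, maxID).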
  class XColumnIterator(var maxID: Long, var maxColumns: Long) extends RecordIterator {
    override def hasNext: Boolean = {
      this.maxColumns > 0
    }

    override def next(): Record = {
      this.maxColumns -= 1
      val rowID = (Math.random() * this.maxID).toLong
      val columnID = (Math.random() * this.maxID).toLong
      Column.create(rowID, columnID)
    }
  }
}