Created August 23, 2019 14:06
hanging pilosa client: a Scala job that imports randomly generated bits into Pilosa with importField (a Spark shard-import path is included but commented out); the client hangs during the import.
//import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import util.Random
import org.apache.spark.sql.functions._
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.Partitioner
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import com.pilosa.client.{ImportOptions, PilosaClient, RecordIterator, Column}
import com.pilosa.client.orm.{Field, Record}

// Key type for one bit: a (row, column) pair, ordered row-major so that
// repartitionAndSortWithinPartitions yields sorted runs per shard.
case class RowCol(row: Int, col: Int)

object RowCol {
  implicit def orderingByRowCol[A <: RowCol]: Ordering[A] =
    Ordering.by(fk => (fk.row, fk.col))
}
// Routes each RowCol key to the Pilosa shard that owns its column:
// shard = column / shardWidth.
class ShuffleToShard(maxShards: Int, shardWidth: Int) extends Partitioner {
  override def numPartitions: Int = maxShards
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RowCol]
    k.col / shardWidth
  }
}
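
// A minimal sketch (not part of the original gist) of how ShuffleToShard maps
// columns to partitions: with shardWidth = 1 << 20 (1048576), column 3500000
// lands in partition 3500000 / 1048576 = 3.
object ShuffleToShardDemo {
  def main(args: Array[String]): Unit = {
    val shardWidth = 1 << 20
    val partitioner = new ShuffleToShard(maxShards = 8, shardWidth = shardWidth)
    println(partitioner.getPartition(RowCol(0, 3500000))) // prints 3
  }
}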
// Adapts a sorted Scala iterator of (RowCol, views) pairs to the Pilosa
// client's RecordIterator; the view list is ignored here, only the bit is sent.
class RowColIterator(iterator: Iterator[(RowCol, List[String])]) extends RecordIterator {
  var i: Int = 0
  override def hasNext: Boolean = {
    println(s"HAS NEXT ${iterator.hasNext}")
    iterator.hasNext
  }
  override def next(): Record = {
    println(s"NEXT $i")
    i += 1
    val (bit, viewName) = iterator.next()
    Column.create(bit.row.toLong, bit.col.toLong)
  }
  def remove(): Unit = {
    // this is just to avoid compilation problems on JDK 7
  }
}
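
// Hypothetical usage (not from the gist): wrapping an in-memory iterator the
// same way processShard wraps a Spark partition before calling importField.
object RowColIteratorDemo {
  def main(args: Array[String]): Unit = {
    val bits = Iterator(RowCol(1, 2) -> List("view"), RowCol(1, 5) -> List("view"))
    val records = new RowColIterator(bits)
    while (records.hasNext) println(records.next())
  }
}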
object SimpleApp {
  // Runs on each Spark partition: wraps the partition's iterator and streams
  // it into Pilosa as a roaring import.
  def processShard(iterator: Iterator[(RowCol, List[String])], indexName: String, fieldName: String) = {
    val client = PilosaClient.defaultClient()
    val schema = client.readSchema()
    val index = schema.index(indexName)
    val field = index.field(fieldName)
    client.ensureField(field)
    println("GO")
    val wrappedIterator = new RowColIterator(iterator)
    val importOptions = ImportOptions.builder()
      .setRoaring(true)
      .setBatchSize(300000)
      .build()
    client.importField(field, wrappedIterator, importOptions)
    println("DONE")
  }
  /*
  println("sendPilosa")
  var calcShard: Int = 0
  var views = scala.collection.mutable.Map[String, Bitmap]()
  while (iterator.hasNext) {
    val (bit, viewName) = iterator.next()
    // println(item._1 + " " + item._2._1 + " " + item._2._2)
    if (!views.contains(viewName(0))) {
      val bm = new Bitmap()
      views(viewName(0)) = bm
    }
    views(viewName(0)).add(bit.row * shardWidth + (bit.col % shardWidth))
    calcShard = bit.col
  }
  calcShard = calcShard / shardWidth
  sendToPilosa(views, calcShard, indexName, fieldName)
  }

  // Can an import-roaring request handle multiple views?
  // Yes: an import-roaring request can carry many views for a single field.
  def sendToPilosa(views: scala.collection.mutable.Map[String, Bitmap], shard: Int, indexName: String, fieldName: String) = {
    val reqBuilder = Internal.ImportRoaringRequest.newBuilder()
    views.foreach {
      case (viewName, bitmap) =>
        val data = ByteString.copyFrom(bitmap.serialize.array)
        val view = Internal.ImportRoaringRequestView.newBuilder()
          .setName(viewName)
          .setData(data)
          .build()
        reqBuilder.addViews(view)
    }
    val payload = reqBuilder.build().toByteArray()
    val data = new ByteArrayEntity(payload)
    // ugh: ImportRequest is not public
    val path = "/index/%s/field/%s/import".format(indexName, fieldName)
    // HEADERS
    //   new BasicHeader("Content-Type", "application/x-protobuf"),
    //   new BasicHeader("Accept", "application/x-protobuf"),
    //   new BasicHeader("PQL-Version", "1.0")
    client.httpRequest("POST", request.Path, data, request.headers)
  }
  */
  def createKeyValueTuple(data: Array[String]): (RowCol, List[String]) = {
    (createKey(data), listData(data))
  }
  def createKey(data: Array[String]): RowCol = {
    RowCol(data(0).toInt, data(1).toInt)
  }
  // The view name is constant for every bit in this job.
  def listData(data: Array[String]): List[String] = {
    List("view")
  }
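
  // Worked example (values assumed for illustration): a CSV line "7,42"
  // splits to Array("7", "42") and becomes (RowCol(7, 42), List("view")).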
  def main(args: Array[String]): Unit = {
    //Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    val client = PilosaClient.defaultClient()
    val schema = client.readSchema()
    val index = schema.index("i0")
    val field = index.field("f2")
    client.syncSchema(schema)
    // Import 1,000,000 random bits with row/column IDs below 300,000,000.
    val iterator = new XColumnIterator(300000000, 1000000)
    //val iterator = new XColumnIterator(30, 10)
    val importOptions = ImportOptions.builder()
      //.setBatchSize(100000)
      .setRoaring(true)
      .build()
    client.importField(field, iterator, importOptions)
    /*
    println("GO NOW")
    val shardWidth: Int = 1 << 20
    val spark = SparkSession.builder.appName("Sample Application").getOrCreate()
    val rdd = spark.sparkContext.textFile("file:////mnt/disks/data1/132/132-d.bits").map(line => line.split(","))
    val maxKey = rdd.max()(new Ordering[Array[String]]() {
      override def compare(x: Array[String], y: Array[String]): Int =
        Ordering[Int].compare(x(1).toInt, y(1).toInt)
    })
    val numShards = maxKey(1).toInt / shardWidth + 1
    println("Numshards:", numShards)
    val keyValueTuple = rdd
      .map(arr => createKeyValueTuple(arr))
      .repartitionAndSortWithinPartitions(new ShuffleToShard(numShards, shardWidth))
    keyValueTuple.foreachPartition(partition => processShard(partition, "i0", "f1"))
    spark.stop()
    */
  }
  // Generates maxColumns random Column records with row and column IDs in
  // [0, maxID); used here to reproduce the hang without involving Spark.
  class XColumnIterator(var maxID: Long, var maxColumns: Long) extends RecordIterator {
    override def hasNext: Boolean = {
      this.maxColumns > 0
    }
    override def next(): Record = {
      this.maxColumns -= 1
      val rowID = (Math.random() * this.maxID).toLong
      val columnID = (Math.random() * this.maxID).toLong
      Column.create(rowID, columnID)
    }
  }
}
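
// Hypothetical run instructions (artifact names and paths are assumptions,
// not part of the gist): build with sbt, then submit to Spark with the Pilosa
// Java client on the classpath, e.g.
//   sbt package
//   spark-submit --class SimpleApp --jars pilosa-client.jar target/scala-2.11/simple-app.jar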