Skip to content

Instantly share code, notes, and snippets.

@msukmanowsky
Last active August 29, 2015 14:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save msukmanowsky/3fcaea4509d8a44fae76 to your computer and use it in GitHub Desktop.
Save msukmanowsky/3fcaea4509d8a44fae76 to your computer and use it in GitHub Desktop.
Custom version of CassandraConverters.scala in the spark/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala. Provides better (though not perfect) serialization of keys and values for CqlOutputFormat.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.pythonconverters
import org.apache.cassandra.serializers._
import java.util.{GregorianCalendar, UUID, Date}
import java.net.InetAddress
import org.apache.spark.Logging
import org.apache.spark.api.python.Converter
import java.nio.ByteBuffer
import org.apache.cassandra.utils.ByteBufferUtil
import collection.JavaConversions._
/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
 * output to a Map[String, Int].
 *
 * The incoming object is cast to a `java.util.Map[String, ByteBuffer]` and every value
 * is decoded with `ByteBufferUtil.toInt`.
 */
class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] {
  /**
   * Decodes every column of the given key map into an Int.
   *
   * @param obj the key map; cast to `java.util.Map[String, ByteBuffer]`
   * @return a new map with the same column names, in the same iteration order, and
   *         decoded Int values
   */
  override def convert(obj: Any): java.util.Map[String, Int] = {
    val columns = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
    // `mapValues` only builds a lazy view: the decode would be re-run on every lookup
    // and the result would stay tied to the input map. Copy into a concrete map
    // instead; LinkedHashMap keeps the iteration order of the input.
    val decoded = new java.util.LinkedHashMap[String, Int]()
    mapAsScalaMap(columns).foreach { case (column, bytes) =>
      decoded.put(column, ByteBufferUtil.toInt(bytes))
    }
    decoded
  }
}
/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
 * output to a Map[String, String].
 *
 * The incoming object is cast to a `java.util.Map[String, ByteBuffer]` and every value
 * is decoded with `ByteBufferUtil.string`.
 */
class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] {
  /**
   * Decodes every column of the given value map into a String.
   *
   * @param obj the value map; cast to `java.util.Map[String, ByteBuffer]`
   * @return a new map with the same column names, in the same iteration order, and
   *         decoded String values
   */
  override def convert(obj: Any): java.util.Map[String, String] = {
    val columns = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
    // `mapValues` only builds a lazy view: the decode would be re-run on every lookup
    // and the result would stay tied to the input map. Copy into a concrete map
    // instead; LinkedHashMap keeps the iteration order of the input.
    val decoded = new java.util.LinkedHashMap[String, String]()
    mapAsScalaMap(columns).foreach { case (column, bytes) =>
      decoded.put(column, ByteBufferUtil.string(bytes))
    }
    decoded
  }
}
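/*
* Superseded implementation, kept commented out for reference: a
* [[org.apache.spark.api.python.Converter]] that converted a Map[String, Int] to a
* Cassandra key. The active ToCassandraCQLKeyConverter is defined further below.
*/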
// class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]]
// with Logging {
// override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
// // val input = obj.asInstanceOf[java.util.Map[String, Int]]
// val input = obj.asInstanceOf[java.util.Map[String, Any]]
// mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
// }
// }
/**
 * Mixin that serializes a single JVM value into a Cassandra `ByteBuffer`, choosing the
 * Cassandra serializer from the runtime type of the value.
 *
 * Collections (lists, maps, sets) are serialized with UTF8 element serializers, i.e.
 * their elements are assumed to be Strings.
 */
trait GenericCassandraSerializer {
  /**
   * Serializes `obj` with the Cassandra serializer matching its runtime type.
   *
   * Both Scala and `java.util` collections are accepted; Scala collections are wrapped
   * as Java collections before being handed to the Cassandra collection serializers.
   *
   * @param obj the value to serialize; `null` is serialized with `EmptySerializer`
   * @return the serialized bytes
   * @throws Exception if no serializer is known for the runtime type of `obj`
   */
  def serializeToCassandra(obj: Any): ByteBuffer = obj match {
    case null => EmptySerializer.instance.serialize(null)
    case b: Boolean => BooleanSerializer.instance.serialize(b)
    case bd: BigDecimal => DecimalSerializer.instance.serialize(bd.underlying)
    case jbd: java.math.BigDecimal => DecimalSerializer.instance.serialize(jbd)
    case d: Double => DoubleSerializer.instance.serialize(d)
    case f: Float => FloatSerializer.instance.serialize(f)
    case address: InetAddress => InetAddressSerializer.instance.serialize(address)
    case i: Int => Int32Serializer.instance.serialize(i)
    case l: Long => LongSerializer.instance.serialize(l)
    case bi: BigInt => IntegerSerializer.instance.serialize(bi.underlying)
    case jbi: java.math.BigInteger => IntegerSerializer.instance.serialize(jbi)
    // TODO: Generic list serializer (elements are assumed to be Strings)
    case javaList: java.util.List[_] =>
      serializeStringList(javaList.asInstanceOf[java.util.List[String]])
    case seq: Seq[_] =>
      // A Scala Seq is not a java.util.List, so it must be wrapped rather than cast.
      serializeStringList(seqAsJavaList(seq.asInstanceOf[Seq[String]]))
    // TODO: Generic map serializer (keys and values are assumed to be Strings)
    // NOTE(review): the original author remarked that CQL stores maps as
    // <map_name>:<key> => <value>; that remark was left unfinished, so confirm that the
    // plain MapSerializer output is what the consumer expects.
    case javaMap: java.util.Map[_, _] =>
      serializeStringMap(javaMap.asInstanceOf[java.util.Map[String, String]])
    case map: scala.collection.Map[_, _] =>
      // A Scala Map is not a java.util.Map, so it must be wrapped rather than cast.
      serializeStringMap(mapAsJavaMap(map.asInstanceOf[scala.collection.Map[String, String]]))
    // TODO: Generic set serializer (elements are assumed to be Strings)
    case javaSet: java.util.Set[_] =>
      serializeStringSet(javaSet.asInstanceOf[java.util.Set[String]])
    case set: scala.collection.Set[_] =>
      // A Scala Set is not a java.util.Set, so it must be wrapped rather than cast.
      serializeStringSet(setAsJavaSet(set.asInstanceOf[scala.collection.Set[String]]))
    case cal: GregorianCalendar => TimestampSerializer.instance.serialize(cal.getTime())
    case date: Date => TimestampSerializer.instance.serialize(date)
    case str: String => UTF8Serializer.instance.serialize(str)
    case uuid: UUID =>
      // Version 1 UUIDs are time-based and go through the TimeUUID serializer.
      uuid.version() match {
        case 1 => TimeUUIDSerializer.instance.serialize(uuid)
        case _ => UUIDSerializer.instance.serialize(uuid)
      }
    case _ =>
      throw new Exception(s"Cannot serialize: ${obj.getClass()}")
  }

  /** Serializes a Java list of Strings with the Cassandra list serializer. */
  private def serializeStringList(values: java.util.List[String]): ByteBuffer =
    ListSerializer.getInstance(UTF8Serializer.instance).serialize(values)

  /** Serializes a Java map of String keys and String values with the Cassandra map serializer. */
  private def serializeStringMap(entries: java.util.Map[String, String]): ByteBuffer =
    MapSerializer.getInstance(UTF8Serializer.instance, UTF8Serializer.instance)
      .serialize(entries)

  /** Serializes a Java set of Strings with the Cassandra set serializer. */
  private def serializeStringSet(values: java.util.Set[String]): ByteBuffer =
    SetSerializer.getInstance(UTF8Serializer.instance).serialize(values)
}
/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts a map of
 * column names to arbitrary values into a map of column names to serialized
 * `ByteBuffer`s, using `serializeToCassandra` for every value.
 */
class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]]
  with Logging with GenericCassandraSerializer {
  /**
   * Serializes every value of the given key map.
   *
   * @param obj the key map; cast to `java.util.Map[String, Any]`
   * @return a new map with the same column names, in the same iteration order, and
   *         serialized values
   */
  override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
    val input = obj.asInstanceOf[java.util.Map[String, Any]]
    // `mapValues` only builds a lazy view, so each lookup would serialize the value
    // again and an unsupported value would only fail when the result is read.
    // Serialize once, up front, into a concrete map that keeps the input's order.
    val serialized = new java.util.LinkedHashMap[String, ByteBuffer]()
    mapAsScalaMap(input).foreach { case (column, value) =>
      serialized.put(column, serializeToCassandra(value))
    }
    serialized
  }
}
/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts a list of
 * arbitrary values into a list of serialized `ByteBuffer`s, using
 * `serializeToCassandra` for every element.
 */
class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]]
  with Logging with GenericCassandraSerializer {
  /**
   * Serializes every element of the given list, preserving element order.
   *
   * @param obj the values; cast to `java.util.List[_]`
   * @return the serialized values as a Java list
   */
  override def convert(obj: Any): java.util.List[ByteBuffer] = {
    val values = obj.asInstanceOf[java.util.List[_]]
    val serialized = for (value <- values) yield serializeToCassandra(value)
    seqAsJavaList(serialized)
  }
}
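/*
* Superseded implementation, kept commented out for reference: a
* [[org.apache.spark.api.python.Converter]] that converted a List[String] to a
* Cassandra value. The active ToCassandraCQLValueConverter is defined above.
*/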
// class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]]
// with Logging {
// override def convert(obj: Any): java.util.List[ByteBuffer] = {
// val input = obj.asInstanceOf[java.util.List[String]]
// seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
// }
// }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment