Skip to content

Instantly share code, notes, and snippets.

@heuristicfencepost
Created May 25, 2011 04:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save heuristicfencepost/990361 to your computer and use it in GitHub Desktop.
Save heuristicfencepost/990361 to your computer and use it in GitHub Desktop.
Experiments in interacting with Cassandra via Thrift
(import '(org.apache.thrift.transport TFramedTransport TSocket)
'(org.apache.thrift.protocol TBinaryProtocol)
'(org.apache.cassandra.thrift Cassandra$Client SliceRange SlicePredicate ColumnParent KeyRange ConsistencyLevel)
)
(defn connect
  "Connect to a Cassandra instance on the specified host and port and select
  the given keyspace on that connection. Returns the Cassandra$Client.

  The connection is a framed Thrift transport speaking the binary protocol.
  If selecting the keyspace fails, the already-opened transport is closed
  before the exception is rethrown so the socket is not leaked."
  [host port keyspace]
  (let [transport (TFramedTransport. (TSocket. host port))
        protocol  (TBinaryProtocol. transport)
        client    (Cassandra$Client. protocol)]
    (.open transport)
    (try
      (.set_keyspace client keyspace)
      client
      (catch Exception e
        ;; The caller never receives the client on this path, so nobody else
        ;; could close the transport for us.
        (.close transport)
        (throw e)))))
(defn get_range_slices
  "Simple front end into the get_range_slices function exposed via Thrift.

  Queries column family `cf` for the rows whose keys lie between `start` and
  `end` (both strings, encoded as UTF-8 bytes) at consistency level ONE and
  returns whatever the Thrift client returns for that call.

  The slice range uses empty start and finish bounds, is not reversed, and
  is limited to `max-columns` entries (100 when the four-argument form is
  used)."
  ([client cf start end]
   (get_range_slices client cf start end 100))
  ([client cf start end max-columns]
   (let [slice-range (doto (SliceRange.)
                       (.setStart (byte-array 0))
                       (.setFinish (byte-array 0))
                       (.setReversed false)
                       (.setCount (int max-columns)))
         predicate   (doto (SlicePredicate.)
                       (.setSlice_range slice-range))
         parent      (ColumnParent. cf)
         ;; An explicit charset keeps the key bytes independent of the JVM's
         ;; platform default encoding.
         key-range   (doto (KeyRange.)
                       (.setStart_key (.getBytes ^String start "UTF-8"))
                       (.setEnd_key (.getBytes ^String end "UTF-8")))]
     (.get_range_slices client parent predicate key-range ConsistencyLevel/ONE))))
(defn range_slices_keys
  "Retrieve the keys in a get_range_slices result.

  Returns a lazy sequence with one string per element of `slices`, obtained
  by decoding the byte array returned by each element's getKey as UTF-8."
  [slices]
  ;; Decode with an explicit charset so the result does not depend on the
  ;; JVM's platform default encoding.
  (map #(String. ^bytes (.getKey %) "UTF-8") slices))
(defn range_slices_columns
  "Retrieve a map of the columns associated with the specified key in a
  get_range_slices result.

  Scans `slices` for the first element whose key (decoded as UTF-8) equals
  `key`. Returns nil when there is no such element; otherwise returns a map
  from column name (as a keyword) to column value (as a string), both decoded
  as UTF-8."
  [slices key]
  (when-let [match (first (filter #(= key (String. ^bytes (.getKey %) "UTF-8"))
                                  slices))]
    ;; Each entry returned by getColumns is a wrapper; getColumn yields the
    ;; object that carries the name and value bytes.
    (let [cols (map #(.getColumn %) (.getColumns match))]
      (zipmap (map #(keyword (String. ^bytes (.getName %) "UTF-8")) cols)
              (map #(String. ^bytes (.getValue %) "UTF-8") cols)))))
;; Demo driver: connect to the local node, run one range query against the
;; "authors" column family of the "twitter" keyspace, print the first five
;; keys and then one summary line per key.
(let [client      (connect "localhost" 9160 "twitter")
      slices      (get_range_slices client "authors" "!" "~")
      sample-keys (take 5 (range_slices_keys slices))
      describe    (fn [row-key]
                    (let [columns (range_slices_columns slices row-key)]
                      (format "Key %s: name => %s, following => %s\n"
                              row-key
                              (columns :name)
                              (columns :following))))]
  (print sample-keys)
  ;; Build the whole report first, then emit it with a single print.
  (print (apply str (map describe sample-keys))))
package org.fencepost.cassandra
import scala.collection.JavaConversions
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport._
import org.apache.cassandra.service._
import org.apache.cassandra.thrift._
/** Small helpers for querying Cassandra through its Thrift client, plus a demo `main`. */
object ThriftCassandraClient {
  // Explicit `.asScala` decorators; these replace the implicit-conversion
  // object JavaConversions, which is deprecated.
  import scala.collection.JavaConverters._

  // Keys, column names and column values travel as raw bytes. Naming the
  // charset keeps encoding and decoding independent of the JVM's platform default.
  private val Utf8 = "UTF-8"

  /** Opens a framed, binary-protocol Thrift connection and selects a keyspace.
    *
    * If selecting the keyspace fails, the already-opened transport is closed
    * before the exception is rethrown so the socket is not leaked.
    *
    * @param host     host name of the Cassandra node
    * @param port     Thrift port of the Cassandra node
    * @param keyspace keyspace to select on the new connection
    * @return the connected client
    */
  def connect(host: String, port: Int, keyspace: String): Cassandra.Client = {
    val transport = new TFramedTransport(new TSocket(host, port))
    val client = new Cassandra.Client(new TBinaryProtocol(transport))
    transport.open()
    try {
      client.set_keyspace(keyspace)
      client
    } catch {
      case e: Exception =>
        // The caller never receives the client on this path, so nobody else
        // could close the transport.
        transport.close()
        throw e
    }
  }

  /** Executes a range slice query against the specified Cassandra instance.
    *
    * The slice range uses empty start and finish bounds, is not reversed, and
    * the query runs at consistency level ONE. The result is suitable for later
    * interrogation by [[range_slices_keys]] or [[range_slices_columns]].
    *
    * @param client     connected client, e.g. from [[connect]]
    * @param cf         column family to query
    * @param start      first key of the key range (encoded as UTF-8)
    * @param end        last key of the key range (encoded as UTF-8)
    * @param maxColumns value passed to `SliceRange.setCount`; defaults to 100
    * @return the key slices returned by the Thrift client, viewed as a Scala Iterable
    */
  def get_range_slices(
      client: Cassandra.Client,
      cf: String,
      start: String,
      end: String,
      maxColumns: Int = 100): Iterable[KeySlice] = {
    val sliceRange = new SliceRange()
    sliceRange.setStart(new Array[Byte](0))
    sliceRange.setFinish(new Array[Byte](0))
    sliceRange.setReversed(false)
    sliceRange.setCount(maxColumns)

    val slicePredicate = new SlicePredicate()
    slicePredicate.setSlice_range(sliceRange)

    val columnParent = new ColumnParent(cf)

    val keyRange = new KeyRange()
    keyRange.setStart_key(start.getBytes(Utf8))
    keyRange.setEnd_key(end.getBytes(Utf8))

    // The Thrift Java client returns a java.util.List<KeySlice>. Ascribing it
    // to java.lang.Iterable selects the plain Iterable wrapper, as the
    // previous asScalaIterable call did.
    val javaKeys: java.lang.Iterable[KeySlice] =
      client.get_range_slices(columnParent, slicePredicate, keyRange, ConsistencyLevel.ONE)
    javaKeys.asScala
  }

  /** Returns the keys of all entries in a query result, decoded as UTF-8 strings. */
  def range_slices_keys(slices: Iterable[KeySlice]): Iterable[String] =
    slices.map(slice => new String(slice.getKey, Utf8))

  /** Looks up the columns stored under `key` in a query result.
    *
    * @param slices query result, e.g. from [[get_range_slices]]
    * @param key    key to look for; compared against each entry's key decoded as UTF-8
    * @return `None` if no entry has that key, otherwise a map of column names
    *         to column values (both decoded as UTF-8) for the first match
    */
  def range_slices_columns(slices: Iterable[KeySlice], key: String): Option[Map[String, String]] =
    slices.find(slice => new String(slice.getKey, Utf8) == key).map { slice =>
      slice.getColumns.asScala.map { holder =>
        val column = holder.getColumn
        new String(column.getName, Utf8) -> new String(column.getValue, Utf8)
      }.toMap
    }

  /** Demo: prints the first five keys of the "authors" column family in the
    * "twitter" keyspace on localhost, followed by the "name" and "following"
    * columns of each (empty string when a column is absent).
    */
  def main(args: Array[String]): Unit = {
    val client = connect("localhost", 9160, "twitter")
    try {
      val slices = get_range_slices(client, "authors", "!", "~")
      val fivekeys = range_slices_keys(slices).take(5)
      println("fivekeys: " + fivekeys)
      for (key <- fivekeys) {
        // Keys without a matching entry are skipped silently.
        range_slices_columns(slices, key).foreach { cols =>
          println(
            "Key " + key +
              ": name => " + cols.getOrElse("name", "") +
              ", following => " + cols.getOrElse("following", "")
          )
        }
      }
    } finally {
      // Close the transport opened by connect().
      client.getInputProtocol.getTransport.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment