Skip to content

Instantly share code, notes, and snippets.

@RussellSpitzer
Created September 15, 2015 22:17
Show Gist options
  • Save RussellSpitzer/40d6aa6c1f0ff29c033d to your computer and use it in GitHub Desktop.
Save RussellSpitzer/40d6aa6c1f0ff29c033d to your computer and use it in GitHub Desktop.
package com.datastax.spark.connector.rdd
import com.datastax.driver.core.Session
import com.datastax.spark.connector.{PartitionKeyColumns, AllColumns, ColumnSelector}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.writer.{BoundStatementBuilder, RowWriter}
import com.datastax.spark.connector.rdd.reader.{PrefetchingResultSetIterator, RowReader}
/**
 * Left-join flavour of [[CassandraJoinRDD]]: every element of the left RDD appears in the
 * output at least once. For each left element the bound statement is executed; each returned
 * row is decoded by the `RowReader[Option[R]]`, and when no row comes back the element is
 * paired with a single `None`.
 *
 * All constructor arguments are forwarded unchanged to [[CassandraJoinRDD]], instantiated with
 * `Option[R]` as its right-hand type.
 *
 * Known limitation: deriving a `RowReader` for `Option[R]` is likely to be problematic, which
 * is why a reader can be supplied explicitly through `manualRowReader`.
 *
 * @tparam L element type of the left RDD
 * @tparam R type of a decoded Cassandra row; exposed to callers as `Option[R]`
 */
class CassandraLeftJoinRDD[L, R](
    left: RDD[L],
    keyspaceName: String,
    tableName: String,
    connector: CassandraConnector,
    columnNames: ColumnSelector = AllColumns,
    joinColumns: ColumnSelector = PartitionKeyColumns,
    where: CqlWhereClause = CqlWhereClause.empty,
    limit: Option[Long] = None,
    clusteringOrder: Option[ClusteringOrder] = None,
    readConf: ReadConf = ReadConf(),
    // FIXME: forcing callers to hand in a RowReader[Option[R]] is a stop-gap; it should be
    // replaced as part of the planned refactoring.
    manualRowReader: Option[RowReader[Option[R]]] = None,
    manualRowWriter: Option[RowWriter[L]] = None)
  extends CassandraJoinRDD[L, Option[R]](
    left,
    keyspaceName,
    tableName,
    connector,
    columnNames,
    joinColumns,
    where,
    limit,
    clusteringOrder,
    readConf,
    manualRowReader,
    manualRowWriter
  ) {

  /**
   * Executes the bound statement once per left element and pairs that element with the
   * decoded rows, or with a single `None` when the query returns nothing.
   *
   * The result type is `Iterator[(L, Option[R])]` so that it agrees with the parent class,
   * which is instantiated as `CassandraJoinRDD[L, Option[R]]`.
   *
   * @param session open session used to execute the per-element queries
   * @param bsb     builder that binds a left element to the statement
   * @param lastIt  left-side elements to join
   * @return a lazy iterator of `(left, right)` pairs with at least one pair per left element
   */
  override protected def fetchIterator(
      session: Session,
      bsb: BoundStatementBuilder[L],
      lastIt: Iterator[L]): Iterator[(L, Option[R])] = {
    val columnNamesArray = selectedColumnRefs.map(_.selectedAs).toArray
    implicit val pv = protocolVersion(session)

    lastIt.flatMap { leftSide =>
      val rs = session.execute(bsb.bind(leftSide))
      val rows = new PrefetchingResultSetIterator(rs, fetchSize)

      // NOTE(review): assumes the inherited rowReader is the RowReader[Option[R]] passed to the
      // parent, so its result is already an Option[R] and must not be wrapped in Some again
      // (that would produce Option[Option[R]]). Confirm against CassandraJoinRDD.
      val rightSides: Iterator[Option[R]] =
        if (rows.isEmpty) Iterator.single(None) // keep the unmatched left element
        else rows.map(row => rowReader.read(row, columnNamesArray))

      rightSides.map(rightSide => (leftSide, rightSide))
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment