Skip to content

Instantly share code, notes, and snippets.

@yssharma
Created July 6, 2015 16:36
Show Gist options
  • Save yssharma/2581ae8a97c559b2677f to your computer and use it in GitHub Desktop.
Apache Drill Cassandra storage patch - Rebased on Drill 1.2.0
commit e1f17a09b6d402e78268753897cbdbd4f8bed169
Author: Yash Sharma <yash360@gmail.com>
Date: Mon Jul 6 09:51:22 2015 +0530
DRILL-92 : Cassandra storage plugin - rebased on Drill-1.2.0
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 8c00e76..0269efb 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -37,5 +37,6 @@
<module>storage-mongo</module>
<module>sqlline</module>
<module>data</module>
+ <module>storage-cassandra</module>
</modules>
</project>
diff --git a/contrib/storage-cassandra/pom.xml b/contrib/storage-cassandra/pom.xml
new file mode 100644
index 0000000..08184fc
--- /dev/null
+++ b/contrib/storage-cassandra/pom.xml
@@ -0,0 +1,106 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>1.2.0-SNAPSHOT</version>
+ </parent>
+
+
+ <artifactId>drill-storage-cassandra</artifactId>
+ <name>contrib/cassandra-storage-plugin</name>
+
+ <properties>
+ <cassandra.TestSuite>**/CassandraTestSuite.class</cassandra.TestSuite>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>2.0.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-common</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>2.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>${cassandra.TestSuite}</include>
+ </includes>
+ <systemProperties>
+ <property>
+ <name>logback.log.dir</name>
+ <value>${project.build.directory}/surefire-reports</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraCompareFunctionsProcessor.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraCompareFunctionsProcessor.java
new file mode 100644
index 0000000..761ae5d
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraCompareFunctionsProcessor.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.cassandra;
+
+import java.nio.ByteOrder;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class CassandraCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+  /* Literal (right-hand side) of the comparison, rendered as a string. */
+  private String value;
+  /* Whether the expression was successfully converted into a pushable form. */
+  private boolean success;
+  /* True when the function is its own transpose (equality-style functions). */
+  private boolean isEqualityFn;
+  /* Column reference (left-hand side) of the comparison. */
+  private SchemaPath path;
+  private String functionName;
+
+  /**
+   * Returns true when {@code functionName} is a comparison function that this
+   * processor knows how to analyze for push-down.
+   */
+  public static boolean isCompareFunction(String functionName) {
+    // containsKey() gives the same answer as the old keySet().contains()
+    // without materializing the key-set view.
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName);
+  }
+
+  /**
+   * Analyzes a comparison call. When the literal is on the left-hand side
+   * (e.g. {@code 5 > col}) the operands are swapped and the function name is
+   * transposed (e.g. {@code greater_than} becomes {@code less_than}) so that
+   * the column always ends up on the left.
+   */
+  public static CassandraCompareFunctionsProcessor process(FunctionCall call) {
+    String functionName = call.getName();
+    LogicalExpression nameArg = call.args.get(0);
+    LogicalExpression valueArg = call.args.size() >= 2 ? call.args.get(1) : null;
+    CassandraCompareFunctionsProcessor evaluator = new CassandraCompareFunctionsProcessor(functionName);
+
+    if (valueArg != null) { // binary function
+      if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+        LogicalExpression swapArg = valueArg;
+        valueArg = nameArg;
+        nameArg = swapArg;
+        evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+      }
+      evaluator.success = nameArg.accept(evaluator, valueArg);
+    } else if (call.args.get(0) instanceof SchemaPath) {
+      // Unary function (is [not] null): only the column path is required.
+      evaluator.success = true;
+      evaluator.path = (SchemaPath) nameArg;
+    }
+
+    return evaluator;
+  }
+
+  public CassandraCompareFunctionsProcessor(String functionName) {
+    this.success = false;
+    this.functionName = functionName;
+    // A function is equality-like when transposing it maps it onto itself.
+    this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP.containsKey(functionName)
+        && COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(functionName);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public SchemaPath getPath() {
+    return path;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  /**
+   * Allocates a fixed-size buffer ready for writing. Unpooled.wrappedBuffer()
+   * returns a buffer whose writerIndex is already at capacity, so without the
+   * writerIndex(0) reset the subsequent write*() calls would overflow the
+   * fixed-capacity buffer and throw IndexOutOfBoundsException.
+   */
+  private static ByteBuf newByteBuf(int size, boolean bigEndian) {
+    return Unpooled.wrappedBuffer(new byte[size])
+        .order(bigEndian ? ByteOrder.BIG_ENDIAN : ByteOrder.LITTLE_ENDIAN)
+        .writerIndex(0);
+  }
+
+  @Override
+  public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+    // Look through casts applied directly to a column reference.
+    if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
+      return e.getInput().accept(this, valueArg);
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitConvertExpression(ConvertExpression e, LogicalExpression valueArg) throws RuntimeException {
+    if (e.getConvertFunction() == ConvertExpression.CONVERT_FROM && e.getInput() instanceof SchemaPath) {
+      ByteBuf bb = null;
+      String encodingType = e.getEncodingType();
+      switch (encodingType) {
+      case "INT_BE":
+      case "INT":
+      case "UINT_BE":
+      case "UINT":
+      case "UINT4_BE":
+      case "UINT4":
+        // Signed encodings only sort bytewise for equality; unsigned ones
+        // preserve order, so range comparisons are allowed for them too.
+        if (valueArg instanceof ValueExpressions.IntExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+          bb = newByteBuf(4, encodingType.endsWith("_BE"));
+          bb.writeInt(((ValueExpressions.IntExpression) valueArg).getInt());
+        }
+        break;
+      case "BIGINT_BE":
+      case "BIGINT":
+      case "UINT8_BE":
+      case "UINT8":
+        if (valueArg instanceof ValueExpressions.LongExpression
+            && (isEqualityFn || encodingType.startsWith("U"))) {
+          bb = newByteBuf(8, encodingType.endsWith("_BE"));
+          bb.writeLong(((ValueExpressions.LongExpression) valueArg).getLong());
+        }
+        break;
+      case "FLOAT":
+        if (valueArg instanceof ValueExpressions.FloatExpression && isEqualityFn) {
+          bb = newByteBuf(4, true);
+          bb.writeFloat(((ValueExpressions.FloatExpression) valueArg).getFloat());
+        }
+        break;
+      case "DOUBLE":
+        if (valueArg instanceof ValueExpressions.DoubleExpression && isEqualityFn) {
+          bb = newByteBuf(8, true);
+          bb.writeDouble(((ValueExpressions.DoubleExpression) valueArg).getDouble());
+        }
+        break;
+      case "TIME_EPOCH":
+      case "TIME_EPOCH_BE":
+        if (valueArg instanceof ValueExpressions.TimeExpression) {
+          bb = newByteBuf(8, encodingType.endsWith("_BE"));
+          bb.writeLong(((ValueExpressions.TimeExpression) valueArg).getTime());
+        }
+        break;
+      case "DATE_EPOCH":
+      case "DATE_EPOCH_BE":
+        if (valueArg instanceof ValueExpressions.DateExpression) {
+          bb = newByteBuf(8, encodingType.endsWith("_BE"));
+          bb.writeLong(((ValueExpressions.DateExpression) valueArg).getDate());
+        }
+        break;
+      case "BOOLEAN_BYTE":
+        if (valueArg instanceof ValueExpressions.BooleanExpression) {
+          // Byte order is irrelevant for a single byte.
+          bb = newByteBuf(1, false);
+          bb.writeByte(((ValueExpressions.BooleanExpression) valueArg).getBoolean() ? 1 : 0);
+        }
+        break;
+      case "UTF8":
+        // let visitSchemaPath() handle this.
+        return e.getInput().accept(this, valueArg);
+      }
+
+      if (bb != null) {
+        // Decode the bytes actually written (readerIndex..writerIndex).
+        // The previous bb.toString() returned the buffer's debug
+        // representation (e.g. "UnpooledHeapByteBuf(...)"), not its contents.
+        this.value = bb.toString(Charsets.UTF_8);
+        this.path = (SchemaPath) e.getInput();
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    // Any expression we do not explicitly handle cannot be pushed down.
+    return false;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    if (valueArg instanceof ValueExpressions.QuotedString) {
+      // The quoted string already is a Java String. The old UTF-8-encode /
+      // platform-default-decode round trip was lossy on non-UTF-8 platforms.
+      this.value = ((ValueExpressions.QuotedString) valueArg).value;
+      this.path = path;
+      return true;
+    }
+    return false;
+  }
+
+  /* Literal expression types that may appear as a comparison operand. */
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+        .add(ValueExpressions.BooleanExpression.class)
+        .add(ValueExpressions.DateExpression.class)
+        .add(ValueExpressions.DoubleExpression.class)
+        .add(ValueExpressions.FloatExpression.class)
+        .add(ValueExpressions.IntExpression.class)
+        .add(ValueExpressions.LongExpression.class)
+        .add(ValueExpressions.QuotedString.class)
+        .add(ValueExpressions.TimeExpression.class)
+        .build();
+  }
+
+  /* Maps each supported function to its transpose (used when operands swap). */
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+        // unary functions
+        .put("isnotnull", "isnotnull")
+        .put("isNotNull", "isNotNull")
+        .put("is not null", "is not null")
+        .put("isnull", "isnull")
+        .put("isNull", "isNull")
+        .put("is null", "is null")
+        // binary functions
+        .put("like", "like")
+        .put("equal", "equal")
+        .put("not_equal", "not_equal")
+        .put("greater_than_or_equal_to", "less_than_or_equal_to")
+        .put("greater_than", "less_than")
+        .put("less_than_or_equal_to", "greater_than_or_equal_to")
+        .put("less_than", "greater_than")
+        .build();
+  }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraDatabaseSchema.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraDatabaseSchema.java
new file mode 100644
index 0000000..4287ae4
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraDatabaseSchema.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.schema.Table;
+
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.cassandra.CassandraSchemaFactory.CassandraSchema;
+
+import com.google.common.collect.Sets;
+
+public class CassandraDatabaseSchema extends AbstractSchema {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraDatabaseSchema.class);
+
+  /* Parent schema used to resolve individual Drill tables. */
+  private final CassandraSchema parentSchema;
+  /* Names of the tables available in this keyspace. */
+  private final Set<String> tableNames;
+
+  public CassandraDatabaseSchema(List<String> tableList, CassandraSchema cassandraSchema,
+      String name) {
+    super(cassandraSchema.getSchemaPath(), name);
+    this.parentSchema = cassandraSchema;
+    this.tableNames = Sets.newHashSet(tableList);
+  }
+
+  /** Resolves a table by delegating to the parent Cassandra schema. */
+  @Override
+  public Table getTable(String tableName) {
+    return parentSchema.getDrillTable(this.name, tableName);
+  }
+
+  @Override
+  public String getTypeName() {
+    return CassandraStoragePluginConfig.NAME;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return tableNames;
+  }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraFilterBuilder.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraFilterBuilder.java
new file mode 100644
index 0000000..ffb6c6b
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraFilterBuilder.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CassandraFilterBuilder extends
+    AbstractExprVisitor<CassandraScanSpec, Void, RuntimeException> implements
+    DrillCassandraConstants {
+  static final Logger logger = LoggerFactory.getLogger(CassandraFilterBuilder.class);
+
+  final CassandraGroupScan groupScan;
+  /* Root of the filter condition being translated. */
+  final LogicalExpression le;
+  /* Set to false whenever a sub-expression cannot be pushed to Cassandra. */
+  private boolean allExpressionsConverted = true;
+
+  public CassandraFilterBuilder(CassandraGroupScan groupScan,
+      LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    this.le = conditionExp;
+  }
+
+  /**
+   * Walks the filter expression and merges whatever could be converted with
+   * the scan spec already present on the group scan.
+   * Called by CassandraPushDownFilterForScan.
+   */
+  public CassandraScanSpec parseTree() {
+    CassandraScanSpec parsedSpec = le.accept(this, null);
+    if (parsedSpec != null) {
+      parsedSpec = mergeScanSpecs("booleanAnd", this.groupScan.getCassandraScanSpec(), parsedSpec);
+    }
+    return parsedSpec;
+  }
+
+  /**
+   * Combines two scan specs into one. Only AND is supported: the CQL clause
+   * lists of both sides are concatenated onto a fresh spec.
+   */
+  private CassandraScanSpec mergeScanSpecs(String functionName,
+      CassandraScanSpec leftScanSpec, CassandraScanSpec rightScanSpec) {
+    List<Clause> newFilters = Lists.newArrayList();
+    CassandraScanSpec scanSpec = null;
+    switch (functionName) {
+    case "booleanAnd":
+      if (leftScanSpec != null && leftScanSpec.getFilters() != null) {
+        newFilters.addAll(leftScanSpec.getFilters());
+      }
+      if (rightScanSpec != null && rightScanSpec.getFilters() != null) {
+        newFilters.addAll(rightScanSpec.getFilters());
+      }
+      scanSpec = new CassandraScanSpec(groupScan.getCassandraScanSpec().getKeyspace(),
+          groupScan.getCassandraScanSpec().getTable(), newFilters);
+      break;
+    }
+    return scanSpec;
+  }
+
+  public boolean isAllExpressionsConverted() {
+    return allExpressionsConverted;
+  }
+
+  @Override
+  public CassandraScanSpec visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    allExpressionsConverted = false;
+    return null;
+  }
+
+  @Override
+  public CassandraScanSpec visitBooleanOperator(BooleanOperator op, Void value) {
+    List<LogicalExpression> args = op.args;
+    CassandraScanSpec nodeScanSpec = null;
+    String functionName = op.getName();
+    for (int i = 0; i < args.size(); ++i) {
+      switch (functionName) {
+      case "booleanAnd":
+        if (nodeScanSpec == null) {
+          nodeScanSpec = args.get(i).accept(this, null);
+        } else {
+          CassandraScanSpec scanSpec = args.get(i).accept(this, null);
+          if (scanSpec != null) {
+            nodeScanSpec = mergeScanSpecs(functionName, nodeScanSpec, scanSpec);
+          } else {
+            // This child cannot be pushed down; Drill must evaluate it.
+            allExpressionsConverted = false;
+          }
+        }
+        break;
+      }
+    }
+    return nodeScanSpec;
+  }
+
+  @Override
+  public CassandraScanSpec visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+    CassandraScanSpec nodeScanSpec = null;
+    String functionName = call.getName();
+    ImmutableList<LogicalExpression> args = call.args;
+
+    if (CassandraCompareFunctionsProcessor.isCompareFunction(functionName)) {
+      CassandraCompareFunctionsProcessor processor = CassandraCompareFunctionsProcessor.process(call);
+      if (processor.isSuccess()) {
+        nodeScanSpec = createCassandraScanSpec(call, processor);
+      }
+    } else {
+      switch (functionName) {
+      case "booleanAnd":
+        CassandraScanSpec firstScanSpec = args.get(0).accept(this, null);
+        for (int i = 1; i < args.size(); ++i) {
+          CassandraScanSpec nextScanSpec = args.get(i).accept(this, null);
+          if (firstScanSpec != null && nextScanSpec != null) {
+            nodeScanSpec = mergeScanSpecs(functionName, firstScanSpec, nextScanSpec);
+          } else {
+            allExpressionsConverted = false;
+            // Keep whichever side converted. (Inside this case the function
+            // is always booleanAnd, so the old re-check of the name was
+            // redundant and has been removed.)
+            nodeScanSpec = firstScanSpec == null ? nextScanSpec : firstScanSpec;
+          }
+          firstScanSpec = nodeScanSpec;
+        }
+        break;
+      }
+    }
+
+    if (nodeScanSpec == null) {
+      allExpressionsConverted = false;
+    }
+    return nodeScanSpec;
+  }
+
+  /**
+   * Builds a single-clause scan spec from an analyzed comparison. Returns
+   * null for functions without a CQL equivalent here (e.g. "like",
+   * "not_equal", "is not null"), which the caller treats as not pushable.
+   */
+  private CassandraScanSpec createCassandraScanSpec(FunctionCall call, CassandraCompareFunctionsProcessor processor) {
+    String functionName = processor.getFunctionName();
+    SchemaPath field = processor.getPath();
+    String fieldValue = processor.getValue();
+    String columnName = field.getAsNamePart().getName();
+
+    Clause clause = null;
+    switch (functionName) {
+    case "equal":
+      clause = QueryBuilder.eq(columnName, fieldValue);
+      break;
+    case "greater_than_or_equal_to":
+      clause = QueryBuilder.gte(columnName, fieldValue);
+      break;
+    case "greater_than":
+      clause = QueryBuilder.gt(columnName, fieldValue);
+      break;
+    case "less_than_or_equal_to":
+      clause = QueryBuilder.lte(columnName, fieldValue);
+      break;
+    case "less_than":
+      clause = QueryBuilder.lt(columnName, fieldValue);
+      break;
+    case "isnull":
+    case "isNull":
+    case "is null":
+      clause = QueryBuilder.eq(columnName, null);
+      break;
+    }
+
+    if (clause != null) {
+      List<Clause> filters = Lists.newArrayList();
+      filters.add(clause);
+      return new CassandraScanSpec(groupScan.getCassandraScanSpec().getKeyspace(),
+          groupScan.getCassandraScanSpec().getTable(), filters);
+    }
+    return null;
+  }
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraGroupScan.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraGroupScan.java
new file mode 100644
index 0000000..10b3362
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraGroupScan.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.cassandra.CassandraSubScan.CassandraSubScanSpec;
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager;
+
+// Use the real fasterxml JsonCreator: the previous import of
+// parquet.org.codehaus.jackson.annotate.JsonCreator is a shaded codehaus
+// Jackson annotation that Drill's fasterxml ObjectMapper does not recognize.
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+@JsonTypeName("cassandra-scan")
+public class CassandraGroupScan extends AbstractGroupScan implements DrillCassandraConstants {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraGroupScan.class);
+
+ // Orders sub-scan work lists by size, ascending. Sizes are non-negative,
+ // so the subtraction cannot overflow.
+ private static final Comparator<List<CassandraSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<CassandraSubScanSpec>>() {
+ @Override
+ public int compare(List<CassandraSubScanSpec> list1, List<CassandraSubScanSpec> list2) {
+ return list1.size() - list2.size();
+ }
+ };
+
+ // Same ordering, descending (largest work lists first).
+ private static final Comparator<List<CassandraSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
+ // NOTE(review): never assigned in this class; the constructors pass the
+ // user name to super(userName) instead — this field looks unused, confirm.
+ private String userName;
+
+ private CassandraStoragePluginConfig storagePluginConfig;
+
+ // Columns projected by the scan; set to ALL_COLUMNS when none are given.
+ private List<SchemaPath> columns;
+
+ private CassandraScanSpec cassandraScanSpec;
+
+ private CassandraStoragePlugin storagePlugin;
+
+ // Used to time getOperatorAffinity() / applyAssignments().
+ private Stopwatch watch = new Stopwatch();
+
+ // Fragment index -> sub-scan specs assigned to that fragment.
+ private Map<Integer, List<CassandraSubScanSpec>> endpointFragmentMapping;
+
+ // All Cassandra hosts known to the cluster metadata (populated in init()).
+ private Set<Host> keyspaceHosts;
+
+ // Row count available without fetching; used as the number of assignments.
+ private int totalAssignmentsTobeDone;
+
+ private boolean filterPushedDown = false;
+
+ // Host name -> token range owned by that host (built in init()).
+ private Map<String, CassandraPartitionToken> hostTokenMapping = new HashMap<>();
+
+ //private TableStatsCalculator statsCalculator;
+
+ private long scanSizeInBytes = 0;
+
+ // Live connection state created by init().
+ private Metadata metadata;
+
+ private Cluster cluster;
+
+ private Session session;
+
+ private ResultSet rs;
+
+ /**
+  * Deserialization constructor: resolves the storage plugin from the
+  * registry and delegates to the main constructor.
+  * NOTE(review): @JsonCreator is imported from parquet's shaded codehaus
+  * Jackson (see imports) while the sibling annotations are com.fasterxml —
+  * a fasterxml ObjectMapper will not honor this creator; confirm.
+  */
+ @JsonCreator
+ public CassandraGroupScan(@JsonProperty("userName") String userName,
+ @JsonProperty("cassandraScanSpec") CassandraScanSpec cassandraScanSpec,
+ @JsonProperty("storage") CassandraStoragePluginConfig storagePluginConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+ this (userName, (CassandraStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), cassandraScanSpec, columns);
+ }
+
+ /**
+  * Builds a group scan for the given plugin and scan spec, then eagerly
+  * connects to Cassandra via init().
+  */
+ public CassandraGroupScan(String userName, CassandraStoragePlugin storagePlugin,
+     CassandraScanSpec scanSpec, List<SchemaPath> columns) {
+   super(userName);
+   this.storagePlugin = storagePlugin;
+   this.storagePluginConfig = storagePlugin.getConfig();
+   this.cassandraScanSpec = scanSpec;
+   // An absent or empty projection means "select everything".
+   this.columns = (columns == null || columns.isEmpty()) ? ALL_COLUMNS : columns;
+   init();
+ }
+
+ /**
+  * Private constructor, used for cloning.
+  * Copies plan-level state only: the live connection objects created in
+  * init() (cluster, session, rs) are not copied and stay unset in the copy,
+  * which reuses the already-computed metadata, host and token information.
+  * @param that The CassandraGroupScan to clone
+  */
+ private CassandraGroupScan(CassandraGroupScan that) {
+ super(that);
+ this.columns = that.columns;
+ this.cassandraScanSpec = that.cassandraScanSpec;
+ this.endpointFragmentMapping = that.endpointFragmentMapping;
+ this.keyspaceHosts = that.keyspaceHosts;
+ this.metadata = that.metadata;
+ this.storagePlugin = that.storagePlugin;
+ this.storagePluginConfig = that.storagePluginConfig;
+ this.filterPushedDown = that.filterPushedDown;
+ this.totalAssignmentsTobeDone = that.totalAssignmentsTobeDone;
+ this.hostTokenMapping = that.hostTokenMapping;
+ //this.statsCalculator = that.statsCalculator;
+ this.scanSizeInBytes = that.scanSizeInBytes;
+ }
+
+ /** Creates a copy of this scan restricted to the given projection. */
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+   CassandraGroupScan copy = new CassandraGroupScan(this);
+   copy.columns = columns;
+   return copy;
+ }
+
+ private void init() {
+ try {
+ logger.info(String.format("Getting cassandra session from host %s, port: %s.",
+ storagePluginConfig.getHosts(), storagePluginConfig.getPort()));
+
+ cluster = CassandraConnectionManager.getCluster(storagePluginConfig.getHosts(),
+ storagePluginConfig.getPort());
+
+ session = cluster.connect();
+
+ metadata = session.getCluster().getMetadata();
+
+ Charset charset = Charset.forName("UTF-8");
+ CharsetEncoder encoder = charset.newEncoder();
+
+ keyspaceHosts = session.getCluster().getMetadata().getAllHosts();
+
+ logger.info("KeySpace hosts for Cassandra : {}", keyspaceHosts);
+
+ if(null == keyspaceHosts){
+ logger.error(String.format("No Keyspace Hosts Found for Cassandra %s:%s .",
+ storagePluginConfig.getHosts(), storagePluginConfig.getPort()));
+ throw new DrillRuntimeException(String.format("No Keyspace Hosts Found for Cassandra %s:%s .",
+ storagePluginConfig.getHosts(), storagePluginConfig.getPort()));
+ }
+
+ String[] tokens = CassandraUtil.getPartitionTokens(metadata.getPartitioner(), keyspaceHosts.size());
+ int index = 0;
+ for(Host h : keyspaceHosts){
+ CassandraPartitionToken token = new CassandraPartitionToken();
+ token.setLow(tokens[index]);
+ if(index+1 < tokens.length){
+ token.setHigh(tokens[index+1]);
+ }
+ hostTokenMapping.put(h.getAddress().getHostName(), token);
+ index++;
+ }
+ logger.info("Host token mapping: {}", hostTokenMapping);
+
+ Statement q = QueryBuilder.select().all().from(cassandraScanSpec.getKeyspace(),
+ cassandraScanSpec.getTable());
+
+ if(session.isClosed()){
+ logger.error("Error in initializing CasandraGroupScan. Session Closed.");
+ throw new DrillRuntimeException("Error in initializing CasandraGroupScan. Session Closed.");
+ }
+
+ rs = session.execute(q);
+ this.totalAssignmentsTobeDone = rs.getAvailableWithoutFetching();
+
+ }
+ catch(Exception e){
+ logger.error("Error in initializing CasandraGroupScan, Error: "+e.getMessage());
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ /**
+  * Computes drillbit affinity: each Cassandra host that is co-located with
+  * a running drillbit contributes one unit of affinity to that endpoint.
+  */
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+   watch.reset();
+   watch.start();
+
+   // Index the running drillbits by host address.
+   Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+   for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) {
+     endpointMap.put(ep.getAddress(), ep);
+   }
+
+   logger.info("Building affinity map. Endpoints: {}, KeyspaceHosts: {}", endpointMap, keyspaceHosts);
+
+   Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+   for (Host host : keyspaceHosts) {
+     DrillbitEndpoint ep = endpointMap.get(host.getAddress().getHostName());
+     if (ep != null) {
+       EndpointAffinity affinity = affinityMap.get(ep);
+       if (affinity == null) {
+         affinityMap.put(ep, new EndpointAffinity(ep, 1));
+       } else {
+         affinity.addAffinity(1);
+       }
+     }
+   }
+
+   logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+
+   // affinityMap is constructed above and can never be null; the previous
+   // null check was dead code.
+   if (affinityMap.isEmpty()) {
+     logger.info("Affinity map is empty for CassandraGroupScan {}:{}", storagePluginConfig.getHosts(), storagePluginConfig.getPort());
+   }
+   return Lists.newArrayList(affinityMap.values());
+ }
+
+ /**
+ *
+ * @param incomingEndpoints
+ */
+ @Override
+ public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+ watch.reset();
+ watch.start();
+
+ final int numSlots = incomingEndpoints.size();
+
+ Preconditions.checkArgument(numSlots <= totalAssignmentsTobeDone,
+ String.format("Incoming endpoints %d is greater than number of chunks %d",
+ numSlots, totalAssignmentsTobeDone));
+
+ final int minPerEndpointSlot = (int) Math.floor((double) totalAssignmentsTobeDone / numSlots);
+ final int maxPerEndpointSlot = (int) Math.ceil((double) totalAssignmentsTobeDone / numSlots);
+
+ /* Map for (index,endpoint)'s */
+ endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots);
+ /* Reverse mapping for above indexes */
+ Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+
+ /*
+ * Initialize these two maps
+ */
+ for (int i = 0; i < numSlots; ++i) {
+ endpointFragmentMapping.put(i, new ArrayList<CassandraSubScanSpec>(maxPerEndpointSlot));
+ String hostname = incomingEndpoints.get(i).getAddress();
+ Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
+ if (hostIndexQueue == null) {
+ hostIndexQueue = Lists.newLinkedList();
+ endpointHostIndexListMap.put(hostname, hostIndexQueue);
+ }
+ hostIndexQueue.add(i);
+ }
+
+ Set<Host> hostsToAssignSet = Sets.newHashSet(keyspaceHosts);
+
+ for (Iterator<Host> hostIterator = hostsToAssignSet.iterator(); hostIterator.hasNext(); /*nothing*/) {
+ Host hostEntry = hostIterator.next();
+
+ Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(hostEntry.getAddress().getHostName());
+ if (endpointIndexlist != null) {
+ Integer slotIndex = endpointIndexlist.poll();
+ List<CassandraSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
+ endpointSlotScanList.add(hostToSubScanSpec(hostEntry, storagePluginConfig.getHosts()));
+ // add to the tail of the slot list, to add more later in round robin fashion
+ endpointIndexlist.offer(slotIndex);
+ // this region has been assigned
+ hostIterator.remove();
+ }
+ }
+
+ /*
+ * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more.
+ */
+ PriorityQueue<List<CassandraSubScanSpec>> minHeap = new PriorityQueue<List<CassandraSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
+ PriorityQueue<List<CassandraSubScanSpec>> maxHeap = new PriorityQueue<List<CassandraSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+ for(List<CassandraSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
+ if (listOfScan.size() < minPerEndpointSlot) {
+ minHeap.offer(listOfScan);
+ } else if (listOfScan.size() > minPerEndpointSlot) {
+ maxHeap.offer(listOfScan);
+ }
+ }
+
+ /*
+ * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments.
+ */
+ if (hostsToAssignSet.size() > 0) {
+ for (Host hostEntry : hostsToAssignSet) {
+ List<CassandraSubScanSpec> smallestList = minHeap.poll();
+ smallestList.add(hostToSubScanSpec(hostEntry, storagePluginConfig.getHosts()));
+ if (smallestList.size() < maxPerEndpointSlot) {
+ minHeap.offer(smallestList);
+ }
+ }
+ }
+
+ /*
+ * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more.
+ */
+ try {
+ // If there is more work left
+ if(maxHeap.peek() != null && maxHeap.peek().size() > 0) {
+ while (minHeap.peek() != null && minHeap.peek().size() <= minPerEndpointSlot) {
+ List<CassandraSubScanSpec> smallestList = minHeap.poll();
+ List<CassandraSubScanSpec> largestList = maxHeap.poll();
+
+ smallestList.add(largestList.remove(largestList.size() - 1));
+ if (largestList.size() > minPerEndpointSlot) {
+ maxHeap.offer(largestList);
+ }
+ if (smallestList.size() <= minPerEndpointSlot) {
+ minHeap.offer(smallestList);
+ }
+ }
+ }
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+
+ /* no slot should be empty at this point */
+ assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
+ "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
+ incomingEndpoints, endpointFragmentMapping.toString());
+
+ logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
+ watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+
+ }
+
+  /**
+   * Builds a sub-scan spec for one Cassandra host: table, keyspace, filters and
+   * contact points from the group scan, plus the host's partition-token range.
+   * A host with no token mapping gets null start/end tokens (full range).
+   */
+  private CassandraSubScanSpec hostToSubScanSpec(Host host, List<String> contactPoints) {
+    CassandraScanSpec spec = cassandraScanSpec;
+    CassandraPartitionToken token = hostTokenMapping.get(host.getAddress().getHostName());
+
+    return new CassandraSubScanSpec()
+        .setTable(spec.getTable())
+        .setKeyspace(spec.getKeyspace())
+        .setFilter(spec.getFilters())
+        .setHosts(contactPoints)
+        .setPort(storagePluginConfig.getPort())
+        .setStartToken(token != null ? token.getLow() : null)
+        .setEndToken(token != null ? token.getHigh() : null);
+
+  }
+
+  /** Returns true when the key is null or zero-length. NOTE(review): currently unused in this class. */
+  private boolean isNullOrEmpty(byte[] key) {
+    return key == null || key.length == 0;
+  }
+
+  /**
+   * Returns the sub-scan for one minor fragment, built from the assignment map
+   * computed by {@code applyAssignments}.
+   */
+  @Override
+  public CassandraSubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    return new CassandraSubScan(storagePlugin, storagePluginConfig, endpointFragmentMapping.get(minorFragmentId), columns);
+  }
+
+  // NOTE(review): -1 appears intended to mean "no specific limit"; confirm against
+  // the AbstractGroupScan contract for this Drill version.
+  @Override
+  public int getMaxParallelizationWidth() {
+    return -1;
+  }
+
+  /** Returns placeholder stats; real row-count estimation is still TODO. */
+  @Override
+  public ScanStats getScanStats() {
+    //TODO
+    return ScanStats.TRIVIAL_TABLE;
+  }
+
+  /** Leaf operator: accepts no children and returns a copy of this group scan. */
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new CassandraGroupScan(this);
+  }
+
+  /** Returns the owning storage plugin (not serialized). */
+  @JsonIgnore
+  public CassandraStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
+  /** Returns the Cassandra table name from the scan spec (not serialized). */
+  @JsonIgnore
+  public String getTableName() {
+    return getCassandraScanSpec().getTable();
+  }
+
+  /** Digest used by the planner to compare scans; delegates to {@link #toString()}. */
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  /** Human-readable summary of the scan spec and projected columns. */
+  @Override
+  public String toString() {
+    return "CassandraGroupScan [CassandraScanSpec="
+        + cassandraScanSpec + ", columns="
+        + columns + "]";
+  }
+
+  /** Serialized as the "storage" property of the physical plan. */
+  @JsonProperty("storage")
+  public CassandraStoragePluginConfig getStorageConfig() {
+    return this.storagePluginConfig;
+  }
+
+  /** Projected columns for this scan (serialized). */
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  /** Keyspace/table/filter spec for this scan (serialized). */
+  @JsonProperty
+  public CassandraScanSpec getCassandraScanSpec() {
+    return cassandraScanSpec;
+  }
+
+  /** Projection pushdown is always supported: only requested columns are selected. */
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  /**
+   * Records whether the planner has pushed the filter into this scan.
+   *
+   * @param b true if the filter was pushed down
+   */
+  @JsonIgnore
+  public void setFilterPushedDown(boolean b) {
+    // Bug fix: honor the argument; previously this unconditionally stored true,
+    // making it impossible to clear the flag.
+    this.filterPushedDown = b;
+  }
+
+  /** Returns true once the filter-pushdown rule has transformed this scan. */
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+    return filterPushedDown;
+  }
+
+  /**
+   * Empty constructor, do not use, only for testing. Skips all cluster/session
+   * initialization; the instance is unusable for planning until the spec is set.
+   */
+  @VisibleForTesting
+  public CassandraGroupScan() {
+    super((String) null);
+  }
+
+  /**
+   * Do not use, only for testing. Injects a scan spec into an instance created
+   * with the test-only constructor.
+   */
+  @VisibleForTesting
+  public void setCassandraScanSpec(CassandraScanSpec cassandraScanSpec) {
+    this.cassandraScanSpec = cassandraScanSpec;
+  }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPartitionToken.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPartitionToken.java
new file mode 100644
index 0000000..816324e
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPartitionToken.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+/**
+ * Value holder for one Cassandra partition-token range assigned to a host.
+ * Tokens are kept as strings; a null {@code high} denotes an open-ended range
+ * (see how CassandraGroupScan fills these from the partitioner's token list).
+ */
+public class CassandraPartitionToken {
+
+  // Inclusive lower bound of the token range.
+  private String low;
+  // Exclusive upper bound; may be null for the last range.
+  private String high;
+
+  public String getLow() {
+    return low;
+  }
+
+  public void setLow(String low) {
+    this.low = low;
+  }
+
+  public String getHigh() {
+    return high;
+  }
+
+  public void setHigh(String high) {
+    this.high = high;
+  }
+
+  @Override
+  public String toString() {
+    return "CassandraPartitionToken{" +
+        "low='" + low + '\'' +
+        ", high='" + high + '\'' +
+        '}';
+  }
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPushDownFilterForScan.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPushDownFilterForScan.java
new file mode 100644
index 0000000..4b5a295
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraPushDownFilterForScan.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Planner rule that pushes a {@link FilterPrel} condition into a Cassandra scan.
+ * Currently disabled via {@link #BYPASS_CASSANDRA_FILTER_PUSHDOWN} until the
+ * pushdown translation is fixed.
+ */
+public class CassandraPushDownFilterForScan extends StoragePluginOptimizerRule {
+
+  /* Flag to bypass filter pushdown while the translation is incomplete. */
+  static final boolean BYPASS_CASSANDRA_FILTER_PUSHDOWN = true;
+
+  public static final StoragePluginOptimizerRule INSTANCE = new CassandraPushDownFilterForScan();
+
+  private CassandraPushDownFilterForScan() {
+    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "CassandraPushDownFilterForScan");
+  }
+
+  public CassandraPushDownFilterForScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    // TODO : Fix Pushdown. Until then, bail out before doing any expression
+    // conversion; previously the bypass check ran AFTER the DrillOptiq
+    // conversion and filter-builder parse, wasting that work on every match.
+    if (BYPASS_CASSANDRA_FILTER_PUSHDOWN) {
+      return;
+    }
+
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    final FilterPrel filter = (FilterPrel) call.rel(0);
+    final RexNode condition = filter.getCondition();
+
+    CassandraGroupScan groupScan = (CassandraGroupScan) scan.getGroupScan();
+    if (groupScan.isFilterPushedDown()) {
+
+      /*
+       * The rule can get triggered again due to the transformed "scan => filter" sequence
+       * created by the earlier execution of this rule when we could not do a complete
+       * conversion of Optiq Filter's condition to Cassandra Filter. In such cases, we rely upon
+       * this flag to not do a re-processing of the rule on the already transformed call.
+       */
+      return;
+    }
+
+    // Translate the Calcite row expression into a Drill logical expression,
+    // then into a Cassandra scan spec carrying the pushable clauses.
+    LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+    CassandraFilterBuilder cassandraFilterBuilder = new CassandraFilterBuilder(groupScan, conditionExp);
+    CassandraScanSpec newScanSpec = cassandraFilterBuilder.parseTree();
+
+    if (newScanSpec == null) {
+      return; //no filter pushdown ==> No transformation.
+    }
+
+    final CassandraGroupScan newGroupsScan = new CassandraGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
+    newGroupsScan.setFilterPushedDown(true);
+
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
+    if (cassandraFilterBuilder.isAllExpressionsConverted()) {
+      /*
+       * Since we could convert the entire filter condition expression into an cassandra filter,
+       * we can eliminate the filter operator altogether.
+       */
+      call.transformTo(newScanPrel);
+    } else {
+      call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of((RelNode) newScanPrel)));
+    }
+  }
+
+  /** Applies only when the underlying group scan is a Cassandra scan. */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    if (scan.getGroupScan() instanceof CassandraGroupScan) {
+      return super.matches(call);
+    }
+    return false;
+  }
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraRecordReader.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraRecordReader.java
new file mode 100644
index 0000000..51fa6ce
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraRecordReader.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.cassandra;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager;
+import org.apache.drill.exec.store.ischema.Records;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class CassandraRecordReader extends AbstractRecordReader implements DrillCassandraConstants{
+ static final Logger logger = LoggerFactory.getLogger(CassandraRecordReader.class);
+
+ private static final int TARGET_RECORD_COUNT = 3000;
+
+ private Cluster cluster;
+ private Session session;
+ private ResultSet rs;
+ private Iterator<Row> it;
+
+ private NullableVarCharVector valueVector;
+ private OutputMutator outputMutator;
+ private Map<String, MapVector> familyVectorMap;
+
+
+ private String cassandraTableName;
+
+ private CassandraSubScan.CassandraSubScanSpec subScanSpec;
+ private String cassandraKeySpace;
+ private CassandraStoragePluginConfig cassandraConf;
+ private List<SchemaPath> projectedColumns;
+ private boolean allColumnsProjected;
+
+ private NullableVarCharVector vector;
+ private List<ValueVector> vectors = Lists.newArrayList();
+
+ private VarBinaryVector rowKeyVector;
+
+
+ private FragmentContext fragmentContext;
+ private OperatorContext operatorContext;
+
+ public CassandraRecordReader(CassandraStoragePluginConfig conf, CassandraSubScan.CassandraSubScanSpec subScanSpec,
+ List<SchemaPath> projectedColumns, FragmentContext context) {
+ this.cassandraTableName = Preconditions.checkNotNull(subScanSpec, "Cassandra reader needs a sub-scan spec").getTable();
+ this.cassandraKeySpace = Preconditions.checkNotNull(subScanSpec, "Cassandra reader needs a sub-scan spec").getKeyspace();
+ this.subScanSpec = subScanSpec;
+ this.projectedColumns = projectedColumns;
+ this.cassandraConf = conf;
+
+ setColumns(projectedColumns);
+ }
+
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ public void setOperatorContext(OperatorContext operatorContext) {
+ this.operatorContext = operatorContext;
+ }
+
+ @Override
+ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) {
+ //TODO:
+ return columns;
+ }
+
+
+ @Override
+ public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+ this.outputMutator = output;
+ this.operatorContext = context;
+
+ try{
+ logger.debug("Opening scanner for Cassandra table '{}'.", cassandraTableName);
+
+ List<String> host = subScanSpec.getHosts();
+ int port = subScanSpec.getPort();
+
+ cluster = CassandraConnectionManager.getCluster(host, port);
+ session = cluster.connect();
+
+ List<ColumnMetadata> partitioncols = session.getCluster().getMetadata()
+ .getKeyspace(subScanSpec.getKeyspace()).getTable(subScanSpec.getTable()).getPartitionKey();
+
+ String[] partitionkeys = new String[partitioncols.size()];
+ for(int index = 0; index < partitioncols.size(); index++){
+ partitionkeys[index] = partitioncols.get(index).getName();
+ }
+
+ Statement q = null;
+
+ /* Check projected columns */
+ for(SchemaPath path : getColumns()){
+ if(path.getAsNamePart().getName().equals("*")){
+ allColumnsProjected = true;
+ break;
+ }
+ }
+
+ /* Project only required columns */
+ Select.Where where;
+ Select.Selection select = QueryBuilder.select();
+ if(allColumnsProjected){
+ where = select.all().from(subScanSpec.getKeyspace(), subScanSpec.getTable()).where();
+ }
+ else{
+ for(SchemaPath path : getColumns()){
+ if(path.getAsNamePart().getName().equals("*")){
+ continue;
+ }
+ else{
+ select = select.column(path.getAsNamePart().getName());
+ }
+ }
+ where = select.from(subScanSpec.getKeyspace(), subScanSpec.getTable()).where();
+ }
+
+ if(subScanSpec.getStartToken() != null){
+ where = where.and(QueryBuilder.gte(QueryBuilder.token(partitionkeys), new Long(subScanSpec.getStartToken())));
+ }
+ if(subScanSpec.getEndToken() != null){
+ where = where.and(QueryBuilder.lt(QueryBuilder.token(partitionkeys), new Long(subScanSpec.getEndToken())));
+ }
+
+ q = where;
+ rs = session.execute(q);
+
+ for(SchemaPath column : getColumns()){
+ if(column.getAsNamePart().getName().equals("*") ){
+ Iterator<ColumnDefinitions.Definition> iter = rs.getColumnDefinitions().iterator();
+
+ /* Add all columns to ValueVector */
+ while(iter.hasNext()) {
+ ColumnDefinitions.Definition def = iter.next();
+ MaterializedField field = MaterializedField.create(def.getName(), COLUMN_TYPE);
+ vector = this.outputMutator.addField(field, NullableVarCharVector.class);
+ }
+ }
+ else {
+ MaterializedField field = MaterializedField.create(column.getRootSegment().getPath(), COLUMN_TYPE);
+ vector = this.outputMutator.addField(field, NullableVarCharVector.class);
+ }
+ }
+
+ } catch (SchemaChangeException | IOException e) {
+ throw new ExecutionSetupException("Failure in Cassandra Record Reader setup. Cause: ",e);
+ }
+ }
+
+ @Override
+ public int next() {
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ int rowCount = 0;
+ Row row=null;
+ int start, end, batchsize=0;
+ start = end = -1;
+ try{
+ vectors = Lists.newArrayList();
+ // TODO: Use Batch Size - TARGET_RECORD_COUNT(3000)
+ for (; rs.iterator().hasNext(); rowCount++) {
+
+ if (operatorContext != null) {
+ operatorContext.getStats().startWait();
+ }
+ try {
+ if (rs.iterator().hasNext()) {
+ row = rs.iterator().next();
+ }
+ }finally {
+ if (operatorContext != null) {
+ operatorContext.getStats().stopWait();
+ }
+ }
+ if (row == null) {
+ break;
+ }
+
+ start = end = -1;
+ for (SchemaPath col : getColumns()){
+
+ if(col.getAsNamePart().getName().equals("*") ){
+ /* Add all columns to ValueVector */
+ for(ColumnDefinitions.Definition def : row.getColumnDefinitions()){
+ updateValueVector(row, def.getName(), rowCount);
+ }
+ }
+ else {
+ updateValueVector(row, col.getAsNamePart().getName(), rowCount);
+ }
+ }
+ logger.debug("text scan batch size {}", batchsize);
+ }
+
+ for (ValueVector v : vectors) {
+ v.getMutator().setValueCount(rowCount);
+ }
+ logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount);
+ return rowCount;
+ } catch (Exception e) {
+ if (operatorContext != null) {
+ operatorContext.getStats().stopWait();
+ }
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ private void updateValueVector(Row row, String colname, int rowCount){
+ try {
+ String val = getAsString(row, colname);
+ int start = 0;
+ int end = val.length();
+
+ MaterializedField field = MaterializedField.create(colname, COLUMN_TYPE);
+ vector = outputMutator.addField(field, NullableVarCharVector.class);
+
+ vector.getMutator().setSafe(rowCount, val.getBytes(), start, end - start);
+ vectors.add(vector);
+ } catch(Exception e){
+ e.printStackTrace();
+
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+
+ @Override
+ public void cleanup() {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (Exception e) {
+ logger.error("Failure while closing Cassandra table. Error: {}", e.getMessage());
+ throw new DrillRuntimeException(String.format("Failure while closing Cassandra table. Error: %s",e.getMessage()));
+ }
+ }
+
+
+
+ /**
+ * Utility function to get the type of the column and return its String value.
+ * TODO: Convert to appropriate Drill Type.
+ *
+ * @param r
+ * @param colname
+ * @return
+ */
+ public String getAsString(Row r, String colname){
+ String value = null;
+ try {
+ Class clazz = r.getColumnDefinitions().getType(colname).asJavaClass();
+
+ if (clazz.isInstance(Long.MIN_VALUE)) {
+ value = String.valueOf(r.getLong(colname));
+ } else if (clazz.isInstance(Boolean.FALSE)) {
+ value = String.valueOf(r.getBool(colname));
+ } else if (clazz.isInstance(Byte.MIN_VALUE)) {
+ value = String.valueOf(r.getBytes(colname));
+ } else if (clazz.isInstance(new Date())) {
+ value = String.valueOf(r.getDate(colname));
+ } else if (clazz.isInstance(BigDecimal.ZERO)) {
+ value = String.valueOf(r.getDecimal(colname));
+ } else if (clazz.isInstance(Double.MIN_VALUE)) {
+ value = String.valueOf(r.getDouble(colname));
+ } else if (clazz.isInstance(Float.MIN_VALUE)) {
+ value = String.valueOf(r.getFloat(colname));
+ } else if (clazz.isInstance(Integer.MIN_VALUE)) {
+ value = String.valueOf(r.getInt(colname));
+ } else if (clazz.isInstance(new String())) {
+ value = r.getString(colname);
+ } else if (clazz.isInstance(BigInteger.ZERO)) {
+ value = String.valueOf(r.getVarint(colname));
+ } else {
+ value = null;
+ }
+ }
+ catch(Exception e){
+ throw new DrillRuntimeException(String.format("Unable to get Cassandra column: %s, of type: %s."
+ , colname, r.getColumnDefinitions().getType(colname).asJavaClass().getCanonicalName()));
+ }
+ return value;
+ }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanBatchCreator.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanBatchCreator.java
new file mode 100644
index 0000000..3f9914c
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanBatchCreator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Creates the scan batch for a Cassandra sub-scan: one {@link CassandraRecordReader}
+ * per chunk (host/token-range) spec in the sub-scan.
+ */
+public class CassandraScanBatchCreator implements BatchCreator<CassandraSubScan> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraScanBatchCreator.class);
+
+  @Override
+  public ScanBatch getBatch(FragmentContext context, CassandraSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    List<RecordReader> readers = Lists.newArrayList();
+    List<SchemaPath> columns = null;
+    for (CassandraSubScan.CassandraSubScanSpec scanSpec : subScan.getChunkScanSpecList()) {
+      try {
+        // A null projection means "all columns".
+        if ((columns = subScan.getColumns()) == null) {
+          columns = GroupScan.ALL_COLUMNS;
+        }
+        readers.add(new CassandraRecordReader(subScan.getCassandraStoragePlugin().getConfig(), scanSpec, columns, context));
+      } catch (Exception e1) {
+        throw new ExecutionSetupException(e1);
+      }
+    }
+    return new ScanBatch(subScan, context, readers.iterator());
+  }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanSpec.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanSpec.java
new file mode 100644
index 0000000..428a12e
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraScanSpec.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import com.datastax.driver.core.querybuilder.Clause;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * Logical description of a Cassandra scan: keyspace, table, and optional
+ * pushed-down filter clauses.
+ */
+public class CassandraScanSpec {
+  private String keyspace;
+  private String table;
+
+  // NOTE(review): filters are @JsonIgnore, so they are dropped when the spec is
+  // serialized into a plan fragment — confirm this is intended while pushdown
+  // is bypassed.
+  @JsonIgnore
+  private List<Clause> filters;
+
+  @JsonCreator
+  public CassandraScanSpec(@JsonProperty("keyspace") String keyspace,
+      @JsonProperty("table") String table) {
+    this.keyspace = keyspace;
+    this.table = table;
+  }
+
+  /** Constructor used by the filter-pushdown rule to attach clauses. */
+  public CassandraScanSpec(String keyspace, String table, List<Clause> filters) {
+    this.keyspace = keyspace;
+    this.table = table;
+    this.filters = filters;
+  }
+
+  public String getKeyspace() {
+    return keyspace;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public List<Clause> getFilters() {
+    return filters;
+  }
+
+  @Override
+  public String toString() {
+    return "CassandraScanSpec [keyspace=" + keyspace + ", table="
+        + table + ", filters=" + filters + "]";
+  }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSchemaFactory.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSchemaFactory.java
new file mode 100644
index 0000000..48d5ce5
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSchemaFactory.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+
/**
 * Exposes a Cassandra cluster to Drill's schema tree. The plugin's schema name
 * becomes a top-level schema whose sub-schemas are Cassandra keyspaces and
 * whose tables are the tables of each keyspace. Keyspace and table name lists
 * are cached for one minute to avoid re-reading cluster metadata on every
 * lookup.
 */
public class CassandraSchemaFactory implements SchemaFactory {

  static final Logger logger = LoggerFactory.getLogger(CassandraSchemaFactory.class);

  // Sentinel cache key under which the full keyspace list is stored.
  private static final String DATABASES = "keyspaces";

  // DATABASES -> list of keyspace names in the cluster.
  private LoadingCache<String, List<String>> keyspaceCache;
  // keyspace name -> list of table names in that keyspace.
  private LoadingCache<String, List<String>> tableCache;
  private final String schemaName;
  private final CassandraStoragePlugin plugin;

  // Driver handle used only for metadata lookups (keyspaces/tables).
  private final Cluster cluster;

  /**
   * Builds a driver Cluster from the plugin config's host list and port and
   * sets up the one-minute metadata caches.
   *
   * @param schema     owning storage plugin; supplies hosts and port.
   * @param schemaName name under which this schema is registered in Drill.
   */
  public CassandraSchemaFactory(CassandraStoragePlugin schema, String schemaName)
      throws ExecutionSetupException, UnknownHostException {
    List<String> hosts = schema.getConfig().getHosts();
    int port = schema.getConfig().getPort();

    this.plugin = schema;
    this.schemaName = schemaName;

    Cluster.Builder builder = Cluster.builder();
    for(String host : hosts){
      builder = builder.addContactPoint(host);
    }
    builder = builder.withPort(port);
    cluster = builder.build();

    keyspaceCache = CacheBuilder //
        .newBuilder() //
        .expireAfterAccess(1, TimeUnit.MINUTES) //
        .build(new KeyspaceLoader());

    tableCache = CacheBuilder //
        .newBuilder() //
        .expireAfterAccess(1, TimeUnit.MINUTES) //
        .build(new TableNameLoader());
  }


  /**
   * Utility class for fetching all the key spaces in cluster.
   */
  private class KeyspaceLoader extends CacheLoader<String, List<String>> {

    /**
     * Loads the keyspace-name list from cluster metadata. Only the sentinel
     * key {@link #DATABASES} is valid; anything else is a programming error.
     */
    @Override
    public List<String> load(String key) throws Exception {
      if (!DATABASES.equals(key)) {
        throw new UnsupportedOperationException();
      }
      List<KeyspaceMetadata> keyspaces = cluster.getMetadata().getKeyspaces();
      List<String> keys = Lists.newArrayList();
      for(KeyspaceMetadata k : keyspaces){
        keys.add(k.getName());
      }
      return keys;
    }
  }

  /**
   * Utility class for populating all tables in a provided key space.
   */
  private class TableNameLoader extends CacheLoader<String, List<String>> {

    // Cache key is the keyspace name; value is its table-name list.
    @Override
    public List<String> load(String keyspace) throws Exception {
      Collection<TableMetadata> tables = cluster.getMetadata().getKeyspace(keyspace).getTables();
      List<String> tabs = Lists.newArrayList();
      for(TableMetadata t : tables){
        tabs.add(t.getName());
      }
      return tabs;
    }
  }

  /** Registers the top-level Cassandra schema and its keyspace sub-schemas. */
  @Override
  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
    CassandraSchema schema = new CassandraSchema(schemaName);
    SchemaPlus schemaPlus = parent.add(schemaName, schema);
    schema.setHolder(schemaPlus);
  }

  /**
   * Top-level Drill schema for the plugin; each sub-schema is one Cassandra
   * keyspace.
   */
  class CassandraSchema extends AbstractSchema {

    public CassandraSchema(String name) {
      super(ImmutableList.<String> of(), name);
    }

    /** Returns the keyspace sub-schema, or null if table lookup fails. */
    @Override
    public AbstractSchema getSubSchema(String name) {
      List<String> tables;
      try {
        tables = tableCache.get(name);
        return new CassandraDatabaseSchema(tables, this, name);
      } catch (ExecutionException e) {
        logger.warn("Failure while attempting to access Cassandra DataBase '{}'.",
            name, e.getCause());
        return null;
      }
    }

    // Eagerly attaches every keyspace sub-schema to the wrapping SchemaPlus.
    void setHolder(SchemaPlus plusOfThis) {
      for (String s : getSubSchemaNames()) {
        plusOfThis.add(s, getSubSchema(s));
      }
    }

    @Override
    public boolean showInInformationSchema() {
      return false;
    }

    /** Keyspace names, from cache; empty set on lookup failure. */
    @Override
    public Set<String> getSubSchemaNames() {
      try {
        List<String> dbs = keyspaceCache.get(DATABASES);
        return Sets.newHashSet(dbs);
      } catch (ExecutionException e) {
        logger.warn("Failure while getting Cassandra keyspace list.", e);
        return Collections.emptySet();
      }
    }

    /** Table names of one keyspace, from cache; empty list on failure. */
    List<String> getTableNames(String dbName) {
      try {
        return tableCache.get(dbName);
      } catch (ExecutionException e) {
        logger.warn("Failure while loading table names for keyspace '{}'.",
            dbName, e.getCause());
        return Collections.emptyList();
      }
    }

    // Wraps a keyspace/table pair as a dynamically-typed Drill table.
    DrillTable getDrillTable(String dbName, String tableName) {
      CassandraScanSpec cassandraScanSpec = new CassandraScanSpec(dbName, tableName);
      return new DynamicDrillTable(plugin, schemaName, cassandraScanSpec);
    }

    @Override
    public String getTypeName() {
      return CassandraStoragePluginConfig.NAME;
    }
  }
}
\ No newline at end of file
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
new file mode 100644
index 0000000..2f89d85
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePlugin.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.SchemaConfig;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+
+public class CassandraStoragePlugin extends AbstractStoragePlugin {
+ static final Logger logger = LoggerFactory.getLogger(CassandraStoragePlugin.class);
+
+ private DrillbitContext context;
+ private CassandraStoragePluginConfig cassandraConfig;
+ private CassandraSchemaFactory schemaFactory;
+
+ public CassandraStoragePlugin(CassandraStoragePluginConfig cassandraConfig,
+ DrillbitContext context, String name) throws IOException,
+ ExecutionSetupException {
+ this.context = context;
+ this.cassandraConfig = cassandraConfig;
+ this.schemaFactory = new CassandraSchemaFactory(this, name);
+ }
+
+ public DrillbitContext getContext() {
+ return this.context;
+ }
+
+ @Override
+ public CassandraStoragePluginConfig getConfig() {
+ return cassandraConfig;
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection)
+ throws IOException {
+ CassandraScanSpec cassandraScanSpec = selection.getListWith(new ObjectMapper(),
+ new TypeReference<CassandraScanSpec>() {});
+
+ return new CassandraGroupScan(userName, this, cassandraScanSpec, null);
+ }
+
+ public Set<StoragePluginOptimizerRule> getOptimizerRules() {
+ return ImmutableSet.of(CassandraPushDownFilterForScan.INSTANCE);
+ }
+
+ @Override
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ schemaFactory.registerSchemas(schemaConfig, parent);
+ }
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePluginConfig.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePluginConfig.java
new file mode 100644
index 0000000..4dcd160
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraStoragePluginConfig.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName(CassandraStoragePluginConfig.NAME)
+public class CassandraStoragePluginConfig extends StoragePluginConfig {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraStoragePluginConfig.class);
+
+ public static final String NAME = "cassandra";
+ public Map<String, Object> config;
+
+ @JsonIgnore
+ private List<String> hosts;
+
+ @JsonIgnore
+ private int port;
+
+ @JsonCreator
+ public CassandraStoragePluginConfig(@JsonProperty("config") Map<String, Object> config) {
+ this.config = config;
+ if(config==null){
+ config = Maps.newHashMap();
+ return;
+ }
+
+ this.hosts = (ArrayList<String>)this.config.get(DrillCassandraConstants.CASSANDRA_CONFIG_HOSTS);
+ this.port = (Integer)this.config.get(DrillCassandraConstants.CASSANDRA_CONFIG_PORT);
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (this == that) {
+ return true;
+ } else if (that == null || getClass() != that.getClass()) {
+ return false;
+ }
+ CassandraStoragePluginConfig thatConfig = (CassandraStoragePluginConfig) that;
+ return (this.hosts.equals(thatConfig.hosts)) && (this.port==thatConfig.port);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return this.hosts != null ? this.hosts.hashCode() : 0;
+ }
+
+ public Map<String, Object> getConfig() {
+ return config;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public int getPort(){ return port; }
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSubScan.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSubScan.java
new file mode 100644
index 0000000..699516e
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraSubScan.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.datastax.driver.core.querybuilder.Clause;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+
+@JsonTypeName("cassandra-subscan")
+public class CassandraSubScan extends AbstractBase implements SubScan {
+ static final Logger logger = LoggerFactory.getLogger(CassandraSubScan.class);
+
+ @JsonProperty
+ private final CassandraStoragePluginConfig cassandraPluginConfig;
+ @JsonProperty
+ private final List<SchemaPath> columns;
+ @JsonProperty
+ private final List<CassandraSubScanSpec> chunkScanSpecList;
+
+ @JsonIgnore
+ private final CassandraStoragePlugin cassandraStoragePlugin;
+
+
+ @JsonCreator
+ public CassandraSubScan(
+ @JacksonInject("registry") StoragePluginRegistry registry,
+ @JsonProperty("cassandraPluginConfig") StoragePluginConfig cassandraPluginConfig,
+ @JsonProperty("chunkScanSpecList") LinkedList<CassandraSubScanSpec> chunkScanSpecList,
+ @JsonProperty("columns") List<SchemaPath> columns)
+ throws ExecutionSetupException {
+
+ System.out.println("CassandraSubScan constructor");
+ this.columns = columns;
+ this.cassandraPluginConfig = (CassandraStoragePluginConfig) cassandraPluginConfig;
+ this.cassandraStoragePlugin = (CassandraStoragePlugin) registry
+ .getPlugin(cassandraPluginConfig);
+ this.chunkScanSpecList = chunkScanSpecList;
+ }
+
+ public CassandraSubScan(CassandraStoragePlugin storagePlugin,
+ CassandraStoragePluginConfig storagePluginConfig,
+ List<CassandraSubScanSpec> chunkScanSpecList, List<SchemaPath> columns) {
+ this.cassandraStoragePlugin = storagePlugin;
+ this.cassandraPluginConfig = storagePluginConfig;
+ this.columns = columns;
+ this.chunkScanSpecList = chunkScanSpecList;
+ }
+
+
+ @Override
+ public <T, X, E extends Throwable> T accept(
+ PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @JsonIgnore
+ public CassandraStoragePluginConfig getCassandraPluginConfig() {
+ return cassandraPluginConfig;
+ }
+
+ @JsonIgnore
+ public CassandraStoragePlugin getCassandraStoragePlugin() {
+ return cassandraStoragePlugin;
+ }
+
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ public List<CassandraSubScanSpec> getChunkScanSpecList() {
+ return chunkScanSpecList;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children)
+ throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ return new CassandraSubScan(cassandraStoragePlugin, cassandraPluginConfig,
+ chunkScanSpecList, columns);
+ }
+
+ @Override
+ public int getOperatorType() {
+ /* Copied shamelessly from HBase SubScan */
+ return 1009;
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Iterators.emptyIterator();
+ }
+
+ public static class CassandraSubScanSpec {
+
+ protected String keyspace;
+ protected String table;
+ protected List<String> hosts;
+ protected int port;
+
+ protected String startToken;
+ protected String endToken;
+
+ @JsonIgnore
+ protected List<Clause> filter;
+
+ @JsonCreator
+ public CassandraSubScanSpec(@JsonProperty("keyspace") String keyspace,
+ @JsonProperty("table") String table,
+ @JsonProperty("hosts") List<String> hosts,
+ @JsonProperty("port") int port,
+ @JsonProperty("startToken") String startToken,
+ @JsonProperty("endToken") String endToken
+ ) {
+ this.keyspace = keyspace;
+ this.table = table;
+ this.hosts = hosts;
+ this.port = port;
+ this.startToken = startToken;
+ this.endToken = endToken;
+ this.filter = filter;
+ }
+
+ CassandraSubScanSpec() {
+
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public CassandraSubScanSpec setKeyspace(String keyspace) {
+ this.keyspace = keyspace;
+ return this;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public CassandraSubScanSpec setTable(String table) {
+ this.table = table;
+ return this;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public CassandraSubScanSpec setHosts(List<String> hosts) {
+ this.hosts = hosts;
+ return this;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public CassandraSubScanSpec setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public List<Clause> getFilter() {
+ return filter;
+ }
+
+ public CassandraSubScanSpec setFilter(List<Clause> filter) {
+ this.filter = filter;
+ return this;
+ }
+
+ public String getStartToken() {
+ return startToken;
+ }
+
+ public CassandraSubScanSpec setStartToken(String startToken) {
+ this.startToken = startToken;
+ return this;
+ }
+
+ public String getEndToken() {
+ return endToken;
+ }
+
+ public CassandraSubScanSpec setEndToken(String endToken) {
+ this.endToken = endToken;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "CassandraSubScanSpec [keyspace=" + keyspace + ", table="
+ + table + ", host=" + hosts +", port=" + port + ", startToken=" + startToken
+ + ", endToken=" + endToken + ", filter=" + filter + "]";
+ }
+
+ }
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraUtil.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraUtil.java
new file mode 100644
index 0000000..2b2ac2c
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/CassandraUtil.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.cassandra;
+
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.text.DecimalFormat;
+
+public class CassandraUtil {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraUtil.class);
+
+
+ public static String[] getPartitionTokens(String partitionTechnique, int numberOfNodes){
+
+ logger.info("Getting partition bounds for nodes. PartitionScheme: {}, Node Count: {}.", partitionTechnique, numberOfNodes);
+
+ switch(partitionTechnique){
+ case "org.apache.cassandra.dht.Murmur3Partitioner":
+ return getMurmur3PartitionTokens(numberOfNodes);
+
+ case "org.apache.cassandra.dht.RandomPartitioner":
+ return getRandomPartitionTokens(numberOfNodes);
+
+ default:
+ logger.error("Cassandra partition scheme {} not supported.", partitionTechnique);
+ return null;
+ }
+ }
+
+ private static String[] getMurmur3PartitionTokens(int numberOfNodes){
+
+ String[] tokens = new String[numberOfNodes];
+
+ BigDecimal two = new BigDecimal(2);
+ BigDecimal nodes = new BigDecimal(numberOfNodes);
+ BigDecimal val = two.pow(64).divide(nodes, RoundingMode.DOWN);
+ BigDecimal twoPow63 = two.pow(63);
+ BigDecimal index;
+
+ for(int i=0; i<numberOfNodes; i++){
+ index = new BigDecimal(i);
+ tokens[i] = index.multiply(val).subtract(twoPow63).toPlainString();
+ }
+ return tokens;
+ }
+
+ private static String[] getRandomPartitionTokens(int numberOfNodes){
+
+ String[] tokens = new String[numberOfNodes];
+
+ BigDecimal two = new BigDecimal(2);
+ BigDecimal nodes = new BigDecimal(numberOfNodes);
+ BigDecimal val = two.pow(127).divide(nodes, RoundingMode.DOWN);
+
+ BigDecimal index;
+
+ for(int i=0; i<numberOfNodes; i++){
+ index = new BigDecimal(i);
+ tokens[i] = index.multiply(val).toPlainString();
+ }
+ return tokens;
+ }
+
+ public static void main(String[] args) {
+
+ String[] out = new CassandraUtil().getPartitionTokens("org.apache.cassandra.dht.Murmur3Partitioner", 5);
+
+ for(String s : out){
+ System.out.println(s);
+ }
+ }
+
+
+
+}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/DrillCassandraConstants.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/DrillCassandraConstants.java
new file mode 100644
index 0000000..80777ff
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/DrillCassandraConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.cassandra;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+
/**
 * Shared constants for the Cassandra storage plugin.
 * NOTE(review): constant-interface pattern; kept as an interface because
 * other classes may implement it to inherit these names.
 */
public interface DrillCassandraConstants {

  // Drill type used for Cassandra row keys: required (non-nullable) VARCHAR.
  public static final TypeProtos.MajorType ROW_KEY_TYPE = Types.required(TypeProtos.MinorType.VARCHAR);

  // Drill type used for regular Cassandra columns: nullable VARCHAR.
  public static final TypeProtos.MajorType COLUMN_TYPE = Types.optional(TypeProtos.MinorType.VARCHAR);

  // Key in the plugin's 'config' map holding the contact-point host list.
  public static final String CASSANDRA_CONFIG_HOSTS = "cassandra.hosts";

  // Key in the plugin's 'config' map holding the native-protocol port.
  public static final String CASSANDRA_CONFIG_PORT = "cassandra.port";
}
diff --git a/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/connection/CassandraConnectionManager.java b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/connection/CassandraConnectionManager.java
new file mode 100644
index 0000000..04c7e0b
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/java/org/apache/drill/exec/store/cassandra/connection/CassandraConnectionManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.cassandra.connection;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class CassandraConnectionManager{
+
+ private static final Logger logger = LoggerFactory.getLogger(CassandraConnectionManager.class);
+
+ private static Cache<String, Cluster> hostConnectionMap;
+
+ static {
+ hostConnectionMap = CacheBuilder.newBuilder().maximumSize(5)
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .removalListener(new AddressCloser()).build();
+ }
+
+ public synchronized static Cluster getCluster(List<String> hosts, int port)
+ throws UnknownHostException {
+ Cluster cluster = hostConnectionMap.getIfPresent(hosts);
+ if (cluster == null || cluster.isClosed()) {
+ Cluster.Builder builder = Cluster.builder();
+ for(String host : hosts){
+ builder = builder.addContactPoints(host);
+ }
+ builder = builder.withPort(port);
+ cluster = builder.build();
+
+ for(String host : hosts) {
+ hostConnectionMap.put(host, cluster);
+ }
+
+ logger.debug("Created connection to {}.", hosts);
+ logger.debug("Number of sessions opened are {}.", hostConnectionMap.size());
+ }
+ return cluster;
+ }
+
+ private static class AddressCloser implements
+ RemovalListener<String, Cluster> {
+ @Override
+ public synchronized void onRemoval(RemovalNotification<String, Cluster> removal) {
+ removal.getValue().close();
+ ;
+ logger.debug("Closed connection to {}.", removal.getKey().toString());
+ }
+ }
+}
\ No newline at end of file
diff --git a/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..47966b6
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,12 @@
+{
+ "storage":{
+ cassandra : {
+ type:"cassandra",
+ enabled: false,
+ config : {
+ "cassandra.hosts" : ["127.0.0.1","127.0.0.2"],
+ "cassandra.port" : 9042
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/contrib/storage-cassandra/src/main/resources/checkstyle-suppressions.xml b/contrib/storage-cassandra/src/main/resources/checkstyle-suppressions.xml
new file mode 100644
index 0000000..9d4682b
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/resources/checkstyle-suppressions.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.1//EN"
+ "suppressions_1_1.dtd">
+
+<!-- Checkstyle Suppressions for Apache Drill -->
+<suppressions>
+ <suppress files="[\\/]generated-sources[\\/]" checks="AvoidStarImport,NeedBraces"/>
+</suppressions>
diff --git a/contrib/storage-cassandra/src/main/resources/drill-module.conf b/contrib/storage-cassandra/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..3ec4ada
--- /dev/null
+++ b/contrib/storage-cassandra/src/main/resources/drill-module.conf
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// This file tells Drill to consider this module when class path scanning.
+// This file can also include any supplementary configuration information.
+// This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.exec: {
+
+ sys.store.provider: {
+ cassandra : {
+ type:"cassandra",
+ enabled: false,
+ config : {
+ "cassandra.hosts" : ["127.0.0.1"],
+ "cassandra.port" : 9042
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/BaseCassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/BaseCassandraTest.java
new file mode 100644
index 0000000..943e83c
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/BaseCassandraTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.cassandra.CassandraStoragePlugin;
+import org.apache.drill.exec.store.cassandra.CassandraStoragePluginConfig;
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class BaseCassandraTest extends BaseTestQuery implements CassandraTestConstants {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseCassandraTest.class);
+
+
+ private static final String CASSANDRA_STORAGE_PLUGIN_NAME = "cassandra";
+
+
+ protected static CassandraStoragePlugin storagePlugin;
+ protected static CassandraStoragePluginConfig storagePluginConfig;
+ private static boolean testTablesCreated;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+ storagePlugin = (CassandraStoragePlugin) pluginRegistry.getPlugin(CASSANDRA_STORAGE_PLUGIN_NAME);
+ storagePluginConfig = storagePlugin.getConfig();
+ storagePluginConfig.setEnabled(true);
+ pluginRegistry.createOrUpdate(CASSANDRA_STORAGE_PLUGIN_NAME, storagePluginConfig, true);
+
+ if(!testTablesCreated) {
+ createTestCassandraTableIfNotExists(storagePluginConfig.getHosts(), storagePluginConfig.getPort());
+ testTablesCreated = true;
+ }
+ }
+
+ private static boolean createTestCassandraTableIfNotExists(List<String> host, int port) throws DrillException {
+ try {
+ logger.info("Initiating Cassandra Keyspace/Table for Test cases. Host: {}, Port: {}", host, port);
+ Cluster cluster = CassandraConnectionManager.getCluster(host, port);
+ Session session = cluster.connect();
+
+ /* Create Schema: Keyspace */
+ if(session.getCluster().getMetadata().getKeyspace(KEYSPACE_NAME) == null) {
+ logger.info("Creating Keyspace: {}",KEYSPACE_NAME);
+ session.execute("CREATE KEYSPACE " +KEYSPACE_NAME+ " WITH replication " +
+ "= {'class':'SimpleStrategy', 'replication_factor':3};");
+ }
+
+ /* Create Schema: Table */
+ if(session.getCluster().getMetadata().getKeyspace(KEYSPACE_NAME).getTable(TABLE_NAME) == null) {
+ logger.info("Creating Table [{}] in Keyspace [{}]",TABLE_NAME, KEYSPACE_NAME);
+ session.execute(
+ "CREATE TABLE " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (\n" +
+ " id text,\n" +
+ " pog_rank int,\n" +
+ " pog_id bigint,\n" +
+ " PRIMARY KEY (id, pog_rank)\n" +
+ ");");
+
+ /* Load Data */
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0001', 1, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0005', 1, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0002', 1, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0002', 2, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0002', 3, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0006', 1, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0006', 2, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 1, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 2, 10001);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 3, 10002);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 4, 10002);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 5, 10002);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0004', 6, 10002);");
+ session.execute("INSERT INTO " +KEYSPACE_NAME+ "." +TABLE_NAME+ " (id, pog_rank, pog_id) VALUES ('id0003', 1, 10001);");
+ }
+ return true;
+ }
+ catch (Exception e) {
+ throw new DrillException("Failure while Cassandra Test table creation", e);
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+
+ }
+
+ protected String getPlanText(String planFile, String keyspaceName, String tableName) throws IOException {
+ return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8).replace("[TABLE_NAME]", tableName).replace("[KEYSPACE_NAME]", keyspaceName);
+ }
+
+ protected void runCassandraPhysicalVerifyCount(String planFile, String keyspaceName, String tableName, int expectedRowCount) throws Exception{
+ String physicalPlan = getPlanText(planFile, keyspaceName, tableName);
+ List<QueryDataBatch> results = testPhysicalWithResults(physicalPlan);
+ printResultAndVerifyRowCount(results, expectedRowCount);
+ }
+
+ protected List<QueryDataBatch> runCassandraSQLlWithResults(String sql) throws Exception {
+ System.out.println("Running query:\n" + sql);
+ return testSqlWithResults(sql);
+ }
+
+ protected void runCassandraSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
+ List<QueryDataBatch> results = runCassandraSQLlWithResults(sql);
+ printResultAndVerifyRowCount(results, expectedRowCount);
+ }
+
+ private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
+ int rowCount;
+ rowCount = printResult(results);
+ if (expectedRowCount != -1) {
+ Assert.assertEquals(expectedRowCount, rowCount);
+ }
+ }
+}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraFilterPushdownTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraFilterPushdownTest.java
new file mode 100644
index 0000000..71238b2
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraFilterPushdownTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+import org.junit.Test;
+
+public class CassandraFilterPushdownTest extends BaseCassandraTest implements CassandraTestConstants {
+
+ @Test
+ public void testSelectAll() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_ALL, 14);
+ }
+
+ @Test
+ public void testFilter() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER, 6);
+ }
+
+ @Test
+ public void testFilter1() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_1, 4);
+ }
+
+ @Test
+ public void testFilter2() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_2, 4);
+ }
+
+ @Test
+ public void testFilterWithOrCondition() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_With_OR, 8);
+ }
+
+ @Test
+ public void testFilterWithAndCondition() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_QUERY_FILTER_WITH_AND, 1);
+ }
+
+}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraProjectPushdownTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraProjectPushdownTest.java
new file mode 100644
index 0000000..d7cdaa3
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraProjectPushdownTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+import org.junit.Test;
+
+public class CassandraProjectPushdownTest extends BaseCassandraTest implements CassandraTestConstants{
+
+ @Test
+ public void testProjection() throws Exception{
+ runCassandraSQLVerifyCount(SELECT_QUERY_PROJECT, 14);
+ }
+
+}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraRecordReaderTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraRecordReaderTest.java
new file mode 100644
index 0000000..7ec1036
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraRecordReaderTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+import org.junit.Test;
+
+public class CassandraRecordReaderTest extends BaseCassandraTest implements CassandraTestConstants {
+
+ @Test
+ public void testPlanLocal() throws Exception {
+ runCassandraPhysicalVerifyCount(PHYSICAL_PLAN_SCAN, KEYSPACE_NAME, TABLE_NAME, 14);
+ }
+
+ @Test
+ public void testPlanLocalWithColumns() throws Exception {
+ runCassandraPhysicalVerifyCount(PHYSICAL_PLAN_SCAN_WITH_COLS, KEYSPACE_NAME, TABLE_NAME, 14);
+ }
+}
\ No newline at end of file
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTest.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTest.java
new file mode 100644
index 0000000..4f40eeb
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.drill.exec.store.cassandra.connection.CassandraConnectionManager;
+
+/**
+ * Standalone smoke test: connects to a local Cassandra cluster and prints rows.
+ */
+public class CassandraTest {
+
+ public static void main(String[] args) {
+ Cluster.Builder builder = Cluster.builder()
+ .addContactPoint("127.0.0.2")
+ .addContactPoint("127.0.0.3")
+ .withPort(9042);
+
+ Cluster cluster = builder.build();
+ Session session = cluster.connect();
+ ResultSet rs = session.execute("SELECT * FROM drilltest.trending_now");
+
+
+ while(!rs.isExhausted()){
+ Row r = rs.one();
+ System.out.println(r.getString(0));
+ System.out.println(r.getInt(1));
+ System.out.println(r.getLong(2));
+ }
+ }
+}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestConstants.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestConstants.java
new file mode 100644
index 0000000..684034d
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestConstants.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+public interface CassandraTestConstants {
+
+ static final String COL_NAME_1 = "id";
+ static final String COL_NAME_2 = "pog_id";
+
+ static final String KEYSPACE_NAME = "drilltest";
+ static final String TABLE_NAME = "trending_now";
+
+ static final String PHYSICAL_PLAN_SCAN = "/cassandra-plans/cassandra_scan_screen_physical.json";
+ static final String PHYSICAL_PLAN_SCAN_WITH_COLS = "/cassandra-plans/cassandra_scan_screen_with_columns_physical.json";
+
+ static final String SELECT_ALL = "SELECT * FROM cassandra."+KEYSPACE_NAME +".`"+ TABLE_NAME +"` t";
+
+ static final String SELECT_QUERY_PROJECT =
+ "SELECT "
+ + COL_NAME_1 +"," + COL_NAME_2
+ + " FROM cassandra."+ KEYSPACE_NAME +".`"+ TABLE_NAME +"` t";
+
+ static final String SELECT_QUERY_FILTER =
+ "SELECT * FROM cassandra."+ KEYSPACE_NAME +".`"+ TABLE_NAME +"` t WHERE "
+ +COL_NAME_1+" = 'id0004'";
+
+ static final String SELECT_QUERY_FILTER_1 =
+ "SELECT * FROM cassandra."+ KEYSPACE_NAME +".`"+ TABLE_NAME +"` t WHERE "
+ +COL_NAME_2+" = 10002";
+
+ static final String SELECT_QUERY_FILTER_2 =
+ "SELECT * FROM cassandra."+ KEYSPACE_NAME +".`"+ TABLE_NAME +"` t WHERE "
+ +COL_NAME_1+" = 'id0004' and " +COL_NAME_2+" = '10002'";
+
+ static final String SELECT_QUERY_FILTER_With_OR =
+ "SELECT * FROM cassandra."+ KEYSPACE_NAME +".`"+ TABLE_NAME +"` t WHERE " +
+ "(" +COL_NAME_1+" = 'id0004' or " +COL_NAME_1+" = 'id0002') and " +
+ "(" +COL_NAME_2+" = '10001' or "+COL_NAME_2+" = '10002') " +
+ "order by " + COL_NAME_2 +" asc, "+ COL_NAME_1 +" desc limit 8";
+
+ static final String SELECT_QUERY_FILTER_WITH_AND =
+ "SELECT * FROM cassandra."+ KEYSPACE_NAME +".`"+ TABLE_NAME +"` t WHERE "
+ +COL_NAME_1+" = 'id0004' and pog_rank = 2";
+
+}
diff --git a/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestSuite.java b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestSuite.java
new file mode 100644
index 0000000..38afe97
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/java/org/apache/drill/cassandra/CassandraTestSuite.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.cassandra;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ CassandraRecordReaderTest.class,
+ CassandraProjectPushdownTest.class,
+ CassandraFilterPushdownTest.class
+})
+public class CassandraTestSuite {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CassandraTestSuite.class);
+}
diff --git a/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_physical.json b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_physical.json
new file mode 100644
index 0000000..9f4e0e3
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_physical.json
@@ -0,0 +1,30 @@
+{
+ head : {
+ type : "APACHE_DRILL_PHYSICAL",
+ version : 1,
+ generator : {
+ type : "manual"
+ }
+ },
+ graph : [ {
+ pop : "cassandra-scan",
+ @id : 1,
+ cassandraScanSpec : {
+ keyspace : "[KEYSPACE_NAME]",
+ table : "[TABLE_NAME]"
+ },
+ storage:
+ {
+ "type":"cassandra",
+ config : {
+ "cassandra.hosts" : ["127.0.0.1","127.0.0.2"],
+ "cassandra.port" : 9042
+ }
+ }
+ },
+ {
+ pop : "screen",
+ @id : 2,
+ child : 1
+ } ]
+}
\ No newline at end of file
diff --git a/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_with_columns_physical.json b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_with_columns_physical.json
new file mode 100644
index 0000000..31eac67
--- /dev/null
+++ b/contrib/storage-cassandra/src/test/resources/cassandra-plans/cassandra_scan_screen_with_columns_physical.json
@@ -0,0 +1,33 @@
+{
+ head : {
+ type : "APACHE_DRILL_PHYSICAL",
+ version : 1,
+ generator : {
+ type : "manual"
+ }
+ },
+ graph : [ {
+ pop : "cassandra-scan",
+ @id : 1,
+ cassandraScanSpec : {
+ keyspace : "[KEYSPACE_NAME]",
+ table : "[TABLE_NAME]"
+ },
+ storage:
+ {
+ "type":"cassandra",
+ config : {
+ "cassandra.hosts" : ["127.0.0.1","127.0.0.2","127.0.0.3"],
+ "cassandra.port" : 9042
+ }
+ },
+ columns: [
+ "id", "pog_id"
+ ]
+ },
+ {
+ pop : "screen",
+ @id : 2,
+ child : 1
+ } ]
+}
\ No newline at end of file
diff --git a/distribution/pom.xml b/distribution/pom.xml
index c08efc7..fe3a313 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -157,6 +157,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-storage-cassandra</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.drill.contrib.storage-hive</groupId>
<artifactId>drill-storage-hive-core</artifactId>
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 244230f..3a67020 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -92,6 +92,7 @@
<include>org.apache.drill.contrib.data:tpch-sample-data:jar</include>
<include>org.apache.drill.contrib:drill-mongo-storage</include>
<include>org.apache.drill.contrib:drill-storage-hbase</include>
+ <include>org.apache.drill.contrib:drill-storage-cassandra</include>
</includes>
<excludes>
<exclude>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar:tests</exclude>
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index d428920..97849de 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -461,8 +461,10 @@ public class BaseTestQuery extends ExecTest {
int rowCount = 0;
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
for(QueryDataBatch result : results) {
- rowCount += result.getHeader().getRowCount();
- loader.load(result.getHeader().getDef(), result.getData());
+ if (result.hasData()) {
+ rowCount += result.getHeader().getRowCount();
+ loader.load(result.getHeader().getDef(), result.getData());
+ }
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean throw clause above.
if (loader.getRecordCount() <= 0) {
@yangl
Copy link

yangl commented Jul 7, 2015

good job

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment